An introductory guide to understanding the core concepts of Kinetica.
A type is analogous to a traditional database schema for a table. Before data
can be stored in Kinetica, a type must be specified for that data. For every
type, Kinetica assigns a unique GUID. Kinetica will use the same GUID for all
types with identical characteristics.
Every type in Kinetica consists of the following:
- Type Label (text to identify this type)
- Type Schema (list of attributes with their primitive types)
- Annotation (Security) Attribute (optional)
A type label serves as a tagging mechanism for the type. The type label can
be any text string specified by the client. The type label serves two purposes.
First, it identifies tables with similar data. Second, it helps determine a
type’s uniqueness.
A type schema consists of a set of column names and their respective primitive
types. Each column can also be assigned a number of pre-defined properties
that determine aspects of that column, like searchability and whether or not the
column is indexed.
A given column in a type schema must be of one of the following
primitive types:
| General Type | Kinetica-implemented Type | Minimum Value | Maximum Value |
|---|---|---|---|
| int | integer (default) | -2^31 | 2^31 - 1 |
| int | int16 | -2^15 | 2^15 - 1 |
| int | int8 | -2^7 | 2^7 - 1 |
| long | long (default) | -2^63 | 2^63 - 1 |
| long | timestamp | -2^63 | 2^63 - 1 |
| float | float (default) | -(2 - 2^-23) * 2^127 | (2 - 2^-23) * 2^127 |
| double | double (default) | -(2 - 2^-52) * 2^1023 | (2 - 2^-52) * 2^1023 |
| string | string (default) | (empty string) | (10,000,000 characters) |
| string | char256 | (empty string) | (256 characters) |
| string | char128 | (empty string) | (128 characters) |
| string | char64 | (empty string) | (64 characters) |
| string | char32 | (empty string) | (32 characters) |
| string | char16 | (empty string) | (16 characters) |
| string | char8 | (empty string) | (8 characters) |
| string | char4 | (empty string) | (4 characters) |
| string | char2 | (empty string) | (2 characters) |
| string | char1 | (empty string) | (1 character) |
| string | ipv4 | 0.0.0.0 | 255.255.255.255 |
| bytes | bytes (default) | (empty array) | (no configured limit) |
All data types other than bytes are queryable by Kinetica; columns of type bytes are store-only.
Kinetica has an additional layer of semantics regarding column data types. At the time a type is created, a column can be given any number of the supported properties, which give the column's data type special meaning. Some properties enhance query performance, while others specify sharding or other special qualities. Here is a list of the currently supported column properties:
| Property | Description |
|---|---|
| data | Default property for all numeric and string type columns; makes the column available for use in query expressions. |
| text_search | Enables full text search for string columns. Can be set independently of data, disk_optimized, & store_only. |
| store_only | Values are persisted, but can only be retrieved via /get/records or a similar call; the column cannot be used in a filter or aggregation expression. Mutually exclusive with the data property. Any bytes column is store_only by default. This property reduces system memory usage. |
| disk_optimized | Prevents variable-width strings from being written to an indexing service, saving disk space at the cost of some functionality. Disallows /filter/bystring in contains, starts with, & regex modes, as well as /aggregate/groupby & /aggregate/unique; however, /aggregate/groupby will still function when count or count_distinct is the grouped operation. Requires the data property. |
| timestamp | Values represent timestamps in milliseconds since the Unix epoch: Jan 1 1970 00:00:00. Valid only for long columns. |
| char1 - char256 | Values are strings of up to 1, 2, 4, 8, 16, 32, 64, 128, or 256 characters, respectively. Optimizes memory, disk, and query performance for the associated string column. |
| int8 | Values are limited to 8-bit signed integers. Optimizes memory and query performance for the associated int column. |
| int16 | Values are limited to 16-bit signed integers. Optimizes memory and query performance for the associated int column. |
| ipv4 | Values represent dotted decimal IPv4 addresses of the form A.B.C.D, where A, B, C, and D are between 0 and 255, inclusive (e.g. 192.168.1.100). Optimizes memory, disk, and query performance for the associated string column. |
| primary_key | Makes this column part of (or the entire) primary key. |
| shard_key | Makes this column part of (or the entire) shard key. |
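As an illustrative sketch of how several of these properties might be combined (using the same Python create_type call shown later in this guide; the type, field, and property choices here are hypothetical), a message type could use a fixed-width ID, a full-text-searchable body, and a store-only payload:
response = gpudb.create_type(
    type_definition = """{
        "type": "record",
        "name": "message_type",
        "fields": [
            {"name":"msg_id","type":"string"},
            {"name":"msg_body","type":"string"},
            {"name":"payload","type":"bytes"}
        ]
    }""",
    label = 'message_type',
    properties = {
        'msg_id': ['char16'],        # fixed-width string optimization
        'msg_body': ['text_search'], # enable full text search
        'payload': ['store_only']    # persisted but not queryable (default for bytes)
    }
)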
The concept of tables is at the heart of Kinetica interactions. A table
is a data container associated with a specific type schema (set of columns &
types), much like tables in other database platforms. When querying a table,
the result set will also have a single type schema associated with it.
Tables may exist either as top-level objects or as members of collections.
Each table is identified by a name, which must meet the following criteria:
- Between 1 and 200 characters long
- Alphanumeric, including spaces and these symbols: _ - ( ) { } [ ] . :
- First character is alphanumeric or an underscore
- Unique system-wide--cannot have the same name as another table, view, or
collection, regardless of whether it is top-level or a member of a
collection
Table data can be distributed across the Kinetica cluster using one of two
methods: sharding & replication.
Sharding is the distribution of table data by hashing a particular value for
each record, and by that hash, determining on which Kinetica cluster node the
record will reside.
The benefit of sharding is that it allows distributed queries to be run
against a given data set, with each node responsible for managing its portion of
the overall data set, while only storing each record once across the cluster.
The parallelism inherent in the distributed queries allows for the query
performance to scale linearly with the addition of each cluster node.
A limitation of sharding is that two sharded tables can only be joined
together if they are sharded in the same way, so that their corresponding
records will all reside on the same nodes. Given that, in a typical database
model, each pair of related tables is associated by a different key, a single
query may only be able to join together at most two sharded tables along their
relation.
Since sharding maximizes data storage & query efficiency, it is recommended
that the largest table in the system (e.g. the fact table in a data warehouse)
be sharded. The second largest table against which that table is joined
should also be sharded along the join relation (e.g. the column on each side of
the foreign key relationship).
For example, if the largest joined tables in a system were customer & order, and there was a foreign key relationship between the customer_id column of the order table & the id column of the customer table, the order table should be sharded on customer_id and the customer table should be sharded on id.
Replication is the distribution of table data by locating its entire data
set on every Kinetica cluster node simultaneously, instead of being
distributed across them.
The benefit of replication is that it allows tables to be joined together
when those tables are not sharded on the columns being associated.
Normally, joining two tables together requires them being joined on their
shard keys, so that the joining of the two data sets can happen locally on
each processor shard. Since replicated data sets exist on all shards,
they appear local to any other data set, and can be joined on each shard as if
they were local. The result sets from each shard are then accumulated into
one and returned as the final result set.
Since replicated tables exist in their entirety on all processors, it is
recommended that they be relatively small. For example, a table containing
one gigabyte of data, when replicated across a 10-node cluster will occupy 10
gigabytes of cluster memory overall.
A table is specified as replicated at creation time, by calling the /create/table endpoint with the is_replicated option set to true. For example, in Python:
gpudb.create_table(
    table_name = 'my_replicated_table',
    type_id = type_id,
    options = {'is_replicated': 'true'}
)
A primary key is a designation that can be applied to a single table column or set of columns (in the case of a composite primary key) to ensure the uniqueness of the data contained in the keyed column(s).
The uniqueness constraint is enforced upon insert in two ways (see the sketch following this list):
- If a record to be inserted has key values that match those of an already existing record in the target table, the new record's values will either:
  - Overwrite the existing record's values, if the update_on_existing_pk option is set to true
  - Be ignored (the new record effectively discarded), if the update_on_existing_pk option is set to false or not set
- If two or more records within a given batch insert have the same key values, with respect to the primary key of the target table, the entire batch of records to insert is rejected
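A sketch of the overwrite path in Python (hedged: the record payload is illustrative; the product table with its primary key on product_id is created later in this section):
gpudb.insert_records(
    table_name = 'product',
    data = ['{"product_id":1,"product_name":"widget"}'],
    list_encoding = 'json',
    options = {'update_on_existing_pk': 'true'}  # overwrite the existing record with key 1
)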
By default, a table has no primary key. One must be explicitly designated
in the creation of the type schema associated with the table. Only one
primary key can exist per table.
The primary key for a table becomes its implicit shard key, used for
distributing its records across processors. This shard key designation can
only be overridden when the primary key is a composite primary key and one
or more of the composite primary key columns is explicitly designated as the
shard key.
The primary key designation also applies a hash-based index to the column. This index optimizes the performance of several filtering and aggregation operations when they are given an equality-based (e.g. (x = 0)) filter expression.
Lastly, the primary key designation enables the column to serve as the target
of a foreign key.
Assigning a primary key to a table column requires two steps. The first is to create a type schema, marking the primary key field with the primary_key property. For example, to create a product type schema with a primary key on product_id, in Python:
response = gpudb.create_type(
type_definition = """{
"type": "record",
"name": "product_type",
"fields": [
{"name":"product_id","type":"int"},
{"name":"product_name","type":"string"}
]
}""",
label = 'product_type',
properties = {'product_id': ['primary_key','int16']}
)
Note that the type_definition is a JSON string defining the type schema. The second step is to create a table with that type schema, using the type_id that is returned from the create_type call. Continuing from the previous example:
gpudb.create_table(
table_name = 'product',
type_id = response['type_id']
)
A shard key is a designation that can be applied to a single table column or set of columns (in the case of a composite shard key) whose values are used in distributing records across processors. This distribution allows queries against a sharded table to be processed in parallel.
By default, a hash is computed for each record in a table and serves as the key by which the associated record is distributed to its corresponding processor, or shard. A shard key can be explicitly designated by assigning the shard_key property to one or more columns of a table. A shard key is implicitly designated when a primary key is assigned to a table; all columns involved in the primary key are used as the shard key. If a primary key exists on a table, one or more of the columns comprising the primary key can be designated as the shard key; columns not part of the primary key may not be given this designation.
Only one shard key can exist per table.
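For example, a sketch of designating an explicit shard key at type-creation time, mirroring the primary key workflow shown above (the order type and its columns are illustrative):
response = gpudb.create_type(
    type_definition = """{
        "type": "record",
        "name": "order_type",
        "fields": [
            {"name":"order_id","type":"int"},
            {"name":"customer_id","type":"int"}
        ]
    }""",
    label = 'order_type',
    properties = {'customer_id': ['shard_key']}  # records distributed by customer_id hash
)
gpudb.create_table(table_name = 'order', type_id = response['type_id'])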
A foreign key is a tuning option for joining tables. It acts as a
relational index from a source table to corresponding records in a target
table. As new records are inserted into the source table, the index is
updated with references to the target records associated with them. If a
foreign key is not used, the join relationships are established during
query time.
In order for the linkage to be made successfully, it must connect the
shard key column of a source table with either the similarly sharded
primary key column of a target table or the primary key column of a
replicated target table. The sharding requirement exists to ensure the
related records of each table being joined are located on the same
processing node. The replication alternative alleviates the sharding
requirement, as all of the records from the replicated target table will
exist on all processor nodes, and thus, appear to be local to all records from
the source table involved in the relation.
Foreign keys link single source columns to single primary key target
columns; multi-column foreign keys are not supported.
Multiple foreign keys can exist on a source table. Multiple foreign keys
may also exist on target tables; though, since foreign key targets must be
primary keys, the foreign keys would all have to target the same
primary key column.
Foreign keys do not provide referential integrity checks.
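One way to declare a foreign key is at table-creation time. The following is a hedged sketch that assumes the foreign_keys option of /create/table is available in the version in use; the table names, the order_type_id variable, and the option format are illustrative:
gpudb.create_table(
    table_name = 'order',
    type_id = order_type_id,  # hypothetical type_id from a prior create_type call
    options = {
        # source column customer_id references the primary key id of table customer
        'foreign_keys': 'customer_id references customer(id)'
    }
)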
An index can be applied to one or more individual table columns to improve
the performance of operations applied to the table using the indexed column
in a filter or join expression.
The index is implemented as a b-tree, which provides performance improvements
for both equality-based (e.g. (x = 0)) and range-based (e.g. (x > 0)) filter
criteria on individual columns. Multi-column and function-based indexes are
not supported at this time.
As primary key indexes only optimize the performance of equality-based
filter criteria, an index can be applied to a primary key column for a
performance boost on range-based filter criteria--the two index types are not
mutually exclusive.
Indexes optimize the performance of operations that are given an equality-based or range-based filter expression on the indexed column, and additionally improve the performance of several other operations, such as joins involving that column.
To apply an index to a column, the /alter/table endpoint should be called with the create_index action. For example, in Python:
retobj = gpudb.alter_table(
    table_name = '<name_of_table>',
    action = 'create_index',
    value = '<name_of_column>'
)
A view is a queryable filter of a table, collection, or another view. A
new view is created as a result of each filter query run within Kinetica,
and as such, is also referred to as a result table.
When a filter operation is performed on a table or view, the resulting
view will be created as a top-level entity (not as part of a collection).
When a filter operation is performed on a collection, a new collection is
created containing the result set views, mirroring the original collection.
Only tables in the queried collection that contained the queried columns
will have corresponding views within the result collection.
Views cannot be modified once created with one exception: a view can be
updated to insert the missing data points for a series from the underlying
table, using /update/records/byseries.
A view can be used in place of a table for queries, allowing
query chaining, by filtering the views created by previous filters.
When a record contained in a view is modified by an update to the underlying table, the record is removed from the view, regardless of whether the update impacted the record's qualification for membership in the filtered result set.
When a table is cleared (dropped), all of the views over that table are also automatically cleared.
As they are result sets, and given the need to minimize memory usage, views are
given a default time-to-live, after which they will expire and be removed from
memory. Each access of the view causes the remaining time-to-live to be
reset. Thus, a view accessed more frequently than its time-to-live will
effectively remain in memory indefinitely.
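A view's time-to-live can also be adjusted explicitly. Below is a sketch that assumes the ttl action of /alter/table is available in the version in use; the view name is illustrative, and the value is in minutes:
gpudb.alter_table(
    table_name = 'my_filtered_view',
    action = 'ttl',
    value = '120'  # expire the view after two hours without access
)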
Views have the same naming criteria as tables.
Query Chaining in Kinetica is defined as performing a filter on a view,
which itself was created from a previous filter operation. Kinetica allows
access to all intermediate views after each query. This means that each
view can be used to support multiple subsequent filters. Also, given a long
sequence of queries, a trace can be performed through the progression of each
query to help refine analysis.
As an example, consider an analysis that consists of three queries chained in sequence. If, at the end of the chain, the expected results were not returned, the result of the second query can be examined. If that result contains records that were expected in the final analysis, it can be deduced that the problem exists with the third query.
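A minimal sketch of such a chain in Python (the table, view, and column names are illustrative); note that the second filter names the first view, not the base table, as its source:
gpudb.filter(
    table_name = 'my_table',
    view_name = 'my_view_1',
    expression = 'x > 0'        # first link of the chain
)
gpudb.filter(
    table_name = 'my_view_1',   # filter the intermediate view
    view_name = 'my_view_2',
    expression = 'y <= 10'      # second link, further restricting the set
)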
A collection is a container for tables, somewhat akin to a schema in other
databases. The contained tables can be of uniform or heterogeneous
type schemas.
A collection can also be a container of views. A view collection is what
results from performing a filter operation against another collection. Other
types of views, like joins, can also be created within collections.
Besides providing a means for logical grouping of tables, a collection
provides the ability to query the contained tables, regardless of the mixture
of type schemas, for columns shared between all tables. Tables within the
collection that lack the queried-for columns will simply not contribute to the
resulting data set.
For example, a transportation department may have two teams gathering ongoing spatial information in their respective jurisdictions. Team A ingests geo-referenced objects from Twitter into a table named TWITTER inside of the collection MASTER. Their objects have X and Y columns corresponding to longitude and latitude, as well as TIMESTAMP and other columns. Meanwhile, Team B collects vehicle movement information, adding their objects with X, Y, TIMESTAMP, TRACK_ID and other columns to a table named VEHICLE_TRACKS, also inside the MASTER collection. Now, queries for X, Y, & TIMESTAMP on the collection MASTER will be applied to both tables.
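For instance, a sketch of such a collection-wide query using the filter endpoint (the expression value is illustrative); per the behavior described above, the result is a new collection containing a filtered view of each member table:
gpudb.filter(
    table_name = 'MASTER',                       # filter the collection, not a single table
    view_name = 'MASTER_FILTERED',
    expression = 'TIMESTAMP >= 1456749296789'    # shared column across both tables
)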
Collections do not themselves have a time-to-live in the way that views
do. However, a collection will be removed from memory automatically when the
last table or view within it is removed, either directly or via expiration.
The deletion event itself is what triggers the removal of the collection;
thus, a collection created with no tables or views in it will remain in
memory, though empty, as no deletion event of a contained table or view
within it will trigger its removal.
Setting a time-to-live on a collection will set the time-to-live of every
table & view contained within it. This creates an effective time-to-live
for the collection, as each access of a member of the collection will extend
its life.
Collections have the same naming criteria as tables.
Kinetica accepts expressions as inputs when querying data. These expressions are mathematical expressions involving one or more constants (both numeric and string) and table columns. As described in the sub-sections below, these expressions can be column, filter, or aggregation expressions. Expressions follow certain constraints based on where they are used, but all expressions should follow the basic guidelines outlined below:
- Use parentheses liberally to ensure correct order-of-operations.
- Constants:
  - String constants must be enclosed in single quotes or double quotes.
  - Numerical constants can be expressed as:
    - decimal integers (31242)
    - hex integers (0x3420123a)
    - doubles (3.41e5)
    - floats (3.1415F)
- Operators:
  - Mathematical: + - * /
  - Bitwise: & | << >> ~ ^
  - Comparison: > < >= <= == = != <> in
  - Logical (when applied directly to numeric columns, these will interpret non-zero values as true and zero values as false, returning 1 for true and 0 for false):
    - and - both arguments true
    - or - either argument true
    - xor - one argument true, one false
    - not - argument false
    - ! - synonym for not
- Functions:
  - Logical:
    - if(C,T,F) - if C is true, return T, otherwise return F
      - C - any true/false condition (when an integer column is used directly, this function will interpret non-zero values as true and zero values as false)
      - T - any numeric value
      - F - any numeric value
  - Trigonometric: sin, cos, tan, cot, asin, acos, atan, atan2, atn2, degrees, radians, hypot, sinh, cosh, tanh, asinh, acosh, atanh
  - Arithmetic: abs, mod, greatest, least, sign, exp, ln, log, log10, sqrt, cbrt, pow, power, ldexp
  - Rounding: ceil, ceiling, floor, round
  - Date/Time (integer fields assumed to be seconds since the epoch; long/timestamp fields assumed to be milliseconds since the epoch):
    - year(timestamp) - 4-digit year, A.D.
    - month(timestamp) - number of month [1 - 12]
    - day(timestamp) - day of month [1 - 31]
    - hour(timestamp) - hour of day [0 - 23]
    - minute(timestamp) - minute of hour [0 - 59]
    - sec(timestamp) - second of minute [0 - 59]
    - msec(timestamp) - millisecond of second [0 - 999]
    - quarter(timestamp) - quarter of year [1 - 4] (1 = Jan, Feb, & Mar)
    - week(timestamp) - week (or partial week) of year [1 - 53]; each full week starts on Sunday (1 = week containing Jan 1st)
    - day_of_week(timestamp) - day of week [1 - 7] (1 = Sunday)
    - day_of_year(timestamp) - day of year [1 - 366]
  - Timestamp-only (operated-on field must have the timestamp annotation):
    - timestampadd(interval_type, interval_amount, timestamp) - adds the positive or negative interval_amount of interval_type units to timestamp
      - interval_type - valid values are:
        - YEAR - year is modified by the interval amount (not affected by leap year, etc.)
        - MONTH - month is modified by the interval amount and year adjusted if overflow/underflow occurs; day adjusted to last day of calculated month if not a valid day for that month (e.g. Apr 31st -> Apr 30th)
        - DAY - day is modified by the interval amount (time not affected by daylight savings time, etc.); month & year are adjusted, if overflow/underflow occurs
        - HOUR - hour is modified by the interval amount (time not affected by daylight savings time, etc.); date is adjusted, if overflow/underflow occurs
        - MINUTE - minute is modified by the interval amount; hour & date are adjusted, if overflow/underflow occurs
        - SECOND - second is modified by the interval amount; minute, hour, & date are adjusted, if overflow/underflow occurs
        - FRAC_SECOND - milliseconds are modified by the interval amount; time & date are adjusted, if overflow/underflow occurs
        - QUARTER - month is modified by three times the interval amount, irrespective of the number of days in the months in between; day adjusting is performed the same way as in the MONTH description, but only on the final month (i.e. Jan 31st + 1 quarter will be Apr 30th, not Apr 28th because of February)
        - WEEK - day is modified by 7 times the interval amount (time not affected by daylight savings time, etc.); month & year are adjusted, if overflow/underflow occurs
      - interval_amount - positive (or negative) integer specifying how many interval_type units will be added to (or subtracted from) timestamp
      - timestamp - any field with a timestamp annotation
    - timestampdiff(unit_type, begin_timestamp, end_timestamp) - calculates the difference between two dates, returning the result in the units specified; more precisely, how many unit_type units need to be added to or subtracted from begin_timestamp to equal end_timestamp (or get as close as possible without going past it) using the rules specified in timestampadd. NOTE: this is not symmetric with timestampadd in all cases, as adding 1 MONTH to Mar 31st results in Apr 30th, but the timestampdiff in MONTH units between those two dates is 0.
      - unit_type - same tokens as specified for the interval_type of timestampadd
      - begin_timestamp - any field with a timestamp annotation
      - end_timestamp - any field with a timestamp annotation
  - Geodetic:
    - dist(x1,y1,x2,y2) - computes the Euclidean distance, i.e. sqrt((x1-x2)*(x1-x2) + (y1-y2)*(y1-y2))
    - geodist(lon1,lat1,lon2,lat2) - computes the geographic great-circle distance (in meters) between two lat/lon points
  - String:
    - length(string) - number of characters in string (only fixed-width string fields, char1 - char256)
  - Other:
    - divz(a,b,c) - returns the quotient a / b unless b == 0, in which case it returns c
Many functions in Kinetica accept expressions as inputs in place of column names for selecting data from tables, e.g. /aggregate/minmax. Column expressions cannot contain aggregation functions. Some examples of column expressions:
(x + y)
(2 * col1) + col2
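For instance, a hedged sketch of passing the second expression above to /aggregate/minmax (the table name is illustrative):
retobj = gpudb.aggregate_min_max(
    table_name = 'my_table',
    column_name = '(2 * col1) + col2'  # expression used in place of a column name
)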
Many functions in Kinetica accept expressions as inputs for filtering the records returned by a query, e.g. /filter. A filter expression cannot contain aggregation functions and should evaluate to a logical value (true or false). When the result of an expression evaluation is a numerical value, the result is converted to a logical value as follows: 0 is considered false and any other value is considered true. Some examples of filter expressions are given below:
(x > y)
(a != b) or (c = d)
(timestamp > 1456749296789) and (x <= 10.0)
abs(timestamp - 1456749296789) < 60 * 60 * 1000
quarter(timestamp) = 1 and mod(year(timestamp), 4) = 0
msg_id == 'MSGID1'
Some functions in Kinetica accept aggregation expressions as inputs for selecting data from tables, e.g. aggregate_group_by. Such expressions can only contain aggregation functions and non-nested functions of aggregation functions.
Available aggregation functions:
- count(*)
- sum
- min
- max
- avg
- mean - synonym for avg
- stddev - the population standard deviation (i.e. the denominator is N)
- stddev_pop - the population standard deviation (i.e. the denominator is N)
- stddev_samp - the sample standard deviation (i.e. the denominator is N-1)
- var - the population variance (i.e. the denominator is N)
- var_pop - the population variance (i.e. the denominator is N)
- var_samp - the sample variance (i.e. the denominator is N-1)
Some examples of aggregate expressions:
sum(sale_price) - sum(base_price)
max(ceil(x)) - min(floor(x))
avg(abs(z - 100.0))
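As a sketch (with illustrative table and column names), the first expression above could be used directly as a computed column in an aggregate_group_by call:
retobj = gpudb.aggregate_group_by(
    table_name = 'sales',
    column_names = ['store_id', 'sum(sale_price) - sum(base_price)'],
    offset = 0,
    limit = 100
)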
For some endpoints (e.g. /aggregate/groupby, /aggregate/unique, /get/records/bycolumn), the response from Kinetica is formatted depending on the request and the data being returned. This is known as a dynamic schema. The response will contain an Avro schema string (in the response_schema_str field) and then either an Avro binary encoded response (in the binary_encoded_response field) or an Avro JSON encoded response (in the json_encoded_response field).
For dynamic schema responses, the response always consists of a series of arrays of various types, corresponding to the columns being returned. These arrays are labeled column_1, column_2, etc. In addition, there is a string array column named column_headers containing the names or aliases of the returned columns. The first name in column_headers corresponds to column_1, the second to column_2, and so on.
Dynamic Schemas in Python
As an example, using Python, consider doing an aggregate_group_by on a string column named msg_id and a double column named x, computing the sum of a double column named y:
retobj = gpudb.aggregate_group_by(
table_name = 'my_table',
column_names = ['msg_id','x','sum(y)'],
offset = 0,
limit = 10,
encoding = 'json'
)
In this case, retobj['response_schema_str'] will look like:
{
"type":"record",
"name":"generic_response",
"fields":
[
{
"name":"column_1",
"type":{"type":"array","items":"string"}
},
{
"name":"column_2",
"type":{"type":"array","items":"double"}
},
{
"name":"column_3",
"type":{"type":"array","items":"double"}
},
{
"name":"column_headers",
"type":{"type":"array","items":"string"}
}
]
}
And the actual data response, retobj['json_encoded_response'], will look like:
{
"column_1":["MSGID1","MSGID1","MSGID2"],
"column_2":[5.0,10.0,20.0],
"column_3":[11.0,12.0,13.0],
"column_headers":["msg_id","x","sum(y)"]
}
The enumerated columns can be translated into their textual names by calling retobj = gpudb.parse_dynamic_response(retobj)['response'], resulting in this:
OrderedDict([
    (u'msg_id', [u'MSGID1',u'MSGID1',u'MSGID2']),
    (u'x', [5.0,10.0,20.0]),
    (u'sum(y)', [11.0,12.0,13.0])
])
Dynamic Schemas in Java
Dynamic schemas are simpler to handle in Java. The Record, Type, and Column classes take care of the lower-level details and provide a simple way to access the data. Once the records are returned by a query, you can get the type of the record and all of the column details from that type. Below is an example of a query, iterating through the records, and iterating through each column in each record:
AggregateGroupByResponse resp1 = gpudb.aggregateGroupBy(aggGroupByRequest);
List<Record> records = resp1.getData();
for (Record rec : records) {
    Type type = rec.getType();
    List<Type.Column> columns = type.getColumns();
    for (int i = 0; i < columns.size(); i++) {
        System.out.printf("%s:%s, ", columns.get(i).getName(), rec.get(i));
    }
    System.out.println();
}
Kinetica supports the SQL concept of joining tables. It does so through the
/create/jointable endpoint, which creates an intermediary
join view, whose purpose is to connect the tables involved in the join
before any query or aggregation operations are applied. In addition to any
equi-join clauses specified in the join view creation expression, filtering
clauses may also be used to further refine the data set.
Tables being joined together must either be sharded similarly or
replicated, to avoid extensive interprocessor communication. While a join
may include any number of replicated tables, it may include, at most, two
non-replicated tables that are sharded on the same key.
Distributed joins, or joins that connect non-replicated tables that are also not similarly sharded, are not supported at this time.
When a join view is created, it will contain the results of all join and
filter clauses specified. Any replicated tables not joined in the specified
expression will be merged into the join view as cross-products. Since this
is a generally undesirable outcome, it is recommended that all tables
specified in the join view creation be joined within the specified
expression. Any tables not merged into the overall join by the expression
can be subsequently merged by using the /filter endpoint to connect
the unmerged tables.
Once the join view has been created, adding or deleting data from the
joined view base tables does not change the data queried from the join view,
though updates will be reflected. The join view may be used as a source table
in most API calls.
Joins can be performed on any number and combination of tables and views,
but not on collections.
These limitations, and others, are discussed in further detail in the
Limitations & Cautions section.
To create a join view, the /create/jointable endpoint requires four parameters:
- the name of the join view to create
- the list of member tables to be joined (TableA, TableB, ..., TableZ)
- the corresponding list of aliases for the joined tables (A, B, ..., Z)
- the SQL-style expression by which the tables can be joined and, optionally, filtered (A.b_id = B.id and ... A.z_id = Z.id)
Each source table must have a corresponding alias, and all non-replicated tables must be associated through the given expression. In Python, given tables customer, order, and lineitem, a join view can be created via:
retobj = gpudb.create_join_table(
join_table_name = 'customer_order_item',
table_names = ['customer','order','lineitem'],
aliases = ['c','o','l'],
expression = 'l.orderkey = o.orderkey and o.customerkey = c.customerkey'
)
The view produced is an equi-join of the member tables, containing the
columns from all three tables, each prefixed with the specified alias of the
corresponding source table. The result would match what would be produced by
the SQL:
SELECT c.*, o.*, l.*
FROM
lineitem l,
order o,
customer c
WHERE
l.orderkey = o.orderkey AND
o.customerkey = c.customerkey
or:
SELECT c.*, o.*, l.*
FROM
lineitem l
INNER JOIN
order o ON l.orderkey = o.orderkey
INNER JOIN
customer c ON o.customerkey = c.customerkey
A number of endpoints are currently implemented to operate against join views; several of them are demonstrated below.
The only difference in calling endpoints on tables vs. join views is that
for join views, alias prefixes must be used to specify column names.
A Python example filter on the join view created in the
Creating a Join View section above for customer orders containing multiples
of the same product is:
retobj = gpudb.filter(
table_name = 'customer_order_item',
view_name = 'customer_order_item_multiple',
expression = 'l.quantity > 1'
)
The filter endpoint produces a view that is also a join view, when executed
against a join view. A chain of such filters can be used to create more and
more restrictive views of the original data set. The filter endpoint can also
be used to merge any tables not merged during the creation of the original
join view or during subsequent filtering of the view.
To call the group by endpoint, in Python:
retobj = gpudb.aggregate_group_by(
table_name = 'customer_order_item',
column_names = [
'o.totalprice',
'o.orderdate',
'c.custkey',
'sum(l.quantity)'
],
offset = 0,
limit = 100
)
To call the statistics endpoint, in Python:
retobj = gpudb.aggregate_statistics(
table_name = 'customer_order_item',
column_name = 'l.extendedprice * (1 - l.discount)',
stats = 'sum'
)
To call the endpoint to retrieve data from specific join view columns, in Python:
retobj = gpudb.get_records_by_column(
    table_name = 'customer_order_item',
    column_names = ['l.extendedprice', 'o.orderdate'],
    offset = 0,
    limit = 50
)
This example shows how the TPC-H benchmark's Query 5 SQL statement can be implemented in the Kinetica Python interface. The SQL for Query 5 is:
select
n_name,
sum(l_extendedprice * (1 - l_discount)) as revenue
from
customer,
orders,
lineitem,
supplier,
nation,
region
where
c_custkey = o_custkey and
l_orderkey = o_orderkey and
l_suppkey = s_suppkey and
c_nationkey = s_nationkey and
s_nationkey = n_nationkey and
n_regionkey = r_regionkey and
r_name = 'ASIA' and
o_orderdate >= 757382400 and
o_orderdate < 788918400
group by
n_name
order by
revenue desc;
To implement the above statement, a join view needs to be created with all of
the tables listed in the FROM
clause, each with an appropriate alias, and
an expression, based on the WHERE
clause, which binds the tables together:
gpudb.create_join_table(
join_table_name = 'tcph_query_5_join',
table_names = ['customer','orders','lineitem','supplier','nation','region'],
aliases = ['c','o','l','s','n','r'],
expression = 'c.c_custkey = o.o_custkey and ' \
'l.l_orderkey = o.o_orderkey and ' \
'l.l_suppkey = s.s_suppkey and ' \
'c.c_nationkey = s.s_nationkey and ' \
's.s_nationkey = n.n_nationkey and ' \
'n.n_regionkey = r.r_regionkey'
)
The join view can then have the WHERE clause filters applied in a separate step:
gpudb.filter(
    table_name = 'tcph_query_5_join',
    view_name = 'tcph_query_5_join_filter',
    expression = 'r.r_name = "ASIA" and ' \
                 'o.o_orderdate >= 757382400 and ' \
                 'o.o_orderdate < 788918400'
)
The column names of the join view will have the original table aliases with which they were associated as part of their permanent names. In the above filter call, r.r_name and other column names are the actual column names of the joined view, not indicative of further aliasing being applied.
This join view and filter could also have been done in a single step:
gpudb.create_join_table(
    join_table_name = 'tcph_query_5_join_filter',
    table_names = ['customer','orders','lineitem','supplier','nation','region'],
    aliases = ['c','o','l','s','n','r'],
    expression = 'c.c_custkey = o.o_custkey and ' \
                 'l.l_orderkey = o.o_orderkey and ' \
                 'l.l_suppkey = s.s_suppkey and ' \
                 'c.c_nationkey = s.s_nationkey and ' \
                 's.s_nationkey = n.n_nationkey and ' \
                 'n.n_regionkey = r.r_regionkey and ' \
                 'r.r_name = "ASIA" and ' \
                 '(o.o_orderdate >= 757382400) and ' \
                 '(o.o_orderdate < 788918400)'
)
Regardless of whether the join/filter was done in one step or two, the results need to be grouped and summed to match the TPC-H query. The following call is given the directive to sort_by the value, which will sort on the aggregation column:
retobj = gpudb.aggregate_group_by(
table_name = 'tcph_query_5_join_filter',
column_names = [
'n.n_name',
'sum(l.l_extendedprice * (1 - l.l_discount))'
],
offset = 0,
limit = gpudb.END_OF_SET,
encoding = 'json',
options = {'sort_by':'value', 'sort_order':'descending'}
)
The results can be displayed by calling:
gpudb.parse_dynamic_response(retobj, do_print=True)
Joins must merge together simple data sets, and can therefore use both tables
and views as sources, but not collections.
Join views are affected by data modifications on underlying tables in
different ways, based on the type of data modification.
- Inserts: If a record is added to an underlying table, it will not appear
in queries of the join view, whether or not the applied filter would include
it
- Updates: If a record that already contributes to a join view record set
is modified in its underlying table, it will appear with the modified values
when the join view is re-queried. If a record that does not contribute to a
join view record set is modified in its underlying table in a way that
would make it part of the join view record set, it will not appear when the
join view is re-queried.
- Deletes: If a record that already contributes to a join view record set
is deleted from its underlying table, it will still appear when the
join view is re-queried.
Overall, data manipulation within the tables supporting a join view will not
cause the number of records returned by the join view to change, though
updates to records already part of the join view record set will be reflected
in the data values retrieved from the join view.
Though the sharded tables in a join view must be joined in the
join view creation expression, the replicated tables do not have to be.
In this case, a Cartesian Product will result for every unmerged replicated
table.
For example, given table A with integer columns x and y and three records, and table B with a string column r and an integer column s and two records, as follows:

Table A:

| x | y |
|---|---|
| 1 | 10 |
| 2 | 20 |
| 3 | 30 |

Table B:

| r | s |
|---|---|
| 'x' | 100 |
| 'y' | 200 |

Then the join view of A and B with aliases a and b and with no expression given is a 4-column view with 6 records, as follows:
| a.x | a.y | b.r | b.s |
|---|---|---|---|
| 1 | 10 | 'x' | 100 |
| 1 | 10 | 'y' | 200 |
| 2 | 20 | 'x' | 100 |
| 2 | 20 | 'y' | 200 |
| 3 | 30 | 'x' | 100 |
| 3 | 30 | 'y' | 200 |
It should be noted that while the view produced is the cross-product of the
input tables, the actual cross-product is never realized in memory. However,
when doing an aggregate operation (such as group-by or histogram) on the
join view, every object in the cross-product will be visited. Filter
operations that transform input join views into smaller result join views
should be done prior to any aggregation queries, reducing the number of records
that must be visited.
NOTE: In order for a join to have a Cartesian Product, both the server
and the client call need to be configured properly. See the Configuration Reference for
server configuration details and /create/jointable endpoint
documentation for API details. If not configured properly, a
Cartesian Product may result in an error.
Kinetica supports the concept of a materialized view, a higher-performing query across one or more tables.
In Kinetica, this view can be insert-triggered, meaning it can be configured to refresh based on the presence of new records--updates & deletes will not prompt the refresh behavior. This scheme is available primarily because Kinetica serves the OLAP use case best, where a system is heavy on inserts into a fact table and light on updates & deletes.
An update to a record in a table supporting the materialized view will be
reflected automatically in the view, regardless of whether the update would
cause the record to be excluded from the result set, were the query re-issued.
For instance, if a record was updated in a way that made it no longer pass the
query's filter criteria, it would still be present in the materialized view.
The deletion of a record in a table supporting the materialized view will
not be reflected in the view until it is manually refreshed.
A materialized view query will be analyzed by Kinetica in an attempt to
determine a base table, which is defined as either:
- the sole table in the query
- the root table, linked directly or indirectly to all other tables
through relationships with the primary keys of those tables; it is
therefore necessary that all other tables have primary keys
The materialized view will follow an operational course based on whether a
base table could be determined, what mode the materialized view has been set
to, and what data manipulation operations have been applied to the tables
composing it.
There are three modes in which a materialized view can operate: manual,
on query, and on insert.
In manual mode, the materialized view will only be made fully current by
performing a manual refresh of the view. If an incremental refresh is
applied, only inserts into the base table will be reflected in the updated
content of the materialized view. If inserts are made into tables other
than the base table, a full refresh will be performed. This will result in
a rebuilding of all the view's data, accounting for any updates & deletes.
In on query mode, the materialized view will be made current when a query is
made against it and records have been inserted into any of the data sets that
compose the view. If records have only been added to the base table, an
incremental refresh will be performed; otherwise, a full refresh will be
performed. If no data is added between the issuance of two queries against the
materialized view, no rebuilding of the view will be performed.
In on insert mode, the materialized view will be kept current by
automatically adding to it each record inserted into the base table. Outside
of base table inserts, this mode functions the same as on query mode.
The primary use case for this mode is for a view against a set of tables where
the data manipulation operations consist almost entirely of base table inserts
and the queries against the materialized view need to be fast and current,
with respect to those inserts. An example would be an OLAP system consisting of
mostly inserts into a fact table.
A manual refresh of a materialized view can be performed on any
materialized view, regardless of operating mode, to update the view's data.
As a materialized view is insert-triggered, it may be necessary to refresh
the data periodically to properly reflect updates & deletes that have taken
place since the materialized view was created or since the last refresh.
There are two types of refresh operations that can be performed: full &
incremental.
A full refresh will cause a rebuilding of the entire materialized view,
irrespective of what data manipulation operations have taken place. This
operation takes the same time as creating the original view, but is the one that
is guaranteed to properly reflect updates & deletes made to the underlying
tables.
An incremental refresh is insert-based. If one is requested on a
materialized view in which the only new data are records inserted into the
base table, only those records will be added to the materialized view. If
inserts have been made into tables other than the base table, a
full refresh will be performed.
Note that it is generally unnecessary to issue an incremental refresh against
a materialized view in on insert mode or on query mode, as both of these
use the incremental refresh automatically to stay current.
An example, in Python, of performing an incremental refresh of a materialized view (recent_customer_order):
response = gpudb.create_join_table(
join_table_name = 'recent_customer_order',
table_names = [],
aliases = [],
options = {'refresh':'refresh'}
)
An example of performing a full refresh on the same materialized view:
response = gpudb.create_join_table(
join_table_name = 'recent_customer_order',
table_names = [],
aliases = [],
options = {'refresh':'full_refresh'}
)
A materialized view differs from a filtered view in several important ways:
- inserts - additions to the data set that composes a view will not be reflected, but can be reflected in a materialized view
- updates - modifications in the data set that composes a view will result in those modified records being removed from the view; they will be updated in a materialized view, even if the modifications would result in those records being filtered out of the materialized view, were the query re-issued
- deletes - deletions in the data set that composes a view will be reflected properly; in a materialized view, those records will still appear
A standard SQL view acts as a persisted query on a given data set. When a user
queries the view, the view's query is re-executed and the new results returned.
Kinetica can mimic this behavior, to some extent, with a materialized view. A
materialized view can be created in manual mode (to avoid the extra
processing that occurs during inserts in on insert mode) and then have a
manual full refresh requested of it before being requeried. This will ensure
that all recent inserts, updates, & deletes are accurately reflected in the
result set.
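A sketch of that refresh/requery sequence, reusing the full refresh syntax shown earlier (the view name is illustrative, and manual mode is assumed to be in effect when no refresh_method option was given at creation):
# Rebuild the view so recent inserts, updates, & deletes are reflected
gpudb.create_join_table(
    join_table_name = 'manual_customer_order',
    table_names = [],
    aliases = [],
    options = {'refresh':'full_refresh'}
)
# Then re-query the refreshed view
retobj = gpudb.get_records_by_column(
    table_name = 'manual_customer_order',
    column_names = ['o.orderkey', 'c.name'],  # illustrative columns
    offset = 0,
    limit = 100
)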
The following are examples of working with materialized views in Python.
To create a materialized view (recent_order) on a single table (order), and keep recent_order current as new records are inserted into order:
response = gpudb.create_join_table(
join_table_name = 'recent_order',
table_names = ['order'],
aliases = ['o'],
options = {'refresh_method':'on_insert'}
)
To create a materialized view (recent_customer_order) across three tables: a base table (order), a parent table (customer), and a parent table to that (nation), and keep recent_customer_order current as new records are inserted into order:
response = gpudb.create_join_table(
join_table_name = 'recent_customer_order',
table_names = ['order', 'customer', 'nation'],
aliases = ['o', 'c', 'n'],
expression = \
'o.custkey = c.custkey and ' \
'c.nationkey = n.nationkey',
options = {'refresh_method':'on_insert'}
)
NOTE: In the example above, the customer & nation tables must have primary keys on custkey & nationkey, respectively, for the on insert scheme to function properly.
To create a materialized view (customer_location) across three tables: a table (customer), its parent table (nation), and the parent table of that (region), and make customer_location current upon querying it after new records are inserted into customer, nation, or region:
response = gpudb.create_join_table(
join_table_name = 'customer_location',
table_names = ['customer', 'nation', 'region'],
aliases = ['c', 'n', 'r'],
expression = \
'c.nationkey = n.nationkey and ' \
'n.regionkey = r.regionkey',
options = {'refresh_method':'on_query'}
)
NOTE: In the example above, there are no primary key requirements.