Version:

Concepts

An introductory guide to understanding the core concepts of Kinetica.

Types

A type is analogous to a traditional database schema for a table. Before data can be stored in Kinetica, a type must be specified for that data. For every type, Kinetica assigns a unique GUID. Kinetica will use the same GUID for all types with identical characteristics.

Every type in Kinetica consists of the following:

  • Type Label (text to identify this type)
  • Type Schema (list of attributes with their primitive types)
  • Annotation (Security) Attribute (optional)

Type Label

A type label serves as a tagging mechanism for the type. The type label can be any text string specified by the client. The type label serves two purposes. First, it identifies tables with similar data. Second, it helps determine a type’s uniqueness.

Type Schema

A type schema consists of a set of column names and their respective primitive types. Each column can also be assigned a number of pre-defined properties that determine aspects of that column, like searchability and whether or not the column is indexed.

Primitive Types

A given column in a type schema must be of one of the following primitive types:

General Types Kinetica-implemented Types | Minimum Value Maximum Value
int integer (default) -231 231 - 1
int16 -215 215 - 1
int8 -27 27 - 1
long long (default) -263 263 - 1
timestamp -263 263 - 1
float float (default) -(2-223)*2127 (2-223)*2127
double double (default) -(2-252)*21023 (2-252)*21023
string string (default) (empty string) (10,000,000 characters)
char256 (empty string) (256 characters)
char128 (empty string) (128 characters)
char64 (empty string) (64 characters)
char32 (empty string) (32 characters)
char16 (empty string) (16 characters)
char8 (empty string) (8 characters)
char4 (empty string) (4 characters)
char2 (empty string) (2 characters)
char1 (empty string) (1 character)
ipv4 0.0.0.0 255.255.255.255
bytes bytes (default) (empty array) (no configured limit)

All data types other than bytes are queryable by Kinetica. Columns that are of type bytes are for store-only.

Column Properties

Kinetica has an additional layer of semantic regarding column data type. At the time of creating a type, a column can be given any number of the supported properties which give the column data type special meaning. Some properties enhance query performance, while others specify sharding or other special qualities. Here is a list of currently supported column properties:

Properties Description
data Default property for all numeric and string type columns; makes the column available for use in query expressions.
text_search Enables full text search for string columns. Can be set independently of data, disk_optimized, & store_only.
store_only Values are persisted, but only able to be retrieved via /get/records or similar call; the column will not be able to be used in a filter or aggregation expression. It is mutually exclusive with the data property. Any bytes column will be store_only by default. This property reduces system memory usage.
disk_optimized Prevents variable-width strings from being written to an indexing service, saving disk space at the cost of some functionality. Disallows /filter/bystring in contains, starts with, & regex modes, and /aggregate/groupby & /aggregate/unique; however, /aggregate/groupby will still function when a count or count_distinct is the grouped operation. Requires the data property.
timestamp Values represent timestamps in milliseconds since the Unix epoch: Jan 1 1970 00:00:00. Valid only for long columns.
char1 Values are strings of up to 1 character. Optimizes memory, disk, and query performance for the associated string column.
char2 Values are strings of up to 2 characters. Optimizes memory, disk, and query performance for the associated string column.
char4 Values are strings of up to 4 characters. Optimizes memory, disk, and query performance for the associated string column.
char8 Values are strings of up to 8 characters. Optimizes memory, disk, and query performance for the associated string column.
char16 Values are strings of up to 16 characters. Optimizes memory, disk, and query performance for the associated string column.
char32 Values are strings of up to 32 characters. Optimizes memory, disk, and query performance for the associated string column.
char64 Values are strings of up to 64 characters. Optimizes memory, disk, and query performance for the associated string column.
char128 Values are strings of up to 128 characters. Optimizes memory, disk, and query performance for the associated string column.
char256 Values are strings of up to 256 characters. Optimizes memory, disk, and query performance for the associated string column.
int8 Values are limited to 8-bit signed integers. Optimizes memory and query performance for the associated int column.
int16 Values are limited to 16-bit signed integers. Optimizes memory and query performance for the associated int column.
ipv4 Values represent dotted decimal IPv4 addresses of the form: A.B.C.D where A, B, C and D are between 0 and 255, inclusive; (e.g. 192.168.1.100). Optimizes memory, disk, and query performance for the associated string column.
primary_key Makes this column part of (or the entire) primary key
shard_key Makes this column part of (or the entire) shard key

Tables

The concept of tables is at the heart of Kinetica interactions. A table is a data container associated with a specific type schema (set of columns & types), much like tables in other database platforms. When querying a table, the result set will also have a single type schema associated with it.

Tables may exist either as top-level objects or as members of collections.

Each table is identified by a name, which must meet the following criteria:

  • Between 1 and 200 characters long
  • Alphanumeric, including spaces and these symbols: _ - ( ) { } [ ] . :
  • First character is alphanumeric or an underscore
  • Unique system-wide--cannot have the same name as another table, view, or collection, regardless of whether it is top-level or a member of a collection

Distribution

Table data can be distributed across the Kinetica cluster using one of two methods: sharding & replication.

Sharding

Sharding is the distribution of table data by hashing a particular value for each record, and by that hash, determining on which Kinetica cluster node the record will reside.

The benefit of sharding is that it allows distributed queries to be run against a given data set, with each node responsible for managing its portion of the overall data set, while only storing each record once across the cluster. The parallelism inherent in the distributed queries allows for the query performance to scale linearly with the addition of each cluster node.

A limitation of sharding is that two sharded tables can only be joined together if they are sharded in the same way, so that their corresponding records will all reside on the same nodes. Given that, in a typical database model, each pair of related tables is associated by a different key, a single query may only be able to join together at most two sharded tables along their relation.

Since sharding maximizes data storage & query efficiency, it is recommended that the largest table in the system (e.g. the fact table in a data warehouse) be sharded. The second largest table against which that table is joined should also be sharded along the join relation (e.g. the column on each side of the foreign key relationship).

For example, if the largest joined tables in a system were customer & order, and there was a foreign key relationship between the customer_id column of the order table & the id column of the customer table, the order table should be sharded on customer_id and the customer table should be sharded on id.

Replication

Replication is the distribution of table data by locating its entire data set on every Kinetica cluster node simultaneously, instead of being distributed across them.

The benefit of replication is that it allows tables to be joined together when those tables are not sharded on the columns being associated.

Normally, joining two tables together requires them being joined on their shard keys, so that the joining of the two data sets can happen locally on each processor shard. Since replicated data sets exist on all shards, they appear local to any other data set, and can be joined on each shard as if they were local. The result sets from each shard are then accumulated into one and returned as the final result set.

Since replicated tables exist in their entirety on all processors, it is recommended that they be relatively small. For example, a table containing one gigabyte of data, when replicated across a 10-node cluster will occupy 10 gigabytes of cluster memory overall.

A table is specified as replicated at creation time, by calling the /create/table endpoint with the is_replicated option set to true.

For example, in Python,:

gpudb.create_table(
    table_name = set_id,
    type_id = type_id,
    options = {'is_replicated': 'true'}
)

Primary Keys

Primary key is designation that can be applied to a single table column or set of columns (in the case of a composite primary key) to ensure the uniqueness of the data contained in the keyed column(s).

The uniqueness constraint is enforced upon insert in two ways:

  • If a record to be inserted has key values that match those of an already existing record in the target table, the new record’s values will either:
    • Overwrite the existing record’s values, if the update_on_existing_pk option is set to true
    • Be ignored (the new record effectively discarded), if the update_on_existing_pk option is set to false or not set
  • If two or more records within a given batch insert have the same key values, with respect to the primary key of the target table, the entire batch of records to insert is rejected

By default, a table has no primary key. One must be explicitly designated in the creation of the type schema associated with the table. Only one primary key can exist per table.

The primary key for a table becomes its implicit shard key, used for distributing its records across processors. This shard key designation can only be overridden when the primary key is a composite primary key and one or more of the composite primary key columns is explicitly designated as the shard key.

The primary key designation also applies a hash-based index to the column. This index optimizes the performance of the following operations, when those operations are given an equality-based (e.g. (x = 0)) filter expression:

Lastly, the primary key designation enables the column to serve as the target of a foreign key.

Assigning a primary key to a table column requires two steps. The first is to create a type schema, marking the primary key field with the primary_key property. For example, to create a product type schema with a primary key on product_id, in Python:

response = gpudb.create_type(
    type_definition = """{
            "type": "record",
            "name": "product_type",
            "fields": [
                {"name":"product_id","type":"int"},
                {"name":"product_name","type":"string"}
            ]
        }""",
    label = 'product_type',
    properties = {'product_id': ['primary_key','int16']}
)

Note that the type_definition is a JSON string defining the type schema.

The second step is to create a table with that type schema, using the type_id that is returned from the create_type call. Continuing from the previous example:

gpudb.create_table(
    table_name = 'product',
    type_id = response['type_id']
)

Shard Keys

Shard key is designation that can be applied to a single table column or set of columns (in the case of a composite shard key) whose values are used in distributing records across processors. This distribution allows for processing of queries against a sharded table to be performed in parallel.

By default, a hash is computed for each record in a table and serves as the key by which the associated record is distributed to its corresponding processor, or shard. A shard key can be explicitly designated by assigning the SHARD_KEY property to one or more columns of a table. A shard key is implicitly designated when a primary key is assigned to a table; all columns involved in the primary key are used as the shard key. If a primary key exists on a table, one or more of the columns comprising the primary key can be designated as the shard key; columns not part of the primary key may not be given this designation.

Only one shard key can exist per table.

Foreign Keys

A foreign key is a tuning option for joining tables. It acts as a relational index from a source table to corresponding records in a target table. As new records are inserted into the source table, the index is updated with references to the target records associated with them. If a foreign key is not used, the join relationships are established during query time.

In order for the linkage to be made successfully, it must connect the shard key column of a source table with either the similarly sharded primary key column of a target table or the primary key column of a replicated target table. The sharding requirement exists to ensure the related records of each table being joined are located on the same processing node. The replication alternative alleviates the sharding requirement, as all of the records from the replicated target table will exist on all processor nodes, and thus, appear to be local to all records from the source table involved in the relation.

Foreign keys link single source columns to single primary key target columns; multi-column foreign keys are not supported.

Multiple foreign keys can exist on a source table. Multiple foreign keys may also exist on target tables; though, since foreign key targets must be primary keys, the foreign keys would all have to target the same primary key column.

Foreign keys do not provide referential integrity checks.

Indexes

An index can be applied to one or more individual table columns to improve the performance of operations applied to the table using the indexed column in a filter or join expression.

The index is implemented as a b-tree, which provides performance improvements for both equality-based (e.g. (x = 0)) and range-based (e.g. (x > 0)) filter criteria on individual columns. Multi-column and function-based indexes are not supported at this time.

As primary key indexes only optimize the performance of equality-based filter criteria, an index can be applied to a primary key column for a performance boost on range-based filter criteria--the two index types are not mutually exclusive.

Indexes optimize the performance of the following operations, when those operations are given an equality-based or range-based filter expression:

An index will additionally improve the performance of the following operations:

To apply an index to a column, the /alter/table endpoint should be called with the create_index option. For example, in Python:

retobj = gpudb.alter_table(
             table_name = '<name_of_table>',
             action = 'create_index'
             value = '<name_of_column>'
         )

Views

A view is a queryable filter of a table, collection, or another view. A new view is created as a result of each filter query run within Kinetica, and as such, is also referred to as a result table.

When a filter operation is performed on a table or view, the resulting view will be created as a top-level entity (not as part of a collection). When a filter operation is performed on a collection, a new collection is created containing the result set views, mirroring the original collection. Only tables in the queried collection that contained the queried columns will have corresponding views within the result collection.

Views cannot be modified once created with one exception: a view can be updated to insert the missing data points for a series from the underlying table, using /update/records/byseries.

A view can be used in place of a table for queries, allowing query chaining, by filtering the views created by previous filters.

When the record contained in a view is modified by an update of the underlying table, the record is removed from the view, regardless of whether the update impacted the record's qualifications for membership in the filtered result set.

When a table is cleared (dropped) all the views over the table are also automatically cleared.

As they are result sets, and given the need to minimize memory usage, views are given a default time-to-live, after which they will expire and be removed from memory. Each access of the view causes the remaining time-to-live to be reset. Thus, a view accessed more frequently than its time-to-live will effectively remain in memory indefinitely.

Views have the same naming criteria as tables.

Query Chaining

Query Chaining in Kinetica is defined as performing a filter on a view, which itself was created from a previous filter operation. Kinetica allows access to all intermediate views after each query. This means that each view can be used to support multiple subsequent filters. Also, given a long sequence of queries, a trace can be performed through the progression of each query to help refine analysis.

As an example, consider an analysis that consisted of three queries chained in sequence. If, at the end of the chain, the expected results were not returned, the result of the second query can be examined. If that query consisted of several records that were expected in the final analysis, it could be deduced that a problem exists with the third query.

Collections

A collection is a container for tables, somewhat akin to a schema in other databases. The contained tables can be of uniform or heterogeneous type schemas.

A collection can also be a container of views. A view collection is what results from performing a filter operation against another collection. Other types of views, like joins, can also be created within collections.

Besides providing a means for logical grouping of tables, a collection provides the ability to query the contained tables, regardless of the mixture of type schemas, for columns shared between all tables. Tables within the collection that lack the queried-for columns will simply not contribute to the resulting data set.

For example, a transportation department may have two teams gathering ongoing spatial information in their respective jurisdictions. Team A ingests geo-referenced objects from Twitter into a table named TWITTER inside of the collection MASTER. Their objects have X and Y columns corresponding to longitude and latitude, as well as TIMESTAMP and other columns. Meanwhile, Team B collects vehicle movement information, adding their objects with X, Y, TIMESTAMP, TRACK_ID and other columns to a table named VEHICLE_TRACKS also inside the MASTER collection. Now, queries for X, Y, & TIMESTAMP on the collection MASTER will be applied to both tables.

Collections do not themselves have a time-to-live in the way that views do. However, a collection will be removed from memory automatically when the last table or view within it is removed, either directly or via expiration. The deletion event itself is what triggers the removal of the collection; thus, a collection created with no tables or views in it will remain in memory, though empty, as no deletion event of a contained table or view within it will trigger its removal.

Setting a time-to-live on a collection will set the time-to-live of every table & view contained within it. This creates an effective time-to-live for the collection, as each access of a member of the collection will extend its life.

Collections have the same naming criteria as tables.

Expressions

Kinetica accepts expressions as inputs while querying data. These expression are mathematical expression involving one or more constants (both numeric and string) and table columns. As described in the sub-sections below, these expressions can be column, filter, or aggregation expressions. The expressions follow certain constraints based on where they are used, but all the expressions should follow the basic guidelines outlined below:

  • Use parentheses liberally to ensure correct order-of-operations.
  • Constants:
    • String constants must be enclosed in single quotes or double quotes.
    • Numerical constants can be expressed as:
      • decimal integers (31242)
      • hex integers (0x3420123a)
      • doubles (3.41e5)
      • floats (3.1415F)
  • Operators:
    • Mathematical: + - * /
    • Bitwise: & | << >> ~ ^
    • Comparison: > < >= <= == = != <> in
    • Logical (when applied directly to numeric columns, these will interpret non-zero values as true and zero values as false, returning will return 1 for true and 0 for false):
      • and - both arguments true
      • or - either argument true
      • xor - one argument true, one false
      • not - argument false
      • ! - synonym for not
  • Functions:
    • Logical:
      • if(C,T,F) - if C is true, return T, otherwise return F
        • C - any true/false condition (when an integer column is used directly, this function will interpret non-zero values as true and zero values as false)
        • T - any numeric value
        • F - any numeric value
    • Trigonometric: sin cos tan cot asin acos atan atan2 atn2 degrees radians hypot sinh cosh tanh asinh acosh atanh
    • Arithmetic: abs mod greatest least sign exp ln log log10 sqrt cbrt pow power ldexp
    • Rounding: ceil ceiling floor round
    • Date/Time (integer fields assumed to be seconds since the epoch; long/timestamp fields assumed to be milliseconds since the epoch):
      • year(timestamp) - 4-digit year, A.D.
      • month(timestamp) - number of month [ 1 - 12 ]
      • day(timestamp) - day of month [ 1 - 31 ]
      • hour(timestamp) - hour of day [ 0 - 23 ]
      • minute(timestamp) - minute of hour [ 0 - 59 ]
      • sec(timestamp) - second of minute [ 0 - 59 ]
      • msec(timestamp) - millisecond of second [ 0 - 999 ]
      • quarter(timestamp) - quarter of year [ 1 - 4 ]; (1 = Jan, Feb, & Mar)
      • week(timestamp) - week (or partial week) of year [ 1 - 53 ]; each full week starts on Sunday (1 = week containing Jan 1st)
      • day_of_week(timestamp) - day of week [ 1 - 7 ]; (1 = Sunday)
      • day_of_year(timestamp) - day of year [ 1 - 366 ]
    • Timestamp-only (operated-on field must have timestamp annotation):
      • timestampadd(interval_type, interval_amount, timestamp) - adds the positive or negative interval_amount of interval_type units to timestamp
        • interval_type - valid values are:
          • YEAR - year is modified by interval amount (not affected by leap year, etc.)
          • MONTH - month is modified by interval amount and year adjusted if overflow/underflow occurs; day adjusted to last day of calculated month if not a valid day for that month (e.g. Apr 31st -> Apr 30th)
          • DAY - day is modified by interval amount (time not affected by daylight savings time, etc.); month & year are adjusted, if overflow/underflow occurs
          • HOUR - hour is modified by interval amount (time not affected by daylight savings time, etc.); date is adjusted, if overflow/underflow occurs
          • MINUTE - minute is modified by interval amount; hour & date are adjusted, if overflow/underflow occurs
          • SECOND - second is modified by interval amount; minute, hour, & date are adjusted, if overflow/underflow occurs
          • FRAC_SECOND - milliseconds are modified by interval amount; time & date are adjusted, if overflow/underflow occurs
          • QUARTER - month is modified by three times the interval amount, irrespective of the number of days in the months in between; day adjusting performed the same way as in MONTH description, but only on final month (i.e. Jan 31st + 1 quarter will be Apr 30th, not Apr 28th because of February)
          • WEEK - day is modified by 7 times the interval amount (time not affected by daylight savings time, etc.); month & year are adjusted, if overflow/underflow occurs
        • interval_amount - positive (or negative) integer specifying how many interval_type units will be added (or subtracted) from timestamp
        • timestamp - any field with a timestamp annotation
      • timestampdiff(unit_type, begin_timestamp, end_timestamp) - calculates the difference between two dates, returning the result in the units specified; more precisely, how many unit_type units need to be added to or subtracted from begin_timestamp to equal end_timestamp (or get as close as possible without going past it) using the rules specified in timestampadd. NOTE: this is not symmetric with timestampadd in all cases, as adding 1 MONTH to Mar 31st results in Apr 30th, but the timestampdiff in MONTH units between those two dates is 0.
        • unit_type - same tokens as specified for the interval_type of timestampadd
        • begin_timestamp - any field with a timestamp annotation
        • end_timestamp - any field with a timestamp annotation
    • Geodetic:
      • dist(x1,y1,x2,y2) - computes the Euclidean distance, i.e. sqrt( (x1-x2)*(x1-x2) + (y1-y2)*(y1-y2) )
      • geodist(lon1,lat1,lon2,lat2) - computes the geographic great-circle distance (in meters) between two lat/lon points
    • String:
      • length(string) - number of characters in string (only fixed-width string fields, char1 - char256)
    • Other:
      • divz(a,b,c) - retuns the quotient a / b unless b == 0, in which case it returns c

Column expressions

Many functions in Kinetica accept expressions as inputs in place of column names for selecting data from the tables e.g. /aggregate/minmax. Column expressions cannot contain aggregation functions. Given below are some examples of column expressions:

(x + y)
(2 * col1) + col2

Filter Expressions

Many functions in Kinetica accept expressions as inputs for filtering the number of records returned by the query e.g. /filter. A filter expression cannot contain aggregation functions and should evaluate to a logical value ( true or false ). When the result of an expression evaluation is a numerical value, the result is converted to a logical value as follows: 0 is considered false and any other value is considered as true. Some examples of filter expressions are given below:

(x > y)
(a != b) or (c = d)
(timestamp > 1456749296789) and (x <= 10.0)
abs(timestamp - 1456749296789) < 60 * 60 * 1000
quarter(timestamp) = 1 and mod(year(timestamp), 4) = 0
msg_id == 'MSGID1'

Aggregate Expressions

Some functions in Kinetica accept aggregation expressions as inputs for selecting data from the tables e.g. aggregate_group_by. Such expressions can only contain aggregation functions and non-nested functions of aggregation functions.

Available aggregation functions:

  • count(*)
  • sum
  • min
  • max
  • avg
  • mean - synonym for avg
  • stddev - the population standard deviation (i.e. the denominator is N)
  • stddev_pop - the population standard deviation (i.e. the denominator is N)
  • stddev_samp - the sample standard deviation (i.e. the denominator is N-1)
  • var - the population variance (i.e. the denominator is N)
  • var_pop - the population variance (i.e. the denominator is N)
  • var_samp - the sample variance (i.e. the denominator is N-1)

Some examples of aggregate expressions:

sum(sale_price) - sum(base_price)
max(ceil(x)) - min(floor(x))
avg(abs(z - 100.0))

Dynamic Schemas

For some endpoints (e.g. /aggregate/groupby, /aggregate/unique, /get/records/bycolumn), the response from Kinetica is formatted depending on the request and the data being returned. This is known as a dynamic schema. The response will contain an Avro schema string (in the response_schema_str field) and then either an Avro binary encoded response (in the binary_encoded_response field) or an Avro JSON encoded response (in the json_encoded_response field).

For dynamic schema responses, the response always consists of a series of arrays of various types, corresponding to the columns being returned. These arrays are labeled column_1, column_2, etc. In addition there is a string array column named column_headers containing the names or aliases of the returned columns. The first name in column_headers corresponds to column_1, the second to column_2 and so on.

Dynamic Schemas in Python

As an example, using Python, consider doing an aggregate_group_by on a string column named msg_id and a double column named x, computing the sum of a double column named y:

retobj = gpudb.aggregate_group_by(
             table_name = 'my_table',
             column_names = ['msg_id','x','sum(y)'],
             offset = 0,
             limit = 10,
             encoding = 'json'
         )

In this case, retobj['response_schema_str'] will look like:

{
    "type":"record",
    "name":"generic_response",
    "fields":
    [
        {
            "name":"column_1",
            "type":{"type":"array","items":"string"}
        },
        {
            "name":"column_2",
            "type":{"type":"array","items":"double"}
        },
        {
            "name":"column_3",
            "type":{"type":"array","items":"double"}
        },
        {
            "name":"column_headers",
            "type":{"type":"array","items":"string"}
        }
    ]
}

And the actual data response, retobj['json_encoded_response'], will look like:

{
    "column_1":["MSGID1","MSGID1","MSGID2"],
    "column_2":[5.0,10.0,20.0],
    "column_3":[11.0,12.0,13.0],
    "column_headers":["msg_id","x","sum(y)"]
}

The enumerated columns can be translated into their textual names using the retobj = gpudb.parse_dynamic_response(retobj)['response'] function, resulting in this:

OrderedDict
(
    [
        (u'msg_id', [u'MSGID1',u'MSGID1',u'MSGID2']),
        (u'x', [5.0,10.0,20.0]),
        (u'sum(y)', [11.0,12.0,13.0])
    ]
)

Dynamic Schemas in Java

Dynamic schemas are simpler to handle in Java. The Record, Type, and Column classes take care of the lower level details and provide a simple way to access the data. Once the records are returned by your query, you can get the type of the record and all the column details from the type. Below is an example of a query, iterating through the records, and iterating through each column in each record.:

AggregateGroupByResponse resp1 = gpudb.aggregateGroupBy(aggGroupByRequest);
List<Record> records = resp1.getData();
for(Record rec : records){
    Type type = rec.getType();

    List<Type.Column> columns = type.getColumns();
    for(int i=0;i<columns.size();i++){
        System.out.printf("%s:%s, ", columns.get(i).getName() ,rec.get(i));
    }
    System.out.println();
}

Joins

Kinetica supports the SQL concept of joining tables. It does so through the /create/jointable endpoint, which creates an intermediary join view, whose purpose is to connect the tables involved in the join before any query or aggregation operations are applied. In addition to any equi-join clauses specified in the join view creation expression, filtering clauses may also be used to further refine the data set.

Tables being joined together must either be sharded similarly or replicated, to avoid extensive interprocessor communication. While a join may include any number of replicated tables, it may include, at most, two non-replicated tables that are sharded on the same key. Distributed joins, or joins that connect non-replicated tables that are also not similarly sharded are not supported at this time.

When a join view is created, it will contain the results of all join and filter clauses specified. Any replicated tables not joined in the specified expression will be merged into the join view as cross-products. Since this is a generally undesirable outcome, it is recommended that all tables specified in the join view creation be joined within the specified expression. Any tables not merged into the overall join by the expression can be subsequently merged by using the /filter endpoint to connect the unmerged tables.

Once the join view has been created, adding or deleting data from the joined view base tables does not change the data queried from the join view, though updates will be reflected. The join view may be used as a source table in most API calls.

Joins can be performed on any number and combination of tables and views, but not on collections.

These limitations, and others, are discussed in further detail in the Limitations & Cautions section.

Creating a Join View

To create a join view, the /create/jointable endpoint requires four parameters:

  1. the name of the join view to create
  2. the list of member tables to be joined (TableA, TableB, ..., TableZ)
  3. the corresponding list of aliases for the joined tables (A, B, ..., Z)
  4. the SQL-style expression by which the tables can be joined and, optionally, filtered (A.b_id = B.id and ... A.z_id = Z.id)

Each source table must have a corresponding alias, and all non-replicated tables must be associated through the given expression.

Examples

In Python, given tables customer, order and lineitem, a join view can be created via:

retobj = gpudb.create_join_table(
             join_table_name = 'customer_order_item',
             table_names = ['customer','order','lineitem'],
             aliases = ['c','o','l'],
             expression = 'l.orderkey = o.orderkey and o.customerkey = c.customerkey'
         )

The view produced is an equi-join of the member tables, containing the columns from all three tables, each prefixed with the specified alias of the corresponding source table. The result would match what would be produced by the SQL:

SELECT c.*, o.*, l.*
FROM
    lineitem l,
    order o,
    customer c
WHERE
    l.orderkey = o.orderkey AND
    o.customerkey = c.customerkey

or:

SELECT c.*, o.*, l.*
FROM
    lineitem l
INNER JOIN
    order o ON l.orderkey = o.orderkey
INNER JOIN
    customer c ON o.customerkey = c.customerkey

Operating on a Join View

The list of endpoints currently implemented to operate against join view is as follows:

The only difference in calling endpoints on tables vs. join views is that for join views, alias prefixes must be used to specify column names.

Examples

A Python example filter on the join view created in the Creating a Join View section above for customer orders containing multiples of the same product is:

retobj = gpudb.filter(
             table_name = 'customer_order_item',
             view_name = 'customer_order_item_multiple',
             expression = 'l.quantity > 1'
         )

The filter endpoint produces a view that is also a join view, when executed against a join view. A chain of such filters can be used to create more and more restrictive views of the original data set. The filter endpoint can also be used to merge any tables not merged during the creation of the original join view or during subsequent filtering of the view.

To call the group by endpoint, in Python:

retobj = gpudb.aggregate_group_by(
             table_name = 'customer_order_item',
             column_names = [
                 'o.totalprice',
                 'o.orderdate',
                 'c.custkey',
                 'sum(l.quantity)'
             ],
             offset = 0,
             limit = 100
         )

To call the statistics endpoint, in Python:

retobj = gpudb.aggregate_statistics(
             table_name = 'customer_order_item',
             column_name = 'l.extendedprice * (1 - l.discount)',
             stats = 'sum'
         )

To call the endoint to retrieve data from specific join view columns, in Python:

retobj = gpudb.get_records_by_column(
             table_name = 'customer_order_item',
             column_names = ['l.extendedprice', 'o.orderdate']
             offset = 0,
             limit = 50
         )

SQL Conversion Example

This example shows how the TCP-H benchmark's Query 5 SQL statement can be implemented in the Kinetica Python interface. The SQL for Query 5 is:

select
    n_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue
from
    customer,
    orders,
    lineitem,
    supplier,
    nation,
    region
where
    c_custkey = o_custkey and
    l_orderkey = o_orderkey and
    l_suppkey = s_suppkey and
    c_nationkey = s_nationkey and
    s_nationkey = n_nationkey and
    n_regionkey = r_regionkey and
    r_name = 'ASIA' and
    o_orderdate >= 757382400 and
    o_orderdate < 788918400
group by
    n_name
order by
    revenue desc;

To implement the above statement, a join view needs to be created with all of the tables listed in the FROM clause, each with an appropriate alias, and an expression, based on the WHERE clause, which binds the tables together:

gpudb.create_join_table(
    join_table_name = 'tcph_query_5_join',
    table_names = ['customer','orders','lineitem','supplier','nation','region'],
    aliases = ['c','o','l','s','n','r'],
    expression = 'c.c_custkey = o.o_custkey and ' \
                 'l.l_orderkey = o.o_orderkey and ' \
                 'l.l_suppkey = s.s_suppkey and ' \
                 'c.c_nationkey = s.s_nationkey and ' \
                 's.s_nationkey = n.n_nationkey and ' \
                 'n.n_regionkey = r.r_regionkey'
)

The join view can then have the WHERE clause filters applied in a separate step:

gpudb.filter(
    table_name = 'tcph_query_5_join',
    view_name = 'tcph_query_5_join_filter',
    expression = 'r.r_name = 'ASIA' and ' \
                 'o.o_orderdate >= 757382400 and ' \
                 'o.o_orderdate < 788918400'
)

The column names of the join view will have the original table aliases with which they were associated as part of their permanent names. In the above filter call, r.r_name and other column names are the actual column names of the joined view, not indicative of further aliasing being applied.

This join view and filter could also have been done in a single step:

gpudb.create_join_table(
    join_table_name = 'tcph_query_5_join_filter',
    table_names = ['customer','orders','lineitem','supplier','nation','region'],
    aliases = ['c','o','l','s','n','r'],
    expression = 'c.c_custkey = o.o_custkey and ' \
                 'l.l_orderkey = o.o_orderkey and ' \
                 'l.l_suppkey = s.s_suppkey and ' \
                 'c.c_nationkey = s.s_nationkey and ' \
                 's.s_nationkey = n.n_nationkey and ' \
                 'n.n_regionkey = r.r_regionkey and ' \
                 'r.r_name = 'ASIA' and ' \
                 '(o.o_orderdate >= 757382400) and ' \
                 '(o.o_orderdate < 788918400)'
)

Regardless of whether the join/filter was done in one step or two, the results need to be grouped and summed to match the TPC-H query. The following call is given the directive to sort_by the value, which will sort on the aggregation column:

retobj = gpudb.aggregate_group_by(
             table_name = 'tcph_query_5_join_filter',
             column_names = [
                 'n.n_name',
                 'sum(l.l_extendedprice * (1 - l.l_discount))'
             ],
             offset = 0,
             limit = gpudb.END_OF_SET,
             encoding = 'json',
             options = {'sort_by':'value', 'sort_order':'descending'}
         )

The results can be displayed by calling:

gpudb.parse_dynamic_response(retobj, do_print=True)

Performance Optimization

In order to improve the performance of a join operation, the columns used to make the relation should be indexed.

When a join is performed along a foreign key relation, it is indexed automatically as a function of the foreign key itself--no additional setup is required.

When a join is performed along a relation where no foreign key exists, the indexing must be done as a separate configuration step. Both columns, on either side of the join relation, should have an index applied. An error may result if a join is attempted across a relation where only one side of the relation is indexed.

Limitations & Cautions

Joins must merge together simple data sets, and can therefore use both tables and views as sources, but not collections.

Operations on Underlying Tables

Join views are affected by data modifications on underlying tables in different ways, based on the type of data modification.

  • Inserts: If a record is added to an underlying table, it will not appear in queries of the join view, whether or not the applied filter would include it
  • Updates: If a record that already contributes to a join view record set is modified in its underlying table, it will appear with the modified values when the join view is re-queried. If a record that does not contribute to a join view record set is modified in its underlying table in a way that would make it part of the join view record set, it will not appear when the join view is re-queried.
  • Deletes: If a record that already contributes to a join view record set is deleted from its underlying table, it will still appear when the join view is re-queried.

Overall, data manipulation within the tables supporting a join view will not cause the number of records returned by the join view to change, though updates to records already part of the join view record set will be reflected in the data values retrieved from the join view.

Cartesian Products

Though the sharded tables in a join view must be joined in the join view creation expression, the replicated tables do not have to be. In this case, a Cartesian Product will result for every unmerged replicated table.

For example, given table A with integer columns x and y and three records, and table B with a string column r and an integer column s and two records, as follows:

Table A:

x y
1 10
2 20
3 30

Table B:

r s
'x' 100
'y' 200

Then the join view of A and B with aliases a and b and with no expression given is a 4-column view with 6 records, as follows:

a.x a.y b.r b.s
1 10 'x' 100
1 10 'y' 200
2 20 'x' 100
2 20 'y' 200
3 30 'x' 100
3 30 'y' 200

It should be noted that while the view produced is the cross-product of the input tables, the actual cross-product is never realized in memory. However, when doing an aggregate operation (such as group-by or histogram) on the join view, every object in the cross-product will be visited. Filter operations that transform input join views into smaller result join views should be done prior to any aggregation queries, reducing the number of records that must be visited.

NOTE: In order for a join to have a Cartesian Product, both the server and the client call need to be configured properly. See the Configuration Reference for server configuration details and /create/jointable endpoint documentation for API details. If not configured properly, a Cartesian Product may result in an error.

Materialized Views

Kinetica supports the concept of a materialized view, which is a greater-performant query across one or more tables.

In Kinetica, this view can be insert-triggered; meaning that it can be configured to refresh based on the presence of new records--updates & deletes will not prompt the refresh behavior. This scheme is available primarily because Kinetica serves the OLAP use case best, where a system is heavy on inserts into a fact table and light on updates & deletes.

An update to a record in a table supporting the materialized view will be reflected automatically in the view, regardless of whether the update would cause the record to be excluded from the result set, were the query re-issued. For instance, if a record was updated in a way that made it no longer pass the query's filter criteria, it would still be present in the materialized view.

The deletion of a record in a table supporting the materialized view will not be reflected in the view until it is manually refreshed.

A materialized view query will be analyzed by Kinetica in an attempt to determine a base table, which is defined as either:

  • the sole table in the query
  • the root table, linked directly or indirectly to all other tables through relationships with the primary keys of those tables; it is therefore necessary that all other tables have primary keys

The materialized view will follow an operational course based on whether a base table could be determined, what mode the materialized view has been set to, and what data manipulation operations have been applied to the tables composing it.

There are three modes in which a materialized view can operate: manual, on query, and on insert.

Manual Mode

In manual mode, the materialized view will only be made fully current by performing a manual refresh of the view. If an incremental refresh is applied, only inserts into the base table will be reflected in the updated content of the materialized view. If inserts are made into tables other than the base table, a full refresh will be performed. This will result in a rebuilding of all the view's data, accounting for any updates & deletes.

On Query Mode

In on query mode, the materialized view will be made current when a query is made against it and records have been inserted into any of the data sets that compose the view. If records have only been added to the base table, an incremental refresh will be performed; otherwise, a full refresh will be performed. If no data is added between the issuance of two queries against the materialized view, no rebuilding of the view will be performed.

On Insert Mode

In on insert mode, the materialized view will be kept current by automatically adding to it each record inserted into the base table. Outside of base table inserts, this mode functions the same as on query mode.

The primary use case for this mode is for a view against a set of tables where the data manipulation operations consist almost entirely of base table inserts and the queries against the materialized view need to be fast and current, with respect to those inserts. An example would be an OLAP system consisting of mostly inserts into a fact table.

Manual Refresh

A manual refresh of a materialized view can be performed on any materialized view, regardless of operating mode, to update the view's data. As a materialized view is insert-triggered, it may be necessary to refresh the data periodically to properly reflect updates & deletes that have taken place since the materialized view was created or since the last refresh. There are two types of refresh operations that can be performed: full & incremental.

A full refresh will cause a rebuilding of the entire materialized view, irrespective of what data manipulation operations have taken place. This operation takes the same time as creating the original view, but is the one that is guaranteed to properly reflect updates & deletes made to the underlying tables.

An incremental refresh is insert-based. If one is requested on a materialized view in which the only new data are records inserted into the base table, only those records will be added to the materialized view. If inserts have been made into tables other than the base table, a full refresh will be performed.

Note that it is generally unnecessary to issue an incremental refresh against a materialized view in on insert mode or on query mode, as both of these use the incremental refresh automatically to stay current.

An example, in Python, of performing an incremental refresh of a materialized view (recent_customer_order):

response = gpudb.create_join_table(
               join_table_name = 'recent_customer_order',
               table_names = [],
               aliases = [],
               options = {'refresh':'refresh'}
           )

An example of performing a full refresh on the same materialized view:

response = gpudb.create_join_table(
               join_table_name = 'recent_customer_order',
               table_names = [],
               aliases = [],
               options = {'refresh':'full_refresh'}
           )

Materialized View vs. Filtered View

A materialized view differs from a filtered view in several important ways:

  • inserts - additions to the data set that composes a view will not be reflected, but can be reflected in a materialized view
  • updates - modifications in the data set that composes a view will result in those modified records being removed from the view; they will be updated in a materialzied view, even if the modifications would result in those records being filtered out of the materialized view, were the query re-issued
  • deletes - deletions in the data set that composes a view will be reflected properly; in a materialized view, those records will still appear

SQL View

A standard SQL view acts as a persisted query on a given data set. When a user queries the view, the view's query is re-executed and the new results returned. Kinetica can mimic this behavior, to some extent, with a materialized view. A materialized view can be created in manual mode (to avoid the extra processing that occurs during inserts in on insert mode) and then have a manual full refresh requested of it before being requeried. This will ensure that all recent inserts, updates, & deletes are accurately reflected in the result set.

Examples

The following are examples of working with materialized views in Python.

Creating a Materialized View

To create a materialized view (recent_order) on a single table (order), and keep recent_order current as new records are inserted into order:

response = gpudb.create_join_table(
               join_table_name = 'recent_order',
               table_names = ['order'],
               aliases = ['o'],
               options = {'refresh_method':'on_insert'}
           )

To create a materialized view (recent_customer_order) across three tables: a base table (order), a parent table (customer), and a parent table to that (nation), and keep recent_customer_order current as new records are inserted into order:

response = gpudb.create_join_table(
               join_table_name = 'recent_customer_order',
               table_names = ['order', 'customer', 'nation'],
               aliases = ['o', 'c', 'n'],
               expression = \
                   'o.custkey = c.custkey and ' \
                   'c.nationkey = n.nationkey',
               options = {'refresh_method':'on_insert'}
           )

NOTE: In the example above, the customer & nation tables must have primary keys on custkey & nationkey, respectively, for the on insert scheme to function properly.

To create a materialized view (customer_location) across three tables: a table (customer), its parent table (nation), and the parent table of that (region), and make customer_location current upon querying it after new records are inserted into customer, nation, or region:

response = gpudb.create_join_table(
               join_table_name = 'customer_location',
               table_names = ['customer', 'nation', 'region'],
               aliases = ['c', 'n', 'r'],
               expression = \
                   'c.nationkey = n.nationkey and ' \
                   'n.regionkey = r.regionkey',
               options = {'refresh_method':'on_query'}
           )

NOTE: In the example above, there are no primary key requirements.