Overview
The concept of tables is at the heart of Kinetica interactions. A table is a data container associated with a specific type (set of columns & properties), much like tables in other database platforms. When querying a table, the result set will also have a single type associated with it.
Each table must exist within a schema.
Name Resolution
A table can be addressed by qualified name, prefixing the table name with the name of its containing schema, separated by a dot; e.g.:
<schema name>.<table name>
Tables referenced without a schema will be looked for in the user's default schema, if assigned; effectively, a reference like this:
<table name>
is resolved like this:
<default schema name>.<table name>
Naming Criteria
Each table is identified by a name, which must meet the following criteria:
- Between 1 and 200 characters long
- First character is alphanumeric or an underscore
- Alphanumeric, including spaces and these symbols: _ # { } [ ] ( ) : -
- Unique within its containing schema--cannot have the same name as another table or view in the same schema, but can have the same name as any schema
Column names must meet the following criteria:
- Between 1 and 200 characters long
- First character is alphanumeric or an underscore
- Alphanumeric, including these symbols: _ { } [ ] : .
Example
Using the /create/table endpoint, you can create an empty table that can later hold records.
For example, in Python, to create a simple 2-column table:
|
|
|
|
Distribution
Table data can be distributed across the Kinetica cluster using one of two schemes: sharding & replication. Within the sharding scheme, data can be distributed either by key or randomly in the absence of a shard key.
Sharding
Sharding is the distribution of table data by hashing a particular value for each record, and by that hash, determining on which Kinetica cluster node the record will reside.
The benefit of sharding is that it allows distributed queries to be run against a given data set, with each node responsible for managing its portion of the overall data set, while only storing each record once across the cluster. The parallelism inherent in the distributed queries allows for the query performance to scale linearly with the addition of each cluster node.
Another benefit of sharding is that it increases the performance of any aggregation run against the table where the aggregation group includes all of the shard key columns.
A limitation of sharding is that two sharded data sets can only be joined together if they are sharded in the same way, so that their corresponding records will all reside on the same nodes. Given that, in a typical database model, each pair of related data sets is associated by a different key, a single query may only be able to join together at most two sharded data sets along their relation.
Since sharding maximizes data storage & query efficiency, it is recommended that the largest table in the system (e.g. the fact table in a data warehouse) be sharded. The second largest table against which that table is joined should also be sharded along the join relation (e.g. the column on each side of the foreign key relationship).
Given that sharding allows for greater-performant aggregations, it is also recommended that any large tables subject to frequent aggregation queries also be sharded on the grouping columns.
For example, if the largest joined tables in a system were customer & order, and there was a foreign key relationship between the customer_id column of the order table & the id column of the customer table, the order table should be sharded on customer_id and the customer table should be sharded on id.
If, for example, there were also an order_history table, which was frequently aggregated on store_id, it could be sharded on store_id to increase the performance of those queries.
Specifying a shard key requires that you create a type with at least one column that has the shard_key property when calling the /create/type endpoint. See Shard Keys for details.
For example, in Python:
|
|
|
|
For SQL syntax, see CREATE TABLE.
Random Sharding
Random sharding is the distribution of table data by randomly selecting a shard for each batch of records during ingestion. Since all of the records from each batch will go to the selected shard, it is important to configure the batch size appropriately to ensure an even distribution of records across the cluster over the entire ingestion process.
Random sharding has the same benefit as sharding with respect to distributed query parallelism and storage minimization. It lacks, however, the benefit of greater-performant aggregations over the shard key, as the randomly-sharded data set lacks one.
A limitation of random sharding is that a randomly-sharded data set can only be joined with replicated data sets--it cannot be joined with other sharded or randomly-sharded data sets.
Given that random sharding maximizes data storage & query efficiency in the same way that sharding does, it is recommended that large tables in the system that are not good candidates for sharding with respect to joins & aggregations be randomly-sharded.
For example, if a large table in a system would only ever be joined with replicated tables and would never have aggregations run against it, the table would be a good candidate for random sharding.
Random sharding is the default distribution technique applied to tables that meet the following criteria:
- No primary key specified
- No shard key specified
- Not specified as replicated
For example, in Python:
|
|
|
|
For SQL syntax, see CREATE TABLE.
Replication
Replication is the distribution of table data by locating its entire data set on every Kinetica cluster node simultaneously, instead of being distributed across them.
The benefit of replication is that it allows data sets to be joined together when those data sets are not sharded on the columns being associated.
Normally, joining two data sets together requires them being joined on their shard keys so that the joining of the two data sets can occur locally on each processor shard. Since replicated data sets exist on all shards, they appear local to any other data set and can be joined on each shard as if they were local. The result sets from each shard are then accumulated into one and returned as the final result set.
Since replicated data sets exist in their entirety on all processors, it is recommended that they be relatively small. For example, a table containing one gigabyte of data, when replicated across a 10-node cluster will occupy 10 gigabytes of cluster memory overall.
A table is specified as replicated at creation time, by calling the /create/table endpoint with the is_replicated option set to true.
For example, in Python:
|
|
|
|
For SQL syntax, see CREATE TABLE.
Partitioning
Table data can also be partitioned, yielding benefits in terms of performance and data management.
The types of supported partitioning schemes are:
Each partitioning scheme requires a partition key to be defined, which is a column or column expression that will be evaluated for each record in a table to determine the appropriate partition for the record.
Important
Any record with a null value in one or more of its partition key columns will be placed in the default partition, regardless of partitioning scheme.
Track tables can only be partitioned by tracks (they can only have their TRACKID column used as the partition key) and can only be partitioned using the following schemes:
- automatic list partitioning
- hash partitioning
- series partitioning
Data is partitioned at a lower level than it is sharded. When distributing a record:
- The shard key (if any) is used to determine the appropriate shard for the record
- The partition key is used to determine the appropriate partition for the record, within the shard.
For SQL syntax, see:
Range
Each range partition is defined as a range of values, and all records having a partition key value in that range are assigned to that partition. When filtering data, only those partitions whose ranges are within the filter criteria will be accessed, improving query response time. This performance improvement on query comes at the cost of more complexity in setting up & managing the partitions.
When defining the range, one or more named partitions are specified, and each is given a minimum (inclusive) and/or maximum (exclusive) value. A name is required so the partition can be altered or deleted later. Ordering is important, and the partitions must be specified in ascending order so that the minimum and maximum of each partition can be inferred if it is not explicitly given. There can be no overlap between ranges, meaning one record can not qualify for more than one partition. However, there can be gaps between ranges, meaning that a record can fail to qualify for any of the defined partitions.
All records which don’t belong to any of the named partitions will be placed in the default partition. If a new partition is added, any records in the default partition that would qualify for being placed in the new partition will be moved there.
The following effective types are unsupported partition key types (though they can be used inside a column expression, as long as the type of the final expression is not one of these):
- byte
- string
- WKT
A table is range-partitioned at creation time, by calling the /create/table endpoint with a partition_type option of RANGE and partition_keys & partition_definitions options set appropriately.
For example, to create a range-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- partitions for years:
- 2014 - 2016
- 2017
- 2018
- 2019
- records not in that range go to the default partition
|
|
|
|
To add a partition to a range-partitioned table, in Python:
|
|
To remove a partition from a range-partitioned table, moving all contained data into the default partition, in Python:
|
|
To delete a partition from a range-partitioned table, erasing all contained data, in Python:
|
|
Interval
Interval partitions are defined as numeric or time-based intervals, and all records having partition key values that fall within a given partition's interval or on its lower boundary are assigned to that partition. When a record contains a key value that doesn't fall within the interval of any existing partition, and the value is after the specified starting point, a new partition is created with the corresponding interval and the record is assigned to it. Partitions are thereby created dynamically to fit the data set, rather than being defined at table creation time.
Tip
This is the preferred method for partitioning by date, as it will automatically create new partitions as time moves forward.
When defining the interval, a starting point and interval length can be given; if not given, the following defaults apply:
Type | Starting Point | Interval |
---|---|---|
Numeric | 0 | 1 |
Date & Date/Time | January 1st, 1970 | 1 day |
Time | 00:00:00 | N/A |
Names are not given to interval partitions, so they may not be removed directly. However, deleting all records whose key values fall within a given interval partition will effectively remove it.
All records that have key values lower than the defined starting point will be assigned to the default partition.
The following effective types are unsupported partition key types (though they can be used inside a column expression, as long as the type of the final expression is not one of these):
- byte
- string
- charN
- WKT
A table is interval-partitioned at creation time, by calling the /create/table endpoint with a partition_type option of INTERVAL and partition_keys & partition_definitions options set appropriately.
For example, to create an interval-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- one partition for each year from 2014 on
- later year partitions are added as necessary
- records prior to 2014 go to the default partition
|
|
|
|
To create an interval-partitioned table with the following criteria:
- partitioned on the date/time of the order
- one partition for each day from January 1st, 2014 on
- later day partitions are added as necessary
- records prior to 2014 go to the default partition
|
|
|
|
The same interval-partitioned scheme above can be created using the timestamp column directly, and date-aware INTERVAL syntax (supported date/time units are the same as those listed with the TIMESTAMPADD function under Date/Time Base Functions ):
|
|
|
|
This scheme can be easily modified to create an hourly partition instead:
|
|
List
List partitions are defined in one of two schemes:
The following effective types are unsupported partition key types (though they can be used inside a column expression, as long as the type of the final expression is not one of these):
- byte
- string
- WKT
A table is list-partitioned at creation time, by calling the /create/table endpoint with a partition_type option of LIST and partition_keys & partition_definitions options set appropriately. Set is_automatic_partition option to true in order to use an automatic partitioning scheme.
Manual Partitioning
Manual partitions are defined as a specific list of values, and all records having partition key values that are in one of those lists are assigned to the corresponding partition. A given list entry cannot exist in more than one list, meaning one record cannot qualify for more than one partition.
All records which don’t belong to any of the named partitions will be placed in the default partition. If a new partition is added, any records in the default partition that would qualify for being placed in the new partition will be moved there.
Single Column Partition Key
For example, to create a manual list-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- partitions for years:
- 2014 - 2016
- 2017
- 2018
- 2019
- records not in that list go to the default partition
|
|
|
|
To add a partition to a manual list-partitioned table, in Python:
|
|
To remove a partition from a manual list-partitioned table, moving all contained data into the default partition, in Python:
|
|
To delete a partition from a manual list-partitioned table, erasing all contained data, in Python:
|
|
Multi-Column Partition Key
To create a manual list-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- each partition corresponds to a unique year & month pair
- partitions for years/months:
- February 2016 & March 2016
- March 2020
- records not in that list go to the default partition
|
|
|
|
To add a partition to a manual list-partitioned table with a multi-column key, in Python:
|
|
To remove a partition from a manual list-partitioned table with a multi-column key, moving all contained data into the default partition, in Python:
|
|
To delete a partition from a manual list-partitioned table with a multi-column key, erasing all contained data, in Python:
|
|
Automatic Partitioning
Automatic partitions direct the database to automatically create a new partition for each distinct partition key value encountered in the data. A table with no data would have no data-based partitions, while a table with data that contained N unique partition key values would have N of them.
To create an automatic list-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- one partition for each unique year & month across all orders
- partitions are added as necessary
|
|
|
|
Hash
Hash partitions are defined as simply a static number of partitions. Each record is assigned to a partition based on the hash of its partition key.
A table is hash-partitioned at creation time, by calling the /create/table endpoint with a partition_type option of HASH, partition_definitions set to the number of partitions to create, and partition_keys set to whichever columns should be included in the hash.
For example, to create a hash-partitioned table with the following criteria, in Python:
- partitioned on the date/time of the order
- distributed among the fixed set of partitions, based on the hash of the year & month of the order
- 10 partitions
|
|
|
|
Series
Series partitioning segments data into a sequence of dynamically-allocated partitions, filling each to a given percentage threshold before moving on to the next.
As records associated with new partition keys are encountered, they are added to the table's only partition with an open partition key set. Once that partition has exceeded the specified capacity, its partition key set is closed. All partition keys associated with the records in a partition at the time its key set is closed become its fixed set of keys from that point onward--any subsequent records encountered with keys matching those in its fixed set will be added to it, causing it to grow without bound. The first record encountered after that with a partition key that is not part of any closed partition key set across all partitions will trigger the creation of a new partition with an open partition key set. That record and any subsequent records with partition keys not matching any closed set are added to the new partition, until that partition exceeds its capacity threshold, where its partition key set is closed, and the cycle repeats.
With this distribution scheme, each partition will have a variable number of partition keys, depending on the cardinality of the keys and the order in which the records were inserted into the table. A table with high cardinality in its partition keys may have mostly distinct keys in its partitions, while low cardinality keys could result in partitions with very few unique keys. Conversely, a table with mostly distinct keys that has data loaded in groups by key may have fewer keys per partition, while a table with few distinct keys that has all the unique keys loaded up front may end up with all its keys (and its data) in a single partition.
The initial capacity of a partition, against which the percentage threshold is measured, is equivalent to its chunk size--the designated number of records per block of allocated memory for this table. This is either set manually, via the chunk_size option, when the table is created, or it will default to the system default chunk_size, which is set in the system configuration. See the Persistence section of the Configuration Reference for details.
While this scheme was designed for colocating tracks from track tables into the same partition, it can be used with any table type, for suitable use cases.
A table is series-partitioned at creation time, by calling the /create/table endpoint with a partition_type option of SERIES, partition_definitions set to the percentage full the open partition should be before its partition key set is closed and a new one created, and partition_keys set to whichever columns should be included in the partition key.
Note
The default partition fill threshold is 50%.
For example, to create a series-partitioned table with the following criteria, in Python:
- partitioned on the customer of each order
- partitions with closed key sets will contain all orders from a set of unique customers
- 50% fill threshold
|
|
|
|
To create a series-partitioned track table with the following criteria, in Python:
- partitioned on the track ID
- partitions with closed key sets will contain all points from a unique set of tracks
- 25% fill threshold
|
|
|
|
Primary Keys
Primary key is a designation that can be applied to one or more columns in a table. A primary key composed of multiple columns is known as a composite primary key.
Purpose
The primary key is used to ensure the uniqueness of the data contained in the keyed column(s).
The uniqueness constraint is enforced upon insert in two ways:
- If a record to be inserted has key values that match those of an already
existing record in the target table, the new record’s values will either:
- Overwrite the existing record’s values, if the update_on_existing_pk option is set to true
- Be ignored (the new record effectively discarded), if the update_on_existing_pk option is set to false or not set
- If two or more records within a given batch insert have the same key values, with respect to the primary key of the target table, the entire batch of records to insert is rejected
Designation
By default, a table has no primary key. One must be explicitly designated in the creation of the type schema associated with the table.
Only one primary key can exist per table.
The primary key cannot exceed 320 bytes total. With composite primary keys, the total size of all columns composing the key must not exceed 320 bytes. For instance, the maximum number of integer columns (4 bytes each) that can compose a primary key is 80. Unlimited-width strings count 8 bytes each toward this total.
Note
Store-only and WKT columns cannot be used as all or part of a primary key. Since byte columns are store-only, they may not be part of a primary key.
Relation to Shard Key
The primary key for a table not created as replicated becomes its implicit shard key, used for distributing its records across processors. Replicated tables, by definition, are not sharded and will necessarily have no shard key, implicit or otherwise. This implicit shard key for non-replicated tables can only be overridden when the primary key is a composite primary key and one or more of the composite primary key columns is explicitly designated as the shard key.
Relation to Foreign Key
The primary key designation enables the column(s) to serve as the target of a foreign key. Note that if the primary key is a composite primary key, the foreign key must also be a composite foreign key, relating the columns from the source table to all columns in the primary key.
Index
The primary key designation applies a primary key index to the primary key columns.
Creation
For syntax and an example of creating a primary key in SQL, see CREATE TABLE.
Creating a table with a primary key requires two steps. First, mark the intended primary key column(s) with the primary_key property, and then create a table with those columns.
For example, to create a table with a primary key on id, in Python:
|
|
|
|
To create a table with a composite primary key on id_a & id_b:
|
|
|
|
Shard Keys
Shard key is a designation that can be applied to one or more columns in a table. A shard key composed of multiple columns is known as a composite shard key.
Purpose
The shard key is used in distributing records across processors. This distribution allows for processing of queries against a sharded table to be performed in parallel. In the absence of a shard key, for non-replicated tables, a hash is computed for each record and serves as the key by which the associated record is distributed to its corresponding processor, or shard.
Designation
By default, a table has no shard key. One can be explicitly designated in the creation of the type schema associated with the table. One is also implicitly designated when a non-replicated table is assigned a primary key. See the next section for details.
Only one shard key can exist per table.
Note
Store-only and WKT columns cannot be used as all or part of a shard key.
Relation to Primary Key
When a primary key is assigned to any table not created as replicated, the primary key becomes the default shard key. All columns involved in the primary key are used as the shard key in the order the columns were defined in the table creation.
Note
The ordering of columns within a composite shard key is critical when performing a join between the composite shard keys of two tables. If two tables are both sharded on the same columns, but the tables were created with different orderings of those columns, a sharded join between them will not be possible.
If a primary key exists on a table, one or more of the columns composing the primary key can be designated as the shard key; only primary key columns may be used--columns not part of the primary key may not be designated as shard key columns. Designating a shard key does not automatically create a corresponding primary key with the same column(s) for a table.
Creation
For syntax and an example of creating a shard key in SQL, see CREATE TABLE.
Creating a table with a shard key requires two steps. First, mark the intended shard key column(s) with the shard_key property, and then create a table with those columns.
For example, to create a table with a shard key on id, in Python:
|
|
|
|
To create a table with a composite shard key on id_a & id_b:
|
|
|
|
Lastly, to create a table with a shard key that is not the primary key (but consists of a proper subset of the columns), with the primary key on id_a & id_b and the shard key on id_b:
|
|
|
|
Note also that sharding applies only to non-replicated tables, and the default /create/table distribution scheme implied in the example above is the non-replicated one. If an attempt is made to create a table as replicated with a column set that specifies a shard key, the request will fail. An attempt to create a replicated table with a shard key that is the same as the primary key (all columns marked with primary_key are also marked with shard_key) will succeed in creating a replicated table--the shard key designation will be ignored.
Foreign Keys
Foreign key is a designation that can be applied to one or more columns in a source table that relate them to a matching set of primary key columns in a target table. A foreign key composed of multiple columns is known as a composite foreign key.
Purpose
The foreign key is a tuning option for joining tables. It acts as a relational index from a source table to corresponding records in a target table. As new records are inserted into the source table, the index is updated with references to the target records associated with them. If a foreign key is not used, the join relationships are established during query time.
Foreign keys do not provide referential integrity checks.
Designation
By default, a table has no foreign key. One can be explicitly designated in the creation of the type schema associated with the table. One can also be designated after table creation via the /alter/table endpoint.
Multiple foreign keys can exist on a source table. Multiple foreign keys may also point to a single target table; though, since foreign key targets must be primary keys, the foreign keys would all have to point to the primary key of the target table.
Note
Store-only columns cannot be used as all or part of a foreign key.
Relation to Primary Key
In order for the linkage to be made successfully, a foreign key must connect one or more columns in a source table with the primary key column(s) of a target table. The primary key target requirement guarantees the uniqueness of the values in the target table columns, which is necessary for the foreign key relation to be made.
In the case of a composite primary key on the target table, all of the columns that compose the composite primary key must be related by the foreign key. A foreign key cannot be made to a proper subset of the composite primary key columns in the target table.
If both tables are sharded there is a further restriction, noted below.
Relation to Shard Key
If both the source & target tables are sharded, the foreign key must also connect the shard key of the source table to the shard key of the target table. The sharding requirement exists to ensure the related records of each table being connected are located on the same processing node. If either or both tables are replicated, the sharding requirement is alleviated, as all of the records from a replicated table will exist on all processor nodes, and thus, appear to be local to all records from the other table involved in the relation.
For example, if table source_table is sharded on column A, and table target_table has a primary key & shard key on column B, the only possible foreign key from source_table to target_table would connect column A to column B. No other columns from source_table can be used as source columns and no other columns from target_table can serve as target columns for the foreign key.
In the case where the target table has a shard key whose columns are a proper subset of the primary key columns, the same rules apply. If a table source_table has columns A, B, & C and is sharded on B, and table target_table has primary key columns Y & Z and is sharded on Z, there are two possible foreign keys from source_table to target_table:
- source_table(A, B) -> target_table(Y, Z)
- source_table(C, B) -> target_table(Y, Z)
In both examples, all of the primary key columns (Y & Z) of the target table are pointed to by the foreign key, and the shard key columns (B & Z) of the two tables are related by the foreign key.
Creation
Foreign keys can be designated at the time of table creation, using the /create/table endpoint.
For example, in Python, to create a source table and connect the pkt_id column of the source table to the id column of an existing target table:
|
|
|
|
Foreign keys can also be added after table creation, via /alter/table.
For example, in Python, to create a composite foreign key relating the cpkt_id_a and cpkt_id_b columns of an existing source table to the id_a and id_b columns of an existing target table, respectively:
|
|
Multiple foreign keys can be specified at the same time by using a semicolon as a delimiter. Again, in Python:
|
|
|
|