A common, highly-performant use case for Kinetica is multi-head ingest into a fact table and multi-head egress from a materialized view created from that fact table, periodically refreshed at some interval.
The example given below can be downloaded & run to demonstrate the use case on a local Kinetica instance.
This guide will demonstrate a simple use case, implemented in Java & SQL, consisting of four entities:

order_history - a fact table that serves as the multi-head ingestion target
store_sales - a materialized view of order_history that serves as the multi-head egress source
BulkInserter - the Java object that performs multi-head ingestion into order_history
RecordRetriever - the Java object that performs multi-head egress from store_sales

These components work together in four phases:

1. Creation of the order_history table & store_sales materialized view
2. Multi-head ingestion of order data into order_history
3. Manual refresh of store_sales to aggregate the ingested orders
4. Multi-head egress of each store's daily sales total from store_sales
A table, order_history, needs to be created before ingestion can begin. It serves as the multi-head ingestion target.

The table is sharded on store_id, so that the subsequent aggregation on store_id by the materialized view can execute quickly, without having to synchronize aggregation results across shards.
CREATE OR REPLACE TABLE order_history
(
    store_id     INT NOT NULL,
    id           INT NOT NULL,
    total_amount DOUBLE NOT NULL,
    timestamp    TIMESTAMP NOT NULL,
    PRIMARY KEY (id, store_id),
    SHARD KEY (store_id)
);
A materialized view, store_sales, needs to be created before keyed lookups can begin. It serves as the multi-head egress source.

The materialized view aggregates each store's total sales by date from the order_history table, grouping on store_id and the date portion of the order timestamp. It also uses the KI_HINT_GROUP_BY_PK hint to make a primary key out of these two grouping columns. Doing so automatically creates a primary key index on the two columns, meeting the keyed lookup criterion that all columns involved in the lookup be indexed.

The refresh interval given to the materialized view is for reference only; in this example, the view will be refreshed manually so that results can be shown immediately.
CREATE OR REPLACE MATERIALIZED VIEW store_sales
REFRESH EVERY 10 MINUTES AS
SELECT /* KI_HINT_GROUP_BY_PK */
    store_id,
    DATE(timestamp) AS order_date,
    SUM(total_amount) AS total_sales
FROM
    order_history
GROUP BY
    store_id,
    DATE(timestamp);
Tip
See the multihead_use_case.kisql DDL script for details.
The BulkInserter is the Java object that performs multi-head ingestion. To create one, a database connection is first established. From there, the Type schema of the ingest target table is extracted from the database, and the BulkInserter is created with that Type.
final String tableName = "order_history";
// Acquire a handle to the database
GPUdb db = new GPUdb(kineticaUrl);
// Acquire a type for the order history table
Type orderType = Type.fromTable(db, tableName);
// Acquire a bulk inserter to perform the multi-head ingest
BulkInserter<GenericRecord> bulkInserter =
        new BulkInserter<>(db, tableName, orderType, queueSize, null, new WorkerList(db));
Once the BulkInserter has been created, GenericRecord objects of the given Type are inserted into it. The indexes used in assigning column values to each GenericRecord are 0-based and match the column order of the target table: store_id, id, total_amount, and then timestamp.
GenericRecord order = new GenericRecord(orderType);
order.put(0, storeId);
order.put(1, orderId);
order.put(2, totalAmount);
order.put(3, timestamp);
bulkInserter.insert(order);
The BulkInserter automatically inserts records as each of its queues reaches the configured queueSize. Before ending the ingest session, the BulkInserter is flushed, initiating inserts of any remaining queued records.
bulkInserter.flush();
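For context, the pieces above can be combined into a condensed, self-contained ingest sketch. The URL, store count, order count, queue size, and random order amounts below are illustrative assumptions, not values taken from the example program:

import java.util.Date;
import com.gpudb.BulkInserter;
import com.gpudb.GPUdb;
import com.gpudb.GenericRecord;
import com.gpudb.Type;
import com.gpudb.WorkerList;

public class IngestSketch
{
    public static void main(String[] args) throws Exception
    {
        final String tableName = "order_history";
        final int queueSize = 1000;                    // assumed per-worker batch size

        // Acquire a handle to the database (assumed local instance)
        GPUdb db = new GPUdb("http://localhost:9191");

        Type orderType = Type.fromTable(db, tableName);
        BulkInserter<GenericRecord> bulkInserter =
                new BulkInserter<>(db, tableName, orderType, queueSize, null, new WorkerList(db));

        // Queue 100 orders for each of 10 stores; IDs & amounts are arbitrary
        int orderId = 0;
        for (int storeId = 1; storeId <= 10; storeId++)
            for (int i = 0; i < 100; i++)
            {
                GenericRecord order = new GenericRecord(orderType);
                order.put(0, storeId);                 // store_id
                order.put(1, ++orderId);               // id
                order.put(2, Math.random() * 100.0);   // total_amount
                order.put(3, new Date().getTime());    // timestamp (epoch ms)
                bulkInserter.insert(order);
            }

        // Insert any records still queued
        bulkInserter.flush();
    }
}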
Tip
See the ingest method of MultiHeadUseCase.java for details.
Though the materialized view in this example (and in the associated use case) is configured to refresh periodically, a manual refresh is performed between the ingest & egress phases to avoid having to wait for the refresh cycle.

The manual refresh requires only a database connection to initiate.
GPUdb db = new GPUdb(kineticaUrl);
db.alterTable("store_sales", AlterTableRequest.Action.REFRESH, null, null);
Tip
See the refresh method of MultiHeadUseCase.java for details.
The RecordRetriever is the Java object that performs multi-head egress. Creating one requires steps similar to creating a BulkInserter. First, a database connection is established. Next, the Type schema of the egress source table is extracted from the database, and the RecordRetriever is created with that Type.
GPUdb db = new GPUdb(kineticaUrl);
// Acquire a type for the store total table
Type storeTotalType = Type.fromTable(db, viewName);
// Acquire a record retriever to perform the multi-head egress
RecordRetriever<GenericRecord> recordRetriever =
        new RecordRetriever<>(db, viewName, storeTotalType, new WorkerList(db));
Multi-head egress is accomplished through a keyed lookup of records. At this point, the lookup keys & filters are established.
The lookup filter is fixed to the current day's date.
final String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
final String todayFilter = "order_date = '" + today + "'";
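Since the filter is an ordinary Kinetica expression, it could be extended with additional conditions if needed; for instance, adding a hypothetical minimum-total threshold:

final String filter = todayFilter + " AND total_sales > 1000000";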
The lookup keys are established, one for each store number being looked up. Each key value is matched against the shard key column, store_id, of the source view, store_sales. Since the view has only one column in its shard key, there is only one entry in each key List.
List<Object> storeKeyset = GPUdb.list((Object)storeNum);
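Were the source view sharded on multiple columns (this example's view is not), the key List would hold one value per shard key column, in declaration order; e.g., for a hypothetical (store_id, region_id) shard key:

List<Object> storeKeyset = GPUdb.list((Object)storeNum, (Object)regionId);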
With each store number lookup key and the fixed date filter for today, the RecordRetriever extracts the corresponding store sales total and outputs the store_id & total_sales. As with the assignment of column values to each GenericRecord during ingest, the indexes used in extracting column values from the result of the keyed lookup are 0-based and match the column order of the source view.
GetRecordsResponse<GenericRecord> grResp = recordRetriever.getByKey(storeKeyset, todayFilter);
for (GenericRecord storeTotal : grResp.getData())
System.out.printf("%7d %13.2f%n", storeTotal.get(0), storeTotal.get(2));
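Putting the lookup pieces together, a minimal egress loop over a range of stores might look like the following sketch; the range of store numbers is an illustrative assumption:

// Look up & print today's sales total for each store (assumed store range)
for (int storeNum = 1; storeNum <= 10; storeNum++)
{
    List<Object> storeKeyset = GPUdb.list((Object)storeNum);
    GetRecordsResponse<GenericRecord> grResp =
            recordRetriever.getByKey(storeKeyset, todayFilter);

    for (GenericRecord storeTotal : grResp.getData())
        System.out.printf("%7d %13.2f%n", storeTotal.get(0), storeTotal.get(2));
}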
Tip
See the egress method of MultiHeadUseCase.java for details.
Included below are the artifacts needed to run this example on an instance of Kinetica. Maven will be used to compile the Java program.
multihead_use_case.kisql - SQL DDL script for creating the ingestion target table and egress source materialized view
pom.xml - for compiling the Java example program
MultiHeadUseCase.java - the ingress/egress Java example program itself

To run the example, the database objects need to be created and then the Java program compiled & run.
Run the DDL script using any ODBC client, GAdmin, or as shown here via KiSQL on the Kinetica server:
/opt/gpudb/bin/kisql -f multihead_use_case.kisql
Create a directory for the example Java project and move the example files into it as shown:
MultiHeadUseCase/
├── pom.xml
└── src/
└── main/
└── java/
└── MultiHeadUseCase.java
Compile the Java program in the directory containing the pom.xml:
cd MultiHeadUseCase
mvn clean package
Run the example:
java -cp target/multi-head-1.0-jar-with-dependencies.jar MultiHeadUseCase run
Verify that the output consists of aggregated sales totals for each store for the current day; it should be similar to the following:
Summary for date: [2018-10-01]
Store # Total Sales
======= =============
1 49920144.83
2 49924795.62
3 50096909.16
4 49986967.66
5 49987397.21
6 50250738.93
7 50280934.38
8 50145700.21
9 49807242.51
10 49875441.35