Version:

Multi-Head Use Case

A common, highly-performant use case for Kinetica is multi-head ingest into a fact table and multi-head egress from a materialized view created from that fact table, periodically refreshed at some interval.

The example given below can be downloaded & run to demonstrate the use case on a local Kinetica instance.

This guide will demonstrate a simple use case, implemented in Java & SQL, consisting of four entities:

  1. Multi-Head Ingest Client
  2. Fact Table (ingestion target)
  3. Materialized View from Fact Table (egress source)
  4. Multi-Head Egress Client
Multi-Head Ingest/Egress

Multi-Head Ingest/Egress


These components work together in four phases:

  1. Setup Phase
  2. Ingest Phase
  3. Refresh Phase
  4. Egress Phase

Setup Phase

A table, order_history, needs to be created before ingestion can begin. It serves as the multi-head ingestion target.

The table is sharded on store_id, so that the subsequent aggregation on store_id by the materialized view can execute quickly, without having to synchronize aggregation results across shards.

CREATE OR REPLACE TABLE order_history
(
    store_id INT NOT NULL,
    id INT NOT NULL,
    total_amount DOUBLE NOT NULL,
    timestamp TYPE_TIMESTAMP NOT NULL,
    PRIMARY KEY (id, store_id),
    SHARD KEY (store_id)
);

A materialized view, store_sales, needs to be created before keyed lookups can begin. It serves as the multi-head egress source.

The materialized view aggregates each store's total sales by date from the order_history table, grouping on store_id and the date portion of the order timestamp. It also uses the KI_HINT_GROUP_BY_PK hint to make a primary key out of these two grouping columns. Doing so automatically creates a primary key index on the two columns, meeting the keyed lookup criteria that all columns involved in the lookup be indexed.

The refresh interval given to the materialized view is for reference only; in this example, the view will be manually refreshed to allow results to be shown immediately.

CREATE OR REPLACE MATERIALIZED VIEW store_sales
REFRESH EVERY 10 MINUTES AS
SELECT  /* KI_HINT_GROUP_BY_PK */
    store_id,
    DATE(timestamp) AS order_date,
    SUM(total_amount) AS total_sales
FROM
    order_history
GROUP BY
    store_id,
    DATE(timestamp);

Tip

See the multihead_use_case.kisql DDL script for details.

Ingest Phase

The BulkInserter is the Java object that performs multi-head ingestion. To create one, a database connection is first established. From there, the Type schema of the ingest target table is extracted from the database and the BulkInserter created with that Type.

final String tableName = "order_history";

// Acquire a handle to the database
GPUdb db = new GPUdb(kineticaUrl);

// Acquire a type for the order history table
Type orderType = Type.fromTable(db, tableName);

// Acquire a bulk inserter to perform the multi-head ingest
BulkInserter<GenericRecord> bulkInserter =
		new BulkInserter<>(db, tableName, orderType, queueSize, null, new WorkerList(db));

Once the BulkInserter has been created, GenericRecord objects of the given Type are inserted into it. The indexes used in assigning column values to each GenericRecord are 0-based and match the column order of the target table: store_id, id, total_amount, and then timestamp.

GenericRecord order = new GenericRecord(type);
order.put(0, storeId);
order.put(1, orderId);
order.put(2, totalAmount);
order.put(3, timestamp);
bulkInserter.insert(order);

The BulkInserter automatically inserts records as each of its queues reaches the configured queueSize. Before ending the ingest session, the BulkInserter is flushed, initiating inserts of any remaining queued records.

bulkInserter.flush();

Tip

See the ingest method of MultiHeadUseCase.java for details.

Refresh Phase

Though the materialized view in this example (and in the associated use case) is configured to periodically refresh, a manual refresh is done between ingest & egress phases to avoid having to wait for the refresh cycle.

The manual refresh requires only a database connection in order to be initiated.

GPUdb db = new GPUdb(kineticaUrl);
db.alterTable("store_sales", AlterTableRequest.Action.REFRESH, null, null);

Tip

See the refresh method of MultiHeadUseCase.java for details.

Egress Phase

The RecordRetriever is the Java object that performs multi-head egress. Creating one requires steps similar to creating a BulkInserter. First, a database connection is established. Next, the Type schema of the egress source table is extracted from the database and the RecordRetriever created with that Type.

GPUdb db = new GPUdb(kineticaUrl);

// Acquire a type for the store total table
Type storeTotalType = Type.fromTable(db, viewName);

// Acquire a record retriever to perform the multi-head egress
RecordRetriever<GenericRecord> recordRetriever =
		new RecordRetriever<>(db, viewName, storeTotalType, new WorkerList(db));

Multi-head egress is accomplished through a keyed lookup of records. At this point, the lookup keys & filters are established.

The lookup filter is fixed to the current day's date.

final String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
final String todayFilter = "order_date = '" + today + "'";

The lookup keys are established, one for each store number being looked up. Each key value is matched against the shard key column, store_id, of the source view, store_sales. Since the view has only one column in its shard key, there is only one entry in each key List.

List<Object> storeKeyset = GPUdb.list((Object)storeNum);

With each store number lookup key and the fixed date filter for today, the RecordRetriever extracts the corresponding store sales total and outputs the store_id & total_sales. Similar to the assigning of column values with the GenericRecord during ingest, the indexes used in extracting column values from the result of the keyed lookup are 0-based and match the column order of the source view.

GetRecordsResponse<GenericRecord> grResp = recordRetriever.getByKey(storeKeyset, todayFilter);
for (GenericRecord storeTotal : grResp.getData())
	System.out.printf("%7d %13.2f%n", storeTotal.get(0), storeTotal.get(2));

Tip

See the egress method of MultiHeadUseCase.java for details.

Download & Run

Included below are the artifacts needed to run this example on an instance of Kinetica. Maven will be used to compile the Java program.


To run the example, the database objects need to be created and then the Java program compiled & run.

  1. Run the DDL script using any ODBC client, GAdmin, or as shown here via KiSQL on the Kinetica server:

    /opt/gpudb/bin/kisql -f multihead_use_case.kisql
    
  2. Create a directory for the example Java project and move the example files into it as shown:

    MultiHeadUseCase/
     ├── pom.xml
     └── src/
          └── main/
               └── java/
                    └── MultiHeadUseCase.java
    
  3. Compile the Java program in the directory with the pom.xml:

    cd MultiHeadUseCase
    mvn clean package
    
  4. Run the example:

    java -cp target/multi-head-1.0-jar-with-dependencies.jar MultiHeadUseCase run
    
  5. Verify the output consists of aggregated sales totals for each store for the current day and should be similar to the following:

    Summary for date: [2018-10-01]
    Store #  Total Sales
    ======= =============
          1   49920144.83
          2   49924795.62
          3   50096909.16
          4   49986967.66
          5   49987397.21
          6   50250738.93
          7   50280934.38
          8   50145700.21
          9   49807242.51
         10   49875441.35