Distributed Ingest & Key Lookup Use Case

A common, high-performance use case for Kinetica pairs distributed ingest into a fact table with distributed key lookup from a materialized view that is built on that fact table and refreshed periodically.

The example given below can be downloaded & run to demonstrate the use case on a local Kinetica instance.

This guide will demonstrate a simple use case, implemented in Java & SQL, consisting of four entities:

  1. Distributed Ingest Client
  2. Fact Table (ingestion target)
  3. Materialized View from Fact Table (key lookup source)
  4. Distributed Key Lookup Client

Figure: Distributed Ingest & Key Lookup

These components work together in four phases:

  1. Setup Phase
  2. Ingest Phase
  3. Refresh Phase
  4. Key Lookup Phase

Setup Phase

First, a database connection needs to be established.

Establish Database Connection
GPUdbBase.Options options = new GPUdbBase.Options();
options.setUsername(user);
options.setPassword(pass);
this.db = new GPUdb(url, options);

If specified as a parameter, a schema can be used to contain the other database objects subsequently created; it will be created if it doesn't exist.

Create/Use Optional Schema
if (this.schemaName != null)
    if (!this.db.hasSchema(this.schemaName, null).getSchemaExists())
        this.db.createSchema(this.schemaName, null);

A table, order_history, needs to be created before ingestion can begin. It serves as the distributed ingestion target.

The table is sharded on store_id, so that the subsequent aggregation on store_id by the materialized view can execute quickly, without having to synchronize aggregation results across shards.

Create Order History Table
String ddlOrderHistory = "" +
"   CREATE OR REPLACE TABLE " + this.tableNameHistory   +
"    (                                                " +
"        store_id INT NOT NULL,                       " +
"        id INT NOT NULL,                             " +
"        total_amount DOUBLE NOT NULL,                " +
"        timestamp TYPE_TIMESTAMP NOT NULL,           " +
"        PRIMARY KEY (id, store_id),                  " +
"        SHARD KEY (store_id)                         " +
"    );                                               ";
this.db.executeSql(new ExecuteSqlRequest().setStatement(ddlOrderHistory));
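
The benefit of sharding on store_id, described above, can be illustrated with a small plain-Java simulation. This is only a sketch: the class ShardAggregationSketch, its shardFor routing rule, and the two-shard setup are hypothetical and do not reflect how Kinetica actually assigns records to shards. It shows that when every record for a store lands on the same shard, each shard's per-store sum is already complete and no cross-shard merge is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardAggregationSketch {

    // Hypothetical routing rule: the same store_id always maps to the same shard
    static int shardFor(int storeId, int shardCount) {
        return Math.floorMod(Integer.hashCode(storeId), shardCount);
    }

    // Sums total_amount per store_id independently on each shard;
    // each order is a {storeId, totalAmount} pair
    static List<Map<Integer, Double>> aggregatePerShard(double[][] orders, int shardCount) {
        List<Map<Integer, Double>> shards = new ArrayList<>();
        for (int i = 0; i < shardCount; i++)
            shards.add(new HashMap<>());

        for (double[] order : orders) {
            int storeId = (int) order[0];
            shards.get(shardFor(storeId, shardCount)).merge(storeId, order[1], Double::sum);
        }
        return shards;
    }

    public static void main(String[] args) {
        double[][] orders = { {1, 10.0}, {2, 5.0}, {1, 2.5}, {3, 7.0}, {2, 1.0} };
        List<Map<Integer, Double>> shards = aggregatePerShard(orders, 2);

        // Each store_id appears in exactly one shard's result map
        for (int i = 0; i < shards.size(); i++)
            System.out.println("shard " + i + ": " + shards.get(i));
    }
}
```

Had the records been routed on a different column, such as id, partial sums for one store would be spread over several shards and would have to be combined afterward.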

A materialized view, store_sales, needs to be created before keyed lookups can begin. It serves as the distributed key lookup source.

The materialized view aggregates each store's total sales by date from the order_history table, grouping on store_id and the date portion of the order timestamp. It also uses the KI_HINT_GROUP_BY_PK hint to make a primary key out of these two grouping columns. Doing so automatically creates a primary key index on the two columns, meeting the keyed lookup criteria that all columns involved in the lookup be indexed.

The refresh interval given to the materialized view is for reference only; in this example, the view will be manually refreshed to allow results to be shown immediately.

Create Store Sales Materialized View
String ddlStoreSales = "" +
"    CREATE OR REPLACE MATERIALIZED VIEW " + this.viewNameSales    +
"    REFRESH EVERY 10 MINUTES AS                                 " +
"    SELECT  /* KI_HINT_GROUP_BY_PK */                           " +
"        store_id,                                               " +
"        DATE(timestamp) AS order_date,                          " +
"        SUM(total_amount) AS total_sales                        " +
"    FROM                                                        " +
"        " + this.tableNameHistory + "                           " +
"    GROUP BY                                                    " +
"        store_id,                                               " +
"        DATE(timestamp);                                        ";
this.db.executeSql(new ExecuteSqlRequest().setStatement(ddlStoreSales));
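
To make the view's aggregation concrete, the following plain-Java sketch computes the same kind of result in memory: one total per store_id and order date. The StoreSalesSketch class and its Order type are hypothetical stand-ins for illustration only; in the use case itself, the database performs this grouping when the view is refreshed.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class StoreSalesSketch {

    // Hypothetical in-memory stand-in for one order_history row
    static final class Order {
        final int storeId;
        final double totalAmount;
        final LocalDateTime timestamp;

        Order(int storeId, double totalAmount, LocalDateTime timestamp) {
            this.storeId = storeId;
            this.totalAmount = totalAmount;
            this.timestamp = timestamp;
        }
    }

    // Mirrors GROUP BY store_id, DATE(timestamp) with SUM(total_amount)
    static Map<Integer, Map<LocalDate, Double>> totalSales(List<Order> orders) {
        Map<Integer, Map<LocalDate, Double>> totals = new TreeMap<>();
        for (Order order : orders)
            totals.computeIfAbsent(order.storeId, id -> new TreeMap<>())
                  .merge(order.timestamp.toLocalDate(), order.totalAmount, Double::sum);
        return totals;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order(1, 10.0, LocalDateTime.of(2026, 2, 16, 9, 30)),
            new Order(1, 20.0, LocalDateTime.of(2026, 2, 16, 17, 5)),
            new Order(2, 4.0, LocalDateTime.of(2026, 2, 16, 8, 0)),
            new Order(1, 7.5, LocalDateTime.of(2026, 2, 17, 11, 45)));

        // One entry per (store_id, order_date) pair, like one row of the view
        System.out.println(totalSales(orders));
    }
}
```

Each (store_id, order_date) pair occurs at most once in the result, which is why the two grouping columns can serve as a primary key for the view.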

Ingest Phase

First, the Type schema of the ingest target table needs to be extracted, for later use in BulkInserter construction and record insertion.

Get Table Schema
Type orderType = Type.fromTable(this.db, this.tableNameHistory);

The BulkInserter is the Java object that performs distributed ingestion. To create one, give it the database handle, table name & Type schema, record ingest queue size, and any ingest options.

Configure & Create a BulkInserter
// Configure bulk inserter options to store up per-record insertion errors
Map<String,String> options = GPUdbBase.options
(
        InsertRecordsRequest.Options.RETURN_INDIVIDUAL_ERRORS, InsertRecordsRequest.Options.TRUE,
        InsertRecordsRequest.Options.ALLOW_PARTIAL_BATCH, InsertRecordsRequest.Options.TRUE
);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter =
        new BulkInserter<>(this.db, this.tableNameHistory, orderType, queueSize, options))

Once the BulkInserter has been created, GenericRecord objects of the given Type are inserted into it. The indexes used in assigning column values to each GenericRecord are 0-based and match the column order of the target table: store_id, id, total_amount, and then timestamp.

Insert Data
GenericRecord order = new GenericRecord(this.type);
order.put(0, storeId);
order.put(1, orderId);
order.put(2, totalAmount);
order.put(3, timestamp);
this.bulkInserter.insert(order);
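
Because values are assigned by position, a change in column order would silently put values in the wrong columns. One way to keep the positions readable is to name them, as in the sketch below. The OrderColumnsSketch class and its OrderColumn enum are hypothetical and not part of the Kinetica API, and the timestamp is assumed here to be a long value; the enum's declaration order must mirror the table's column order.

```java
import java.util.Arrays;

final class OrderColumnsSketch {

    // Declaration order mirrors the order_history column order (0-based)
    enum OrderColumn { STORE_ID, ID, TOTAL_AMOUNT, TIMESTAMP }

    // Builds a positional row; the timestamp is assumed to be a long value
    static Object[] toRow(int storeId, int orderId, double totalAmount, long timestamp) {
        Object[] row = new Object[OrderColumn.values().length];
        row[OrderColumn.STORE_ID.ordinal()] = storeId;
        row[OrderColumn.ID.ordinal()] = orderId;
        row[OrderColumn.TOTAL_AMOUNT.ordinal()] = totalAmount;
        row[OrderColumn.TIMESTAMP.ordinal()] = timestamp;
        return row;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toRow(3, 1001, 19.99, 1771200000000L)));
    }
}
```

The same named positions could be passed to a positional setter such as the put calls shown above, in place of the literal indexes 0 through 3.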

The BulkInserter automatically inserts records as each of its queues reaches the configured queueSize. Before ending the ingest session, the BulkInserter is flushed, initiating inserts of any remaining queued records.

Flush Data to Server
bulkInserter.flush();
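
The queue-then-flush behavior described above can be sketched in plain Java. The BatchingQueueSketch class below is hypothetical and greatly simplified: it keeps a single queue and records "sent" batches in memory, whereas the actual BulkInserter maintains multiple queues and sends each batch to the database. It is meant only to show why a final flush is needed.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingQueueSketch<T> {
    private final int batchSize;
    private final List<T> queue = new ArrayList<>();
    private final List<List<T>> sentBatches = new ArrayList<>();

    BatchingQueueSketch(int batchSize) {
        if (batchSize < 1)
            throw new IllegalArgumentException("batchSize must be at least 1");
        this.batchSize = batchSize;
    }

    // Queues a record and sends the batch automatically once it is full
    void insert(T record) {
        queue.add(record);
        if (queue.size() >= batchSize)
            flush();
    }

    // Sends whatever is queued, even if the batch is not full
    void flush() {
        if (queue.isEmpty())
            return;
        sentBatches.add(new ArrayList<>(queue));
        queue.clear();
    }

    List<List<T>> sentBatches() { return sentBatches; }

    int queued() { return queue.size(); }

    public static void main(String[] args) {
        BatchingQueueSketch<Integer> inserter = new BatchingQueueSketch<>(3);
        for (int i = 0; i < 7; i++)
            inserter.insert(i);

        System.out.println("sent before flush: " + inserter.sentBatches());
        System.out.println("still queued:      " + inserter.queued());

        inserter.flush();
        System.out.println("sent after flush:  " + inserter.sentBatches());
    }
}
```

With seven records and a batch size of three, two full batches are sent automatically and one record remains queued until the explicit flush.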

Tip

See the ingest method of DistributedIOUseCase.java for details.

Refresh Phase

Though the materialized view in this example (and in the associated use case) is configured to periodically refresh, a manual refresh is done between ingest & key lookup phases to avoid having to wait for the refresh cycle.

The manual refresh requires only a database connection in order to be initiated.

Refresh Materialized View
this.db.alterTable(this.viewNameSales, AlterTableRequest.Action.REFRESH, null, null);

Tip

See the refresh method of DistributedIOUseCase.java for details.

Key Lookup Phase

The RecordRetriever is the Java object that performs distributed key lookup. Creating one requires steps similar to creating a BulkInserter.

First, the database connection is used to extract the Type schema of the lookup source.

Get View Schema
Type storeTotalType = Type.fromTable(this.db, this.viewNameSales);

Then, the RecordRetriever is created with that Type.

Create a RecordRetriever
RecordRetriever<GenericRecord> recordRetriever =
        new RecordRetriever<>(this.db, this.viewNameSales, storeTotalType);

Distributed key lookup is accomplished by specifying the key(s) to look up and any filters that need to be applied.

The optional lookup filter is fixed to the current day's date.

Create Key Lookup Filter
final String today = LocalDate.now().toString();
final String todayFilter = "order_date = '" + today + "'";
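
If lookups for days other than today are needed, the filter construction can be factored into a small helper. The sketch below is an illustration only; the DateFilterSketch class and its orderDateFilter method are hypothetical names. It relies on LocalDate.toString() producing the ISO-8601 yyyy-MM-dd form, the same form used in the filter above.

```java
import java.time.LocalDate;

final class DateFilterSketch {

    // Builds a filter expression on the view's order_date column
    static String orderDateFilter(LocalDate date) {
        return "order_date = '" + date + "'";
    }

    public static void main(String[] args) {
        // Filter for a fixed date, then for the current day
        System.out.println(orderDateFilter(LocalDate.of(2026, 2, 16)));
        System.out.println(orderDateFilter(LocalDate.now()));
    }
}
```

Since the helper accepts only a LocalDate, the value spliced into the expression is always a well-formed date rather than arbitrary text.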

The lookup keys are established, one for each store number being looked up. Each key value is matched against the shard key column, store_id, of the source view, store_sales. Since the view has only one column in its shard key, there is only one entry in each key List.

Create Key Lookup Shard Value List
List<Object> storeKeyset = GPUdbBase.list((Object)storeNum);

With each store number lookup key and the fixed date filter for today, the RecordRetriever extracts the corresponding store sales total and outputs the store_id & total_sales. As with assigning column values to the GenericRecord during ingest, the indexes used to extract column values from the keyed lookup result are 0-based and match the column order of the source view: store_id, order_date, and then total_sales.

Execute Key Lookup
GetRecordsResponse<GenericRecord> grResp = recordRetriever.getByKey(storeKeyset, todayFilter);
for (GenericRecord storeTotal : grResp.getData())
    System.out.printf("%7d %13.2f%n", storeTotal.get(0), storeTotal.get(2));
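
The format string used above right-aligns the store number in 7 characters and the sales total in 13 characters with two decimal places. The standalone sketch below applies the same format to literal values; the SalesReportSketch class and its formatRow method are hypothetical, and Locale.ROOT is an added assumption to keep the decimal separator a period regardless of the system locale.

```java
import java.util.Locale;

final class SalesReportSketch {

    // Same layout as the lookup output: "%7d %13.2f"
    static String formatRow(int storeId, double totalSales) {
        return String.format(Locale.ROOT, "%7d %13.2f", storeId, totalSales);
    }

    public static void main(String[] args) {
        System.out.println("Store #  Total Sales");
        System.out.println("======= =============");
        System.out.println(formatRow(1, 50183315.04));
        System.out.println(formatRow(10, 50197639.68));
    }
}
```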

Tip

See the lookup method of DistributedIOUseCase.java for details.

Download & Run

Included below are the artifacts needed to run this example on an instance of Kinetica. Maven will be used to compile the Java program.


To run the example, compile the Java program and then run it:

  1. Create a directory for the example Java project and move the example files into it as shown:

    Distributed Ingest & Key Lookup Project Layout
    
    DistributedIOUseCase/
       ├── pom.xml
       └── src/
          └── main/
                └── java/
                      └── DistributedIOUseCase.java
    
  2. Compile the Java program in the directory with the pom.xml:

    Compile Distributed Ingest & Key Lookup Project
    
    cd DistributedIOUseCase
    mvn clean package
    
  3. Run the example:

    Run Distributed Ingest & Key Lookup Project
    
    java -jar target/distributed-ops-1.0-jar-with-dependencies.jar run [<hostname> [<username> <password>]]
    
  4. Verify that the output consists of aggregated sales totals for each store for the current day; it should be similar to the following:

    Distributed Ingest & Key Lookup Project Output
    Summary for date: [2026-02-16]
    Store #  Total Sales
    ======= =============
          1   50183315.04
          2   49968365.50
          3   49985043.96
          4   49476362.15
          5   49844709.28
          6   50105162.61
          7   49979852.50
          8   50013925.24
          9   49556358.92
         10   50197639.68