# Distributed Ingest & Key Lookup Use Case

A common, high-performance use case for Kinetica is
[distributed ingest](/content/tuning/multihead/multihead_ingest) into a
fact table, paired with
[distributed key lookup](/content/tuning/multihead/multihead_egress) from a
materialized view that is created from that fact table and refreshed
periodically.

The example given below can be
[downloaded & run](/content/tuning/multihead/multihead_use_case#multi-head-use-case-download) to demonstrate the use
case on a local Kinetica instance.

This guide will demonstrate a simple use case, implemented in *Java* & *SQL*,
consisting of four entities:

1. Distributed Ingest Client
2. Fact Table *(ingestion target)*
3. Materialized View from Fact Table *(key lookup source)*
4. Distributed Key Lookup Client

<Frame caption="Distributed Ingest & Key Lookup">
  <img src="https://mintcdn.com/kinetica/1Hn5MNMQp397m0Vp/content/tuning/multihead/multihead_use_case.png?fit=max&auto=format&n=1Hn5MNMQp397m0Vp&q=85&s=9bfe6845a8ac9042b8d425a210a6936e" alt="Distributed Ingest & Key Lookup" width="950" height="382" data-path="content/tuning/multihead/multihead_use_case.png" />
</Frame>

These components work together in four phases:

1. [Setup Phase](/content/tuning/multihead/multihead_use_case#multi-head-use-case-setup)
2. [Ingest Phase](/content/tuning/multihead/multihead_use_case#multi-head-use-case-ingest)
3. [Refresh Phase](/content/tuning/multihead/multihead_use_case#multi-head-use-case-refresh)
4. [Key Lookup Phase](/content/tuning/multihead/multihead_use_case#multi-head-use-case-egress)

<a id="multi-head-use-case-setup" />

## Setup Phase

First, a database connection needs to be established.

```java Establish Database Connection theme={null}
GPUdbBase.Options options = new GPUdbBase.Options();
options.setUsername(user);
options.setPassword(pass);
this.db = new GPUdb(url, options);
```

If a [schema](/content/concepts/schemas) name is passed as a parameter, that
schema is used to contain the database objects created subsequently; it is
created if it doesn't already exist.

```java Create/Use Optional Schema theme={null}
if (this.schemaName != null)
	if (!this.db.hasSchema(this.schemaName, null).getSchemaExists())
		this.db.createSchema(this.schemaName, null);
```

A [table](/content/concepts/tables), `order_history`, needs to be created
before ingestion can begin. It serves as the distributed ingestion target.

The table is [sharded](/content/concepts/tables#sharding) on `store_id`, so that the
subsequent aggregation on `store_id` by the materialized view can execute
quickly, without having to synchronize aggregation results across shards.

```java Create Order History Table theme={null}
String ddlOrderHistory = "" +
"   CREATE OR REPLACE TABLE " + this.tableNameHistory   +
"    (                                                " +
"        store_id INT NOT NULL,                       " +
"        id INT NOT NULL,                             " +
"        total_amount DOUBLE NOT NULL,                " +
"        timestamp TYPE_TIMESTAMP NOT NULL,           " +
"        PRIMARY KEY (id, store_id),                  " +
"        SHARD KEY (store_id)                         " +
"    );                                               ";
this.db.executeSql(new ExecuteSqlRequest().setStatement(ddlOrderHistory));
```
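
Why sharding on `store_id` keeps the aggregation local can be illustrated with a minimal, self-contained sketch. It is not Kinetica code: the shards are plain in-memory maps, and `shardFor` is a hypothetical modulo routing function standing in for the database's actual shard hashing, which is not described here. Under those assumptions, it shows that when every row for a given `store_id` lands on the same shard, each shard's per-store totals are already final.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models shards as in-memory maps and routes rows with a
// simple modulo; this is an assumption, not Kinetica's real shard hashing.
final class ShardLocalAggregationSketch {

    // Hypothetical routing function: the same store_id always maps to the same shard
    static int shardFor(int storeId, int shardCount) {
        return Math.floorMod(storeId, shardCount);
    }

    // Routes each (store_id, total_amount) pair to a shard and sums per store within it
    static List<Map<Integer, Double>> aggregatePerShard(
            int[] storeIds, double[] amounts, int shardCount) {
        List<Map<Integer, Double>> shardTotals = new ArrayList<>();
        for (int i = 0; i < shardCount; i++)
            shardTotals.add(new HashMap<>());
        for (int i = 0; i < storeIds.length; i++) {
            Map<Integer, Double> shard = shardTotals.get(shardFor(storeIds[i], shardCount));
            shard.merge(storeIds[i], amounts[i], Double::sum);
        }
        return shardTotals;
    }

    public static void main(String[] args) {
        int[] storeIds = {1, 2, 1, 3, 2, 1};
        double[] amounts = {10.0, 20.0, 5.0, 7.5, 2.5, 1.0};
        List<Map<Integer, Double>> shardTotals = aggregatePerShard(storeIds, amounts, 2);
        // Each store_id appears in exactly one shard's map, so no totals need
        // to be combined across shards afterward
        for (int shard = 0; shard < shardTotals.size(); shard++)
            System.out.println("shard " + shard + ": " + shardTotals.get(shard));
    }
}
```

Had the rows been routed on `id` instead, a single store's orders could be spread over several shards, and the partial sums would need to be merged in an extra step.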

A [materialized view](/content/sql/ddl#sql-create-materialized-view),
`store_sales`, needs to be created before keyed lookups can begin.  It serves
as the *distributed key lookup* source.

The materialized view aggregates each store's total sales by date from the
`order_history` table, grouping on `store_id` and the date portion of the
order `timestamp`.  It also uses the `KI_HINT_GROUP_BY_PK` hint to make a
[primary key](/content/concepts/tables#primary-key) out of these two grouping columns.
Doing so automatically creates a
[primary key index](/content/concepts/indexes#primary-key-index) on the two columns, meeting
the [keyed lookup criteria](/content/tuning/multihead/multihead_egress#multi-head-egress-considerations) that
all columns involved in the lookup be indexed.

The refresh interval given to the materialized view is for reference only; in
this example, the view will be manually refreshed to allow results to be shown
immediately.

```java Create Store Sales Materialized View theme={null}
String ddlStoreSales = "" +
"    CREATE OR REPLACE MATERIALIZED VIEW " + this.viewNameSales    +
"    REFRESH EVERY 10 MINUTES AS                                 " +
"    SELECT  /* KI_HINT_GROUP_BY_PK */                           " +
"        store_id,                                               " +
"        DATE(timestamp) AS order_date,                          " +
"        SUM(total_amount) AS total_sales                        " +
"    FROM                                                        " +
"        " + this.tableNameHistory + "                           " +
"    GROUP BY                                                    " +
"        store_id,                                               " +
"        DATE(timestamp);                                        ";
this.db.executeSql(new ExecuteSqlRequest().setStatement(ddlStoreSales));
```
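
As a rough mental model only, the aggregation the view performs can be approximated in plain Java. The class and method names below (`StoreSalesViewSketch`, `addOrder`, `totalSales`) are hypothetical, and an in-memory map stands in for the view. The sketch shows that the pair of grouping columns identifies at most one aggregated row, which is why that pair can serve as a primary key for lookups.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for the store_sales view; not part of the Kinetica API
final class StoreSalesViewSketch {

    // Running SUM(total_amount) per (store_id, DATE(timestamp)) key
    private final Map<List<Object>, Double> totals = new HashMap<>();

    // Folds one order into the total for its store and calendar date
    void addOrder(int storeId, LocalDateTime timestamp, double totalAmount) {
        List<Object> key = Arrays.<Object>asList(storeId, timestamp.toLocalDate());
        totals.merge(key, totalAmount, Double::sum);
    }

    // Keyed lookup: returns the single matching total, or null if there is none
    Double totalSales(int storeId, LocalDate orderDate) {
        return totals.get(Arrays.<Object>asList(storeId, orderDate));
    }

    public static void main(String[] args) {
        StoreSalesViewSketch view = new StoreSalesViewSketch();
        view.addOrder(1, LocalDateTime.of(2026, 2, 16, 9, 0), 10.0);
        view.addOrder(1, LocalDateTime.of(2026, 2, 16, 17, 30), 5.5);
        view.addOrder(1, LocalDateTime.of(2026, 2, 17, 8, 0), 2.0);
        // The two orders placed on 2026-02-16 collapse into one aggregated row
        System.out.println(view.totalSales(1, LocalDate.of(2026, 2, 16))); // 15.5
    }
}
```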

<a id="multi-head-use-case-ingest" />

## Ingest Phase

First, the `Type` schema of the ingest target table needs to be extracted,
for later use in `BulkInserter` construction and record insertion.

```java Get Table Schema theme={null}
Type orderType = Type.fromTable(this.db, this.tableNameHistory);
```

The `BulkInserter` is the *Java* object that performs distributed ingestion.
To create one, give it the database handle, table name & `Type` schema, record
ingest queue size, and any ingest options.

```java Configure & Create a BulkInserter theme={null}
// Configure bulk inserter options to store up per-record insertion errors
Map<String,String> options = GPUdbBase.options
(
		InsertRecordsRequest.Options.RETURN_INDIVIDUAL_ERRORS, InsertRecordsRequest.Options.TRUE,
		InsertRecordsRequest.Options.ALLOW_PARTIAL_BATCH, InsertRecordsRequest.Options.TRUE
);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter =
		new BulkInserter<>(this.db, this.tableNameHistory, orderType, queueSize, options))
```

Once the `BulkInserter` has been created, `GenericRecord` objects of the
given `Type` are inserted into it.  The indexes used in assigning column
values to each `GenericRecord` are 0-based and match the column order of the
target table: `store_id`, `id`, `total_amount`, and then `timestamp`.

```java Insert Data theme={null}
GenericRecord order = new GenericRecord(this.type);
order.put(0, storeId);
order.put(1, orderId);
order.put(2, totalAmount);
order.put(3, timestamp);
this.bulkInserter.insert(order);
```

The `BulkInserter` automatically inserts records as each of its queues reaches
the configured `queueSize`.  Before ending the ingest session, the
`BulkInserter` is flushed, initiating inserts of any remaining queued records.

```java Flush Data to Server theme={null}
bulkInserter.flush();
```
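
The queue-then-flush behavior described above can be modeled with a small sketch. This is not the `BulkInserter` implementation: `BatchingQueueSketch` is a hypothetical class that keeps a single queue and hands each full batch to a caller-supplied callback, whereas the real object manages several queues and sends batches to the database. It illustrates why the final `flush()` matters: records left in a partially filled queue are not sent until it is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative single-queue model of batch-on-threshold plus explicit flush
final class BatchingQueueSketch<T> {
    private final int queueSize;
    private final Consumer<List<T>> sender;
    private final List<T> queue = new ArrayList<>();

    BatchingQueueSketch(int queueSize, Consumer<List<T>> sender) {
        if (queueSize < 1)
            throw new IllegalArgumentException("queueSize must be positive");
        this.queueSize = queueSize;
        this.sender = sender;
    }

    // Queues a record and sends the batch once the queue reaches queueSize
    void insert(T record) {
        queue.add(record);
        if (queue.size() >= queueSize)
            flush();
    }

    // Sends whatever is queued, even if the queue is only partially full
    void flush() {
        if (queue.isEmpty())
            return;
        sender.accept(new ArrayList<>(queue));
        queue.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingQueueSketch<String> inserter =
                new BatchingQueueSketch<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++)
            inserter.insert("order-" + i);
        inserter.flush(); // without this, the 7th record would never be sent
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```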

<Tip>
  See the `ingest` method of
  [DistributedIOUseCase.java](/content/tuning/multihead/multihead_use_case#multi-head-use-case-download) for details.
</Tip>

<a id="multi-head-use-case-refresh" />

## Refresh Phase

Though the materialized view in this example (and in the associated use case) is
configured to periodically refresh, a manual refresh is done between ingest &
key lookup phases to avoid having to wait for the refresh cycle.

Initiating the manual refresh requires only a database connection.

```java Refresh Materialized View theme={null}
this.db.alterTable(this.viewNameSales, AlterTableRequest.Action.REFRESH, null, null);
```

<Tip>
  See the `refresh` method of
  [DistributedIOUseCase.java](/content/tuning/multihead/multihead_use_case#multi-head-use-case-download) for details.
</Tip>

<a id="multi-head-use-case-egress" />

## Key Lookup Phase

The `RecordRetriever` is the *Java* object that performs
*distributed key lookup*.  Creating one requires steps similar to creating a
`BulkInserter`.

First, the database connection is used to extract the `Type` schema of the
lookup source.

```java Get View Schema theme={null}
Type storeTotalType = Type.fromTable(this.db, this.viewNameSales);
```

Then, the `RecordRetriever` is created with that `Type`.

```java Create a RecordRetriever theme={null}
RecordRetriever<GenericRecord> recordRetriever =
		new RecordRetriever<>(this.db, this.viewNameSales, storeTotalType);
```

*Distributed key lookup* is accomplished by specifying the key(s) to look up and
any filters that need to be applied.

In this example, the optional lookup filter restricts results to the current date.

```java Create Key Lookup Filter theme={null}
final String today = LocalDate.now().toString();
final String todayFilter = "order_date = '" + today + "'";
```
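
For lookups against a day other than today, the same filter can be built from an explicit date. The sketch below is a small variation on the snippet above; `DateFilterSketch` and `filterFor` are hypothetical names, not part of the example program. It relies on `LocalDate.toString()` producing the ISO-8601 `yyyy-MM-dd` form.

```java
import java.time.LocalDate;

// Illustrative helper; builds the same filter expression for any given date
final class DateFilterSketch {

    // LocalDate.toString() yields ISO-8601 text, e.g. 2026-02-16
    static String filterFor(LocalDate date) {
        return "order_date = '" + date + "'";
    }

    public static void main(String[] args) {
        System.out.println(filterFor(LocalDate.of(2026, 2, 16))); // order_date = '2026-02-16'
        System.out.println(filterFor(LocalDate.now()));           // filter for the current date
    }
}
```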

The lookup keys are established, one for each store number being looked up.
Each key value is matched against the shard key column, `store_id`, of the
source view, `store_sales`.  Since the view has only one column in its shard
key, there is only one entry in each key `List`.

```java Create Key Lookup Shard Value List theme={null}
List<Object> storeKeyset = GPUdbBase.list((Object)storeNum);
```

With each store number lookup key and the fixed date filter for today, the
`RecordRetriever` extracts the corresponding store sales total and outputs the
`store_id` & `total_sales`.  As with assigning column values to a
`GenericRecord` during ingest, the indexes used to extract column values from
the keyed lookup result are 0-based and match the column order of the source
view: `store_id` (0), `order_date` (1), and `total_sales` (2).

```java Execute Key Lookup theme={null}
GetRecordsResponse<GenericRecord> grResp = recordRetriever.getByKey(storeKeyset, todayFilter);
for (GenericRecord storeTotal : grResp.getData())
	System.out.printf("%7d %13.2f%n", storeTotal.get(0), storeTotal.get(2));
```
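
The format string in the loop above determines the column layout of the report. The following standalone sketch applies the same `%7d %13.2f` pattern to two sample values; `SalesReportFormatSketch` and `formatRow` are hypothetical names, and the explicit `Locale.ROOT` is an addition made here so the decimal separator does not vary with the machine's locale.

```java
import java.util.Locale;

// Illustrative formatter using the same pattern as the key lookup loop
final class SalesReportFormatSketch {

    // 7-wide right-aligned store number, a space, then a 13-wide amount with 2 decimals
    static String formatRow(int storeId, double totalSales) {
        return String.format(Locale.ROOT, "%7d %13.2f", storeId, totalSales);
    }

    public static void main(String[] args) {
        System.out.println("Store #  Total Sales");
        System.out.println("======= =============");
        System.out.println(formatRow(1, 50183315.04));  // "      1   50183315.04"
        System.out.println(formatRow(10, 50197639.68)); // "     10   50197639.68"
    }
}
```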

<Tip>
  See the `lookup` method of
  [DistributedIOUseCase.java](/content/tuning/multihead/multihead_use_case#multi-head-use-case-download) for details.
</Tip>

<a id="multi-head-use-case-download" />

## Download & Run

Included below are the artifacts needed to run this example on an instance of
Kinetica.  *Maven* will be used to compile the *Java* program.

* [pom.xml](https://raw.githubusercontent.com/kineticadb/kinetica-docs/master/content/examples/java/multi_head/pom.xml) - for compiling the
  Java example program
* [DistributedIOUseCase.java](https://raw.githubusercontent.com/kineticadb/kinetica-docs/master/content/examples/java/multi_head/src/main/java/DistributedIOUseCase.java) -
  the Java example program itself

To run the example, compile the *Java* program and then run it, as follows.

1. Create a directory for the example Java project and move the example files
   into it as shown:

   ```bash title="Distributed Ingest & Key Lookup Project Layout" theme={null}
   DistributedIOUseCase/
   ├── pom.xml
   └── src/
       └── main/
           └── java/
               └── DistributedIOUseCase.java
   ```

2. Compile the Java program in the directory with the <Badge color="gray">pom.xml</Badge>:

   ```bash title="Compile Distributed Ingest & Key Lookup Project" theme={null}
   cd DistributedIOUseCase
   mvn clean package
   ```

3. Run the example:

   ```bash title="Run Distributed Ingest & Key Lookup Project" theme={null}
   java -jar target/distributed-ops-1.0-jar-with-dependencies.jar run [<hostname> [<username> <password>]]
   ```

4. Verify that the output consists of aggregated sales totals for each store for
   the current day; it should be similar to the following:

   ```Distributed Ingest & Key Lookup Project Output theme={null}
   Summary for date: [2026-02-16]
   Store #  Total Sales
   ======= =============
         1   50183315.04
         2   49968365.50
         3   49985043.96
         4   49476362.15
         5   49844709.28
         6   50105162.61
         7   49979852.50
         8   50013925.24
         9   49556358.92
        10   50197639.68
   ```
