# Distributed Ingest

*Distributed Ingest* (also known as *Multi-Head Ingest*) is a mechanism that allows
[sharded](/content/concepts/tables#sharding) data to be ingested directly into cluster
nodes, bypassing the overhead of routing all of the data through the *head node*.
This greatly increases ingest speed by spreading the network traffic across
multiple nodes.

Operationally, the ingest mechanism calculates the
[shard key](/content/concepts/tables#shard-key) of each record to be inserted, uses it to
determine the record's target node, and sends batches of co-located records
directly to their respective target nodes.
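As a rough illustration of this routing step, the following Python sketch groups records into per-worker batches. It is a hypothetical model, not Kinetica's implementation: the hash (`zlib.crc32` over the shard key values), the modulo mapping onto workers, and the `route_records` helper are all assumptions. It does show the property that matters: records sharing a shard key always land in the same worker's batch, while a randomly-sharded table needs no key calculation at all.

```python
import zlib
from collections import defaultdict


def route_records(records, shard_key_columns, num_workers):
    """Group records (dicts) into per-worker batches (hypothetical routing).

    If shard_key_columns is empty, the table is treated as randomly sharded
    and records are spread round-robin, so no key is hashed.
    """
    batches = defaultdict(list)
    for i, record in enumerate(records):
        if shard_key_columns:
            # Assumed hash: CRC32 of the shard key values joined as text
            key = "|".join(str(record[c]) for c in shard_key_columns)
            worker = zlib.crc32(key.encode("utf-8")) % num_workers
        else:
            # Randomly-sharded table: any worker will do; round-robin here
            worker = i % num_workers
        batches[worker].append(record)
    return dict(batches)


records = [{"id": i, "group_id": "Group %d" % (i % 3)} for i in range(9)]
by_group = route_records(records, ["group_id"], num_workers=4)
for worker, batch in sorted(by_group.items()):
    print(worker, [r["group_id"] for r in batch])
```

Because the worker is a pure function of the shard key, any client can compute it locally, which is what lets each batch go straight to its node instead of through the head node.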

<a id="multi-head-ingest-api-support" />

## API Support

| Language   | Distributed Ingest Mechanism |
| ---------- | ---------------------------- |
| C++        | `GPUdbIngestor`              |
| C#         | `KineticaIngestor`           |
| Java       | `BulkInserter`               |
| Javascript | **X**                        |
| Node.js    | **X**                        |
| Python     | `GPUdbIngestor`              |
| REST       | **X**                        |
| SQL        | *\<automatic>*               |

<a id="multi-head-ingest-configuration" />

## Configuration

Distributed operations are enabled through configuration:

* Distributed operations need to be enabled on the server (they are by default)
* Client-accessible URLs need to be available for each node in the cluster

### Server-Side

In order for the cluster nodes to receive data directly from an ingest client,
each node must be configured to accept the incoming HTTP requests for that
data.  The following property must be set in the
`/opt/gpudb/core/etc/gpudb.conf` file for *distributed ingest* to work
properly:

```
# Enable worker HTTP servers...
enable_worker_http_servers = true
```

Each node can optionally be configured with a public URL that the API will use
to connect to it, in this section of `gpudb.conf`:

```
# Optionally, specify a public URL for each worker HTTP server that clients
# should use to connect for multi-head operations.
#
# NOTE: If specified for any ranks, a public URL must be specified for all
# ranks. Cannot be used in conjunction with nplus1.

rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194
```

If no `public_url` is defined, clients can connect to each node on any of its
available interfaces, subject to the [HA](/content/ha) and
[HTTPD](/content/security/sec_configuration) configurations, as well as any
general network security restrictions.
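The all-or-nothing rule in the configuration comment above lends itself to a quick pre-flight check before restarting the database. The following Python sketch parses `gpudb.conf`-style `key = value` lines and reports problems. The `check_multihead_config` helper, its `num_ranks` parameter, and the simplified parsing (no sections, whole-line `#` comments only) are assumptions for illustration, not part of Kinetica's tooling.

```python
import re

RANK_URL = re.compile(r"^rank(\d+)\.public_url$")


def check_multihead_config(conf_text, num_ranks):
    """Return a list of problems in gpudb.conf-style text (hypothetical helper)."""
    settings = {}
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        settings[key] = value

    problems = []
    # Assumption: the property is treated as true when absent (server default)
    if settings.get("enable_worker_http_servers", "true").lower() != "true":
        problems.append("enable_worker_http_servers must be true")

    # Collect the ranks that have a public URL configured
    ranks_with_url = set()
    for key in settings:
        match = RANK_URL.match(key)
        if match:
            ranks_with_url.add(int(match.group(1)))

    # Public URLs are all-or-nothing: if any rank has one, every rank needs one
    missing = sorted(set(range(num_ranks)) - ranks_with_url)
    if ranks_with_url and missing:
        problems.append("missing public_url for ranks %s" % missing)
    return problems


conf = """
enable_worker_http_servers = true
rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
"""
print(check_multihead_config(conf, num_ranks=4))
# → ['missing public_url for ranks [3]']
```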

### Client-Side

The list of URLs for connecting to each worker node is automatically created
when using the following:

* Java API default `BulkInserter`
* Python API `GPUdbTable` configured for background *distributed* operations

<CodeGroup>
  ```java Java Default BulkInserter theme={null}
  import com.gpudb.GPUdb;
  import com.gpudb.BulkInserter;
  import com.gpudb.GenericRecord;
  import com.gpudb.Type;

  GPUdb db = new GPUdb("http://<db.host>:9191", new GPUdb.Options());

  // Distributed Ingest Object Instantiation
  BulkInserter<GenericRecord> bulkInserter =
  	new BulkInserter<>(db, tableName, Type.fromTable(db, tableName), batchSize, options);
  ```

  ```python Python GPUdbTable theme={null}
  from gpudb import GPUdb, GPUdbTable

  db = GPUdb("http://<db.host>:9191", username = "<user>", password = "<pass>")

  t = GPUdbTable(name = table_name, db = db, use_multihead_io = True)
  ```
</CodeGroup>

In all other cases, the worker list must be created explicitly, using the
connection object to retrieve the list of available cluster nodes from the
database itself. Below is a code snippet showing a worker list populated this
way and the subsequent creation of a *distributed ingest* object with it:

<CodeGroup>
  ```java Java theme={null}
  import com.gpudb.GPUdb;
  import com.gpudb.BulkInserter;
  import com.gpudb.GenericRecord;
  import com.gpudb.Type;
  import com.gpudb.WorkerList;

  GPUdb db = new GPUdb("http://<db.host>:9191", new GPUdb.Options());
  WorkerList workers = new WorkerList(db);

  // Distributed Ingest Object Instantiation
  BulkInserter<GenericRecord> bulkInserter =
  	new BulkInserter<>(db, tableName, Type.fromTable(db, tableName), batchSize, options, workers);
  ```

  ```python Python theme={null}
  from gpudb import GPUdb, GPUdbTable
  from gpudb_multihead_io import GPUdbWorkerList, GPUdbIngestor

  db = GPUdb("http://<db.host>:9191", username = "<user>", password = "<pass>")
  workers = GPUdbWorkerList(db)

  # Distributed Ingest Object Instantiation
  table_type = GPUdbTable(name = table_name, db = db).get_table_type()
  bulk_ingestor = GPUdbIngestor( db, table_name, table_type, batch_size, options, workers )
  ```
</CodeGroup>

If no `public_url` is configured on the server nodes, a worker may have more
than one IP address, not all of which may be accessible to the client.  By
default, the API worker list constructor uses the first IP address returned by
the server for each worker.  To override this behavior, pass a regular
expression `Pattern` or a prefix `String` that matches the correct worker IP
addresses:

<CodeGroup>
  ```java Java theme={null}
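  import java.util.regex.Pattern;

  // Match 172.X.Y.Z addresses using a regular expression...
  WorkerList workers = new WorkerList(db, Pattern.compile("172\\..*"));
  // ...or, alternatively, using an address prefix
  WorkerList prefixWorkers = new WorkerList(db, "172.");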
  ```

  ```python Python theme={null}
  # Match 172.X.Y.Z addresses
  workers = GPUdbWorkerList(db, "172.")
  ```
</CodeGroup>
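The selection rule described above (first address by default, otherwise the first address matching a prefix or regular expression) can be sketched in plain Python to reason about which address each worker would get. The `pick_worker_address` helper and the sample address lists are hypothetical; the real worker list constructors apply their own logic to the addresses reported by the server.

```python
import re


def pick_worker_address(addresses, match=None):
    """Choose one address for a worker (hypothetical helper).

    match may be None (use the first address), a str prefix, or a compiled
    regular expression; returns None if nothing matches.
    """
    if match is None:
        return addresses[0] if addresses else None
    for address in addresses:
        if isinstance(match, str):
            if address.startswith(match):
                return address
        elif match.match(address):
            return address
    return None


# Each inner list is the set of addresses one worker reports (sample data)
workers = [
    ["10.0.0.5", "172.16.0.5"],
    ["10.0.0.6", "172.16.0.6"],
]
print([pick_worker_address(a) for a in workers])
# → ['10.0.0.5', '10.0.0.6']
print([pick_worker_address(a, "172.") for a in workers])
# → ['172.16.0.5', '172.16.0.6']
print([pick_worker_address(a, re.compile(r"172\..*")) for a in workers])
# → ['172.16.0.5', '172.16.0.6']
```

If the client can only reach the `172.` network, the default choice would hand it unreachable `10.` addresses, which is exactly the case the override exists for.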

<a id="multi-head-ingest-considerations" />

## Considerations

There are several factors to consider when using *distributed ingest*:

* There is a small performance penalty for calculating the
  [shard key](/content/concepts/tables#shard-key) of each record to be inserted into a
  [sharded](/content/concepts/tables#sharding) table.
* There is an additional per-record performance penalty for
  [primary key](/content/concepts/tables#primary-key) collision checks for any target table whose
  *primary key* is not the same as its *shard key* (that is, the *shard key*
  columns are a *proper subset* of the *primary key* columns).
* [Randomly-sharded](/content/concepts/tables#random-sharding) tables benefit more from
  *distributed ingest* than *sharded* tables do, as there is no target
  *shard key* to calculate for each record and, therefore, no associated
  performance penalty for that calculation.
* The *batch size* used to configure the bulk ingest object determines the
  record threshold for each of the insert queues targeting the worker nodes, not
  the record threshold for the bulk ingest object itself.  Thus, ingestion into
  tables with non-uniform record distribution may require periodic flushes of
  the bulk ingest queues to ensure timely insertion of records in queues that
  infrequently reach their threshold.
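The batch-size consideration above can be modeled with a toy ingestor that keeps one queue per worker and checks only that queue against the threshold. The `ToyIngestor` class is hypothetical and sends nothing over the network; it simply counts records "inserted" per worker to show how a rarely-targeted queue sits unsent until an explicit flush.

```python
from collections import defaultdict


class ToyIngestor:
    """Toy model of per-worker insert queues (hypothetical, not a Kinetica API).

    batch_size is the threshold for EACH worker queue; sent[w] counts the
    records that have been "inserted" on worker w.
    """

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.queues = defaultdict(list)
        self.sent = defaultdict(int)

    def insert(self, worker, record):
        queue = self.queues[worker]
        queue.append(record)
        # Only this worker's queue is checked against the threshold
        if len(queue) >= self.batch_size:
            self._send(worker)

    def flush(self):
        # Send every non-empty queue, regardless of how full it is
        for worker in list(self.queues):
            if self.queues[worker]:
                self._send(worker)

    def _send(self, worker):
        self.sent[worker] += len(self.queues[worker])
        self.queues[worker] = []


ingestor = ToyIngestor(batch_size=3)
# Skewed distribution: worker 0 gets 7 records, worker 1 gets only 1
for i in range(7):
    ingestor.insert(0, i)
ingestor.insert(1, "rare")
print(dict(ingestor.sent), {w: len(q) for w, q in ingestor.queues.items()})
# → {0: 6} {0: 1, 1: 1}
ingestor.flush()
print(dict(ingestor.sent))
# → {0: 7, 1: 1}
```

Until `flush()` runs, the single record bound for worker 1 is never sent, no matter how many records worker 0 receives.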

## Example

All the functionality for *distributed ingestion* is encapsulated in the bulk
ingest object.  See [API Support](/content/tuning/multihead/multihead_ingest#multi-head-ingest-api-support)
for a table listing the API-specific object to use for bulk ingestion.

The following is a *Java* API code block that demonstrates the use of the
`BulkInserter` for ingesting data.

```java theme={null}
// Create a bulk inserter for batch data ingestion, using a
//   try-with-resources to invoke auto closing of the bulk inserter object.
try (
    BulkInserter<MyType> bulkInserter =
        new BulkInserter<MyType>(gpudb, tableName, type, batchSize)
)
{
    // Generate data to be inserted into the table; each worker queue's
    // records are inserted automatically whenever it reaches the batchSize limit
    for (int i = 0; i < numRecords; i++)
    {
        MyType record = new MyType();
        record.put( 0, (i + 0.1) ); // col1
        record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
        record.put( 2, "Group 1" );  // group_id
        bulkInserter.insert( record );
    }

    // To ensure all records are inserted, flush the bulk inserter object.
    bulkInserter.flush();
}
```

<Info>
  If the `BulkInserter` is not declared as the resource in a
  *try-with-resources* block, the `close()` method needs to be called
  explicitly after use to release its resources.

  Also, since the `close()` method automatically calls `flush()`, the explicit
  `flush()` call can be omitted for one-time use, as in this example.
</Info>

<a id="multi-head-ingest-java" />

## Java Distributed Ingest

There are several options for *distributed ingest* using the Java
`BulkInserter`.

<a id="multi-head-ingest-java-modes" />

### Ingest Modes

While all ingest schemes using the `BulkInserter` provide automatic batching
of records and reduction in network traffic to, and processing load on, the head
node, many also provide the benefit of parallelizing the batch inserts.

| Ingest Scheme      | Parallelism of Inserts                                                                                                                                                                              |
| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| By Record          | None; each given record is queued and any full queue is inserted serially                                                                                                                           |
| By List            | All records in the given record list are queued and then inserted in parallel                                                                                                                       |
| Manual Flush       | All queued records are inserted in parallel, at the user's direction                                                                                                                                |
| Timed Flush        | All queued records are inserted in parallel, at a user-specified interval                                                                                                                           |
| External Threading | Multiple threads use a single `BulkInserter` to insert records in parallel; note that the `BulkInserter` itself can be configured to use any of the other ingest schemes for additional parallelism |

<CodeGroup>
  ```java By Record theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Construct a bulk inserter to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
  {
  	// Insert records one at a time; worker queues should fill up and
  	// automatically flush, serially.
  	for (GenericRecord record : records)
  		bulkInserter.insert(record);

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java By List theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Construct a bulk inserter to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
  {
  	// Insert records; any worker queues that are full after this list
  	// is processed will be inserted in parallel
  	bulkInserter.insert(records);

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java Manual Flush theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Construct a bulk inserter to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
  {
  	int recordNum = 0;

  	// Insert records; records accumulate in the worker queues, but no
  	// queue should reach the batch size and flush automatically.  With more
  	// than one queue and an even distribution of records, by the time a
  	// batch-size number of records has been processed, there should only
  	// be about batchSize/n records per queue (for n queues), at which point
  	// a manual flush will be invoked.  Any records still queued after the
  	// loop are flushed when the BulkInserter is closed.
  	for (GenericRecord record : records)
  	{
  		bulkInserter.insert(record);

  		// Insert records in parallel by flushing the BulkInserter object
  		// every batch-size number of records
  		if (++recordNum % batchSize == 0)
  			bulkInserter.flush();
  	}
  }
  ```

  ```java Timed Flush theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Create timed flush options:  all queues, regardless of fullness, every second
  BulkInserter.FlushOptions flushOptions = new BulkInserter.FlushOptions(false, 1);

  // Construct a bulk inserter to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter =
  		new BulkInserter<>(kdb, tableName, recordType, batchSize, null, null, flushOptions))
  {
  	// Insert records; assuming batchSize is at least the total record
  	// count, no worker queue should fill up and flush automatically.  This
  	// allows the timed flush to insert queued records every second.
  	for (GenericRecord record : records)
  		bulkInserter.insert(record);

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java External Threading (Scheme) theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Construct a bulk inserter to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
  {
  	ExecutorService executorService = null;

  	try
  	{
  		// Create a thread pool for inserting data
  		executorService = Executors.newFixedThreadPool(totalThreads);

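  		// Round up so every record is assigned to a task, even when the
  		// record count is not evenly divisible by the thread count
  		final int recordsPerThread = (records.size() + totalThreads - 1) / totalThreads;

  		for (int startIndex = 0; startIndex < records.size(); startIndex += recordsPerThread)
  		{
  			// Pass the shared BulkInserter & record list into each task
  			// with an offset for each to start inserting from; the last
  			// task's count is clamped so it stops at the end of the list
  			final int recordCount = Math.min(recordsPerThread, records.size() - startIndex);
  			executorService.execute
  			(
  					new BatchInsert(bulkInserter, records, startIndex, recordCount)
  			);
  		}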
  	}
  	finally
  	{
  		// Shut down thread pool
  		if (executorService != null)
  		{
  			executorService.shutdown();
  			try
  			{
  				if (!executorService.awaitTermination(30, TimeUnit.SECONDS))
  					executorService.shutdownNow();
  			}
  			catch (@SuppressWarnings("unused") InterruptedException e)
  			{
  				executorService.shutdownNow();
  			}
  		}
  	}

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java External Threading (Helper) theme={null}
  /**
   * Helper class for parallelizing usage of the bulk ingest object.
   *
   * Each instance inserts a given number of records from a shared record
   * list, starting at a given index, into a given BulkInserter
   */
  private static class BatchInsert implements Runnable
  {
  	/* Bulk ingestion object to use for inserting records */
  	BulkInserter<GenericRecord> bulkInserter;
  	/* Full set of records being inserted */
  	List<GenericRecord> records;
  	/* ID/index of first record to insert within the full set */
  	int startIndex;
  	/* ID/index one past the last record to insert within the full set (exclusive) */
  	int endIndex;

  	/*
  	 * Creates a new batch insert object, which will add a range of records
  	 * from the given list to the specified bulk ingest object
  	 *
  	 * @param bulkInserter bulk ingest object to which records will be added
  	 * @param records full set of records being inserted
  	 * @param startIndex index of the first record in the list to insert
  	 * @param recordCount number of records to insert in this batch
  	 */
  	public BatchInsert(BulkInserter<GenericRecord> bulkInserter, List<GenericRecord> records, int startIndex, int recordCount)
  	{
  		this.bulkInserter = bulkInserter;
  		this.records = records;
  		this.startIndex = startIndex;
  		this.endIndex = startIndex + recordCount;
  	}

  	public void run()
  	{
  		try
  		{
  			for (int recIndex = this.startIndex; recIndex < this.endIndex; recIndex++)
  			{
  				this.bulkInserter.insert(this.records.get(recIndex));
  			}
  		}
  		catch (Exception e)
  		{
  			System.out.println("Error inserting record: " + e);
  		}
  	}
  }
  ```
</CodeGroup>
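The serial versus parallel distinction in the Ingest Modes table comes down to whether queued worker batches are sent one after another or concurrently. The Python sketch below models that difference with `concurrent.futures`; `send_batch`, `flush_serially`, and `flush_in_parallel` are hypothetical stand-ins (the "insert" just sleeps briefly to mimic network latency), and the serial function is a simplification of the By Record scheme, in which full queues are sent inline as records arrive.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def send_batch(worker, batch):
    """Hypothetical stand-in for one insert request to a worker node."""
    time.sleep(0.01)  # simulate network latency
    return worker, len(batch)


def flush_serially(queues):
    # By Record style (simplified): each queue is sent in turn
    return dict(send_batch(w, b) for w, b in queues.items() if b)


def flush_in_parallel(queues):
    # By List / Manual Flush / Timed Flush style: all non-empty queues
    # are sent concurrently, one task per worker queue
    pending = [(w, b) for w, b in queues.items() if b]
    if not pending:
        return {}
    with ThreadPoolExecutor(max_workers=len(pending)) as pool:
        return dict(pool.map(lambda item: send_batch(*item), pending))


queues = {0: list(range(5)), 1: list(range(3)), 2: [], 3: list(range(4))}
for flush in (flush_serially, flush_in_parallel):
    start = time.perf_counter()
    result = flush(queues)
    print(flush.__name__, result, "%.3fs" % (time.perf_counter() - start))
```

Both functions insert the same records; the parallel version's elapsed time should be close to that of a single request rather than the sum of all of them.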

<a id="multi-head-ingest-java-objects" />

### Ingest Object Types

The `BulkInserter` supports three types of record objects, each offering a
different balance of type safety, flexibility, and schema migration support.

| Object Type     | Description                                                                                                                                                                                                                                                            |
| --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Annotated Class | Uses a `RecordObject`-annotated Java class whose instances serve as both the schema definition and data container; offers compile-time type safety but cannot adapt to schema migrations that add or remove columns, as the class definition is fixed at compile time. |
| `GenericRecord` | Uses a dynamic record constructed from a `Type` retrieved from an existing table via `Type.fromTable()`; field values are set by column index or name, and a new `Type` can be acquired after schema changes to accommodate structural changes like column additions.  |
| JSON `String`   | Records are represented as JSON strings; the `BulkInserter` is constructed without a `Type`, making this the most flexible option, as it naturally accommodates schema changes without updating the record format or re-acquiring the table type.                         |

<CodeGroup>
  ```java Annotated Class (Ingest) theme={null}
  // Construct a bulk inserter from the class type to perform the distributed ingest
  try (BulkInserter<Product> bulkInserter =
  		new BulkInserter<>(kdb, tableName, RecordObject.getType(Product.class), batchSize))
  {
  	// Insert populated instances of the class
  	for (Product product : products)
  		bulkInserter.insert(product);

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java Annotated Class (Definition) theme={null}
  /**
   * Create a class whose members map, by name and type, to the target
   * table it represents.  Annotate each column with its ordering in the
   * target table and any column properties.
   */
  public static class Product extends RecordObject
  {
  	@RecordObject.Column(order = 0, properties = { "primary_key" })
  	public Integer id;
  	@RecordObject.Column(order = 1, properties = { "char64" })
  	public String name;
  	@RecordObject.Column(order = 2)
  	public String description;
  	@RecordObject.Column(order = 3)
  	public Long ts;

  	public Product() {}

  	/* Create a constructor for the class that will take parameters so that
  	 * assigning values is easier */
  	public Product(Integer id, String name, String description)
  	{
  		this.id = id;
  		this.name = name;
  		this.description = description;
  		this.ts = Long.MIN_VALUE;  // Invalid timestamp to trigger INIT_WITH_NOW
  	}
  }
  ```

  ```java GenericRecord theme={null}
  // Acquire a type for the target table
  final Type recordType = Type.fromTable(kdb, tableName);

  // Construct a bulk inserter from the table type to perform the distributed ingest
  try (BulkInserter<GenericRecord> bulkInserter =
  		new BulkInserter<>(kdb, tableName, recordType, batchSize))
  {
  	for (Map<String, Object> product : products)
  	{
  		// Create record from the table type and populate
  		GenericRecord record = new GenericRecord(recordType);
  		record.put(0, product.get("id"));
  		record.put(1, product.get("name"));
  		record.put(2, product.get("description"));
  		record.put(3, Long.MIN_VALUE);  // Invalid timestamp to trigger INIT_WITH_NOW
  		bulkInserter.insert(record);
  	}

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```

  ```java JSON theme={null}
  // Construct a bulk inserter with no type specified to perform JSON distributed ingest
  try (BulkInserter<String> bulkInserter = new BulkInserter<>(kdb, tableName, batchSize))
  {
  	// Insert records as JSON strings
  	for (String product : products)
  		bulkInserter.insert(product);

  	// Insert any remaining records by flushing the BulkInserter object
  	bulkInserter.flush();
  }
  ```
</CodeGroup>

<a id="multi-head-ingest-python" />

## Python Distributed Ingest

There are two options for *distributed ingest* using Python.

Both objects provide automatic batching of records and reduction in network
traffic to, and processing load on, the head node; however, neither offers any
inherent parallelism.

| Ingest Object   | Description                                                                                                                                            |
| --------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `GPUdbTable`    | When constructed with the `use_multihead_io` option set to `True`, a `GPUdbTable` object automatically uses a `GPUdbIngestor` object for inserts       |
| `GPUdbIngestor` | Handles *distributed ingest*, and may be faster than using a `GPUdbTable` object as there will be less overhead in the direct use of a `GPUdbIngestor` |

Note that the list insert functions are effectively wrappers for the single
insert functions in the examples below; they serve as a convenience and do not
provide any inherent performance benefit.

<CodeGroup>
  ```python GPUdbTable Single Insert theme={null}
  # Get a handle to the ingest target table
  t = gpudb.GPUdbTable(name = table_name, db = kinetica, use_multihead_io = True)

  for record in records:
      t.insert_records(record)

  t.flush_data_to_server()
  ```

  ```python GPUdbTable List Insert theme={null}
  # Get a handle to the ingest target table
  t = gpudb.GPUdbTable(name = table_name, db = kinetica, use_multihead_io = True)

  t.insert_records(records)

  t.flush_data_to_server()
  ```

  ```python GPUdbIngestor Single Insert theme={null}
  # Create a GPUdbIngestor to do the distributed ingest, using a table object
  # to get the type of record being ingested
  table_type = gpudb.GPUdbTable(name = table_name, db = kinetica).get_table_type()
  gi = gpudb.GPUdbIngestor(kinetica, table_name, table_type, BATCH_SIZE)

  for record in records:
      gi.insert_record(record)

  gi.flush()
  ```

  ```python GPUdbIngestor List Insert theme={null}
  # Create a GPUdbIngestor to do the distributed ingest, using a table object
  # to get the type of record being ingested
  table_type = gpudb.GPUdbTable(name = table_name, db = kinetica).get_table_type()
  gi = gpudb.GPUdbIngestor(kinetica, table_name, table_type, BATCH_SIZE)

  gi.insert_records(records)

  gi.flush()
  ```
</CodeGroup>

## Cautions

When using the Java API with *MapReduce*, the version of *Avro* used by
*Kinetica* conflicts with the one used by *MapReduce*.  This conflict can be
resolved by using the *Maven* Shade plug-in with a `relocation` tag:

```xml theme={null}
<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>
```
