Multi-Head Ingest

Multi-Head Ingest (also known as Distributed Ingest) is a mechanism that allows sharded data to be ingested directly into the individual cluster nodes, bypassing the overhead of routing all data through the head node.

Operationally, the ingest mechanism calculates the target shard of each record to insert, and sends batches of would-be co-located records to their respective target nodes.
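
To picture the routing step, the sketch below hashes a record's shard key value onto a set of per-worker queues. It is purely illustrative, does not use Kinetica's actual shard hash, and the class and constant names are hypothetical.

Java

import java.util.Objects;

public class ShardRoutingSketch
{
    static final int NUM_WORKER_QUEUES = 4;  // hypothetical number of worker queues

    // Map a record's shard key value to the worker queue that owns its shard
    static int routeRecord(Object shardKeyValue)
    {
        return Math.floorMod(Objects.hashCode(shardKeyValue), NUM_WORKER_QUEUES);
    }

    public static void main(String[] args)
    {
        // Records with equal shard key values always land in the same queue
        System.out.println(routeRecord("customer-42"));
        System.out.println(routeRecord("customer-42"));  // same queue as above
        System.out.println(routeRecord("customer-77"));  // possibly a different queue
    }
}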

To fully utilize multi-head ingest, the ingestion process should use a multi-node parallel processing framework, such as MapReduce, Storm, or Spark. As data is divided up and flows through the processing nodes, a Kinetica bulk ingest object can be instantiated in each node to push data to the proper Kinetica cluster node. This greatly increases the speed of ingestion by both parallelizing the ingestion process and spreading the network traffic across multiple nodes.
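
For example, with Spark, a bulk inserter can be created inside each partition so that every executor pushes its own data straight to the Kinetica worker nodes. The following is a minimal sketch under assumed conditions (a JavaRDD of string-valued rows, an all-string target table, and illustrative names such as SparkIngestSketch and url), not a complete connector.

Java

import com.gpudb.BulkInserter;
import com.gpudb.GPUdb;
import com.gpudb.GenericRecord;
import com.gpudb.Type;
import com.gpudb.WorkerList;
import org.apache.spark.api.java.JavaRDD;

public class SparkIngestSketch
{
    public static void ingest(JavaRDD<String[]> rows, String url,
                              String tableName, int batchSize)
    {
        rows.foreachPartition(partition ->
        {
            // One connection and one bulk inserter per Spark partition
            GPUdb db = new GPUdb(url);
            Type type = Type.fromTable(db, tableName);

            try (
                BulkInserter<GenericRecord> bulkInserter =
                    new BulkInserter<>(db, tableName, type, batchSize, null,
                                       new WorkerList(db))
            )
            {
                while (partition.hasNext())
                {
                    // Assumes all columns are strings, in table column order
                    String[] row = partition.next();
                    GenericRecord record = new GenericRecord(type);
                    for (int i = 0; i < row.length; i++)
                        record.put(i, row[i]);

                    bulkInserter.insert(record);
                }

                // Push any records still queued below the batchSize threshold
                bulkInserter.flush();
            }
        });
    }
}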

API Support

Language     Multi-head Ingest Mechanism
C++          GPUdbIngestor
C#           KineticaIngestor
Java         BulkInserter
Javascript   X
Node.js      X
Python       GPUdbIngestor
REST         X
SQL          <automatic>

Configuration

Server-Side

In order for the cluster nodes to receive data directly from an ingest client, the configuration on each node needs to be set to allow the incoming HTTP requests for that data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set for multi-head ingest to work properly:

# Enable worker HTTP servers...
enable_worker_http_servers = true

Each node can also be configured with a public URL that the API will use to connect to it, in the following section of gpudb.conf:

# Optionally, specify a public URL for each worker HTTP server that clients
# should use to connect for multi-head operations.
#
# NOTE: If specified for any ranks, a public URL must be specified for all
# ranks. Cannot be used in conjunction with nplus1.

rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194

If a public_url is not defined, each node can be connected to on any of its available interfaces, taking HA and HTTPD configurations, as well as any general network security restrictions, into account.

Client-Side

With the exception of the Python background multi-head process, the multi-head ingest object of each API requires a list of worker nodes to use to distribute the insert requests, with one entry in the list for each node/process. This list can be auto-populated simply by using a GPUdb connection object, which can retrieve the list of available cluster nodes from the database itself. Below is a code snippet showing an automatically populated worker list and subsequent creation of a multi-head ingest object with it:

Java
import com.gpudb.GPUdb;
import com.gpudb.WorkerList;
import com.gpudb.GenericRecord;
import com.gpudb.BulkInserter;

GPUdb db = new GPUdb("http://<db.host>:9191");
WorkerList workers = new WorkerList(db);

// Multi-head Ingest Object Instantiation
BulkInserter<GenericRecord> bulkInserter =
    new BulkInserter<>(db, tableName, recordType, batchSize, options, workers);
Python
from gpudb import GPUdb
from gpudb_multihead_io import GPUdbWorkerList, GPUdbIngestor

db = GPUdb(host=["http://<db.host>:9191"])
workers = GPUdbWorkerList(db)

# Multi-head Ingest Object Instantiation
bulk_ingestor = GPUdbIngestor( db, table_name, record_type, batch_size, options, workers )

Note that in cases where no public_url was configured on each server node, workers may have more than one IP address, not all of which may be accessible to the client. The API worker list constructor uses the first IP in the list returned by the server for each worker, by default. To override this behavior, a regular expression Pattern or prefix String can be used to match the correct worker IP addresses:

Java
import java.util.regex.Pattern;

// Match 172.X.Y.Z addresses
WorkerList workers = new WorkerList(db, Pattern.compile("172\\..*"));
// ...or match by prefix string
WorkerList workers = new WorkerList(db, "172.");
Python
workers = GPUdbWorkerList(db, "172.")

Considerations

There are several factors to consider when using multi-head ingest:

  • Replicated tables are not supported.
  • There is a small performance penalty for calculating the shard key of each record to be inserted into a sharded table.
  • There is an additional per-record performance penalty for primary key collision checks on any target table whose primary key is not the same as its shard key (i.e., the shard key columns are a proper subset of the primary key columns).
  • Randomly-sharded tables benefit more from multi-head ingest than sharded tables do, as there is no target shard key to calculate for each record and, therefore, no associated performance penalty for that calculation.
  • The batch size used to configure the bulk ingest object determines the record threshold for each of the insert queues targeting the worker nodes, not the record threshold for the bulk ingest object itself. Thus, ingestion into tables with non-uniform record distribution may require periodic flushes of the bulk ingest queues to ensure timely insertion of records in queues that infrequently reach their threshold (see the sketch following this list).

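As a sketch of the last point above, the ingest loop can flush on a time interval in addition to relying on the batchSize threshold, so that sparsely populated queues do not hold records indefinitely. The interval and the record source below (flushIntervalMs, nextRecord) are illustrative assumptions, not part of the API.

Java

// Flush on a time interval as well as on the batchSize threshold
// (a minimal sketch; flushIntervalMs and nextRecord() are hypothetical)
long flushIntervalMs = 5000;
long lastFlush = System.currentTimeMillis();

for (int i = 0; i < numRecords; i++)
{
    bulkInserter.insert( nextRecord( i ) );

    // Periodically push all per-worker queues, including those that
    // rarely reach the batchSize threshold on their own
    if (System.currentTimeMillis() - lastFlush >= flushIntervalMs)
    {
        bulkInserter.flush();
        lastFlush = System.currentTimeMillis();
    }
}

// Final flush for any remaining queued records
bulkInserter.flush();
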
Example

All the functionality for multi-head ingestion is encapsulated in the bulk ingest object. See API Support for a chart listing the API-specific object to use for bulk ingestion.

The following is a Java API code block that demonstrates the use of the BulkInserter for ingesting data.

// Create a bulk inserter for batch data ingestion, using a
//   try-with-resources to invoke auto closing of the bulk inserter object.
try (
    BulkInserter<MyType> bulkInserter =
        new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null, new WorkerList(gpudb))
)
{
    // Generate data to be inserted into the table, automatically inserting
    // records whenever the batchSize limit is reached
    for (int i = 0; i < numRecords; i++)
    {
        MyType record = new MyType();
        record.put( 0, (i + 0.1) ); // col1
        record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
        record.put( 2, "Group 1" );  // group_id
        bulkInserter.insert( record );
    }

    // To ensure all records are inserted, flush the bulk inserter object.
    bulkInserter.flush();
}

Note

If the BulkInserter is not declared as the resource in a try-with-resources block, the close() method needs to be called explicitly after use to release its resources.

However, for a one-time use, as in this example, the flush() call can be removed, as the close() method will automatically call flush().
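
For reference, a minimal sketch of the explicit pattern, reusing the names from the example above:

Java

// Without try-with-resources: close explicitly when done
BulkInserter<MyType> bulkInserter =
    new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null, new WorkerList(gpudb));
try
{
    // ... insert records as in the example above ...
}
finally
{
    // close() flushes any queued records and releases the bulk inserter's resources
    bulkInserter.close();
}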

Python Background Multi-head

In the Python API, the GPUdbTable constructor has a use_multihead_io parameter, which allows a GPUdbTable object to handle all GPUdbIngestor interactions with the associated table in the background:

ingest_table = gpudb.GPUdbTable(_type=ingest_columns, name="ingest_table", db=h_db, use_multihead_io=True)

Cautions

If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:

<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>