Ingestion

Overview

Since Kinetica is designed to work with big data sets, it provides a mechanism, multi-head ingest, for fast data ingestion. Most Kinetica interactions are done through the single head node and, from there, parcelled out to the rest of the Kinetica cluster. Multi-head ingest, however, allows data to be inserted through transactions with the cluster nodes directly, bypassing the head node and improving ingestion performance.

Architecture of an Ingest Process

To fully utilize multi-head ingest, the ingestion process should use a multi-node parallel processing framework, such as MapReduce, Storm, or Spark. As data is divided up and flows through the processing nodes, a Kinetica bulk ingest object (the BulkInserter, described below) can be instantiated in each node to push data to the proper Kinetica cluster node. This greatly increases the speed of ingestion by both parallelizing the ingestion process and spreading the network traffic across multiple nodes.
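As a minimal sketch of this pattern, assuming Spark as the processing framework, a hypothetical connection URL, and the MyType record class from the example further below, each partition of the data can open its own connection and push its records through the BulkInserter and WorkerList classes described in the following sections:

// A minimal per-node ingestion sketch, assuming Spark; the URL, table name,
// and MyType record class are illustrative placeholders
import org.apache.spark.api.java.JavaRDD;

import com.gpudb.*;

public class SparkIngestSketch
{
    public static void ingest(JavaRDD<MyType> records, String tableName, Type type)
    {
        records.foreachPartition(partition ->
        {
            // Each processing node opens its own connection and worker list,
            // so its records go directly to the appropriate cluster nodes
            GPUdb gpudb = new GPUdb("http://kinetica-head:9191");
            WorkerList workers = new WorkerList(gpudb);
            BulkInserter<MyType> bulkInserter =
                new BulkInserter<MyType>(gpudb, tableName, type, 10000, null, workers);

            // Queue each record; batches are pushed automatically whenever
            // the batch size threshold (10000 here) is reached
            while (partition.hasNext())
                bulkInserter.insert(partition.next());

            // Push any remaining queued records
            bulkInserter.flush();
        });
    }
}

Because each partition builds its own WorkerList, records are routed to the appropriate cluster nodes without passing through the head node.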

In order for the Kinetica cluster nodes to receive data directly, the configuration on each node needs to be updated to allow the incoming HTTP requests that carry the data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set:

# Enable worker HTTP servers, each process runs its own server for direct ingest.
enable_worker_http_servers = true

All of the functionality for this is encapsulated in the BulkInserter object. To do multi-head ingest, a WorkerList needs to be created for the BulkInserter to use, with one entry for each worker node/process. This list can be autopopulated simply by passing a GPUdb connection object to the WorkerList constructor, which will then retrieve the list of available cluster nodes from Kinetica itself. Below is a Java API code snippet showing this:

GPUdb gpudb = new GPUdb("http://localhost:9191");
WorkerList workers = new WorkerList(gpudb);
BulkInserter<T> bulkInserter = new BulkInserter<T>(gpudb, tableName, type, bulkThreshold, null, workers);

Note that in some cases, workers may be configured to use more than one IP address, not all of which may be accessible to the client; the worker list constructor uses the first IP returned by the server for each worker. If that first IP is not reachable and public URLs are not configured, an ipRegex or ipPrefix can be passed to the constructor to match the correct worker IP addresses:

BulkInserter.WorkerList(GPUdb gpudb, Pattern ipRegex)

or:

BulkInserter.WorkerList(GPUdb gpudb, String ipPrefix)
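For example, assuming the workers' client-facing addresses fall on an illustrative 192.168.0.* subnet, either form can be used to build the worker list:

import java.util.regex.Pattern;

// Select worker IPs matching a regular expression...
WorkerList workersByRegex = new WorkerList(gpudb, Pattern.compile("192\\.168\\.0\\..*"));

// ...or worker IPs starting with a given string prefix
WorkerList workersByPrefix = new WorkerList(gpudb, "192.168.0.");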

Example

The following is a Java API code block that demonstrates the use of the BulkInserter for ingesting data.

// Create a bulk inserter for batch data ingestion
BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null);

// Generate data to be inserted into the table, automatically inserting
//   records whenever the batchSize limit is reached
for (int i = 0; i < numRecords; i++)
{
    MyType record = new MyType();
    record.put( 0, (i + 0.1) ); // col1
    record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
    record.put( 2, "Group 1" );  // group_id
    bulkInserter.insert( record );
}

// To ensure all records are inserted, flush the bulk inserter object.
bulkInserter.flush();

Cautions

  • If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:

    <relocation>
      <pattern>org.apache.avro</pattern>
      <shadedPattern>org.shaded.apache.avro</shadedPattern>
    </relocation>
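
    For reference, the relocation tag sits inside the shade plug-in's configuration in the project's pom.xml; the following is a sketch of the minimal surrounding structure (plug-in version omitted):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <relocation>
                <pattern>org.apache.avro</pattern>
                <shadedPattern>org.shaded.apache.avro</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>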