Ingestion

Overview

Since Kinetica is designed to work with big data sets, it provides a mechanism, multi-head ingest, for fast data ingestion. Most interactions with Kinetica are done through the single head node and, from there, parcelled out to the rest of the Kinetica cluster. Multi-head ingest, however, allows data to be inserted through direct transactions with the cluster nodes, bypassing the head node and improving ingestion performance.

Architecture of an Ingest Process

To fully utilize multi-head ingest, the ingestion process should use a multi-node parallel processing framework, such as MapReduce, Storm, or Spark. As the data is divided up and flows through the processing nodes, a Kinetica BulkInserter object can be instantiated in each node to push data to the proper Kinetica cluster node. This greatly increases the speed of ingestion by both parallelizing the ingestion process and spreading the network traffic across multiple nodes.

All of this functionality is encapsulated in the BulkInserter object. To do multi-head ingest, a WorkerList needs to be created for the BulkInserter to use, with one entry for each worker node/process. This list can be auto-populated simply by passing a GPUdb connection object to the WorkerList constructor, which will then retrieve the list of available cluster nodes from Kinetica itself. Below is a code snippet showing this:

// Retrieve the list of available worker nodes from the Kinetica cluster
BulkInserter.WorkerList workers = new BulkInserter.WorkerList(gpudb);
// Create a BulkInserter that routes records directly to the proper workers
bulkInserter = new BulkInserter<T>(gpudb, tableName, type, bulkThreshold, null, workers);
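
As a concrete illustration of the parallel ingestion pattern described above, the following sketch (hypothetical class and method names; it assumes the Spark Java API, the Kinetica Java API's Type.fromTable() helper, and a record class MyType like the one used in the example below) creates one BulkInserter per Spark partition, so each executor sends its records directly to the Kinetica worker nodes:

import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;

import com.gpudb.BulkInserter;
import com.gpudb.GPUdb;
import com.gpudb.Type;

public class SparkIngestSketch
{
    public static void ingest(JavaRDD<MyType> records, String kineticaUrl, String tableName)
    {
        records.foreachPartition((Iterator<MyType> partition) ->
        {
            // Each partition/task gets its own connection, worker list, and BulkInserter
            GPUdb gpudb = new GPUdb(kineticaUrl);
            Type type = Type.fromTable(gpudb, tableName);
            BulkInserter.WorkerList workers = new BulkInserter.WorkerList(gpudb);
            BulkInserter<MyType> bulkInserter =
                    new BulkInserter<MyType>(gpudb, tableName, type, 10000, null, workers);

            // Queue each record; full batches are pushed to the workers automatically
            while (partition.hasNext())
                bulkInserter.insert(partition.next());

            // Push any remaining queued records before the task finishes
            bulkInserter.flush();
        });
    }
}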

In order for the Kinetica cluster nodes to receive data directly, the configuration on each node needs to be updated to allow the incoming HTTP requests that carry the data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set:

# Enable worker HTTP servers, each process runs its own server for direct ingest.
enable_worker_http_servers = true

Example

The following is a code block that demonstrates the use of the BulkInserter for ingesting data.

// Create a bulk inserter for batch data ingestion
BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null);

// Generate data to be inserted into the table, automatically inserting
//   records whenever the batchSize limit is reached
for (int i = 0; i < numRecords; i++)
{
    MyType record = new MyType();
    record.put( 0, (i + 0.1) ); // col1
    record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
    record.put( 2, "Group 1" );  // group_id
    bulkInserter.insert( record );
}

// To ensure all records are inserted, flush the bulk inserter object.
bulkInserter.flush();
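
If any queued records cannot be inserted, the bulk inserter will report the failure when the batch is pushed. A minimal sketch of handling this, assuming the BulkInserter.InsertException type from the Java API, might look like the following:

try
{
    bulkInserter.insert( record );
}
catch (BulkInserter.InsertException ex)
{
    // The exception identifies the queued records that could not be inserted
    System.err.println( "Failed to insert records: " + ex.getMessage() );
}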

Cautions

  1. If the Kinetica nodes have multiple IP addresses on each server (for example, internal and external IP addresses), use either of the following constructors, specifying which IP addresses to use, as shown in the sketch after this list. Otherwise, the BulkInserter might try to communicate with Kinetica through IP addresses it cannot reach:

    BulkInserter.WorkerList(GPUdb gpudb, Pattern ipRegex)
    BulkInserter.WorkerList(GPUdb gpudb, String ipPrefix)
    
  2. There is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:

    <relocation>
      <pattern>org.apache.avro</pattern>
      <shadedPattern>org.shaded.apache.avro</shadedPattern>
    </relocation>
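
As referenced in the first caution above, the following sketch (using java.util.regex.Pattern and a hypothetical reachable subnet of 10.0.1.*) shows how the worker list can be restricted to reachable addresses:

// Match worker IP addresses against a regular expression...
BulkInserter.WorkerList workersByRegex =
    new BulkInserter.WorkerList(gpudb, Pattern.compile("10\\.0\\.1\\..*"));

// ...or simply against an IP prefix
BulkInserter.WorkerList workersByPrefix = new BulkInserter.WorkerList(gpudb, "10.0.1.");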