Since Kinetica is designed to work with big data sets, it provides multi-head ingest, a mechanism for fast data ingestion. Most Kinetica interactions are done through the single head node and, from there, parcelled out to the rest of the Kinetica cluster. Multi-head ingest, however, allows data to be inserted through transactions with the cluster nodes directly, bypassing the head node and improving ingestion performance.
To fully utilize multi-head ingest, the ingestion process should use a multi-node parallel processing framework, such as MapReduce, Storm, or Spark. As data is divided up and flows through the processing nodes, a Kinetica BulkInserter object can be instantiated in each node to push data to the proper Kinetica cluster node. This greatly increases the speed of ingestion by both parallelizing the ingestion process and spreading the network traffic across multiple nodes.
All of this functionality is encapsulated in the BulkInserter object.
To do multi-head ingest, a WorkerList needs to be created for the BulkInserter to use, with one entry for each worker node/process. This list can be auto-populated simply by passing a GPUdb connection object to the WorkerList constructor, which will then retrieve the list of available cluster nodes from Kinetica itself. Below is a code snippet showing this:
// Retrieve the list of available Kinetica worker nodes to ingest to directly
BulkInserter.WorkerList workers = new BulkInserter.WorkerList(gpudb);

// Create a BulkInserter that distributes inserted records across those workers
bulkInserter = new BulkInserter<T>(gpudb, tableName, type, bulkThreshold, null, workers);
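When running inside a parallel framework, this same construction is typically repeated in each worker task, so that every task has its own connection and BulkInserter. Below is a minimal sketch of that pattern for Spark; the class and method names are hypothetical, and it assumes a JavaRDD of records whose type matches the target table, a head node URL reachable from every executor, and that the table's type can be retrieved with Type.fromTable:

import java.util.Iterator;

import org.apache.spark.api.java.JavaRDD;

import com.gpudb.BulkInserter;
import com.gpudb.GPUdb;
import com.gpudb.Type;

public class SparkMultiHeadIngest
{
    // Hypothetical helper: ingest the records of a Spark RDD with one
    // BulkInserter per partition, so each executor pushes its data directly
    // to the responsible Kinetica worker nodes
    public static <T> void ingest(JavaRDD<T> records, String kineticaUrl,
                                  String tableName, int batchSize)
    {
        records.foreachPartition((Iterator<T> partition) ->
        {
            // Each partition gets its own connection, worker list, and inserter
            GPUdb gpudb = new GPUdb(kineticaUrl);
            Type type = Type.fromTable(gpudb, tableName);
            BulkInserter.WorkerList workers = new BulkInserter.WorkerList(gpudb);
            BulkInserter<T> bulkInserter =
                new BulkInserter<T>(gpudb, tableName, type, batchSize, null, workers);

            while (partition.hasNext())
                bulkInserter.insert(partition.next());

            // Flush any records still queued below the batch size threshold
            bulkInserter.flush();
        });
    }
}

Creating the GPUdb connection and BulkInserter inside foreachPartition keeps them local to each executor, so they never need to be serialized and shipped across the cluster.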
In order for the Kinetica cluster nodes to receive data directly, the
configuration on each node needs to be updated to allow the incoming HTTP
requests that carry the data. The /opt/gpudb/core/etc/gpudb.conf
file needs to
have the following property set:
# Enable worker HTTP servers, each process runs its own server for direct ingest.
enable_worker_http_servers = true
The following is a code block that demonstrates the use of the BulkInserter
for ingesting data.
// Create a bulk inserter for batch data ingestion
BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize, null);

// Generate data to be inserted into the table, automatically inserting
// records whenever the batchSize limit is reached
for (int i = 0; i < numRecords; i++)
{
    MyType record = new MyType();
    record.put( 0, (i + 0.1) );                          // col1
    record.put( 1, ("string " + String.valueOf( i ) ) ); // col2
    record.put( 2, "Group 1" );                          // group_id
    bulkInserter.insert( record );
}

// To ensure all records are inserted, flush the bulk inserter object.
bulkInserter.flush();
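The example above assumes a MyType record class whose columns match those being populated. A minimal sketch of such a class, using the Java API's RecordObject base class, might look like the following; the column names and types are assumptions for illustration:

import com.gpudb.RecordObject;

// Hypothetical record class matching the columns used in the example above
public class MyType extends RecordObject
{
    @RecordObject.Column(order = 0)
    public double col1;

    @RecordObject.Column(order = 1)
    public String col2;

    @RecordObject.Column(order = 2)
    public String group_id;
}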
If the Kinetica servers have multiple IP addresses (for example, internal and external addresses), use either of the following WorkerList constructors, specifying which IP addresses to use. Otherwise, the BulkInserter might try to communicate with Kinetica through IP addresses it cannot reach:
BulkInserter.WorkerList(GPUdb gpudb, Pattern ipRegex)
BulkInserter.WorkerList(GPUdb gpudb, String ipPrefix)
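For example, to restrict the worker list to a hypothetical 172.16.*.* internal network, either constructor could be used as follows:

import java.util.regex.Pattern;

// Select worker IP addresses matching a regular expression...
BulkInserter.WorkerList workersByRegex =
    new BulkInserter.WorkerList(gpudb, Pattern.compile("172\\.16\\..*"));

// ...or worker IP addresses starting with a given prefix
BulkInserter.WorkerList workersByPrefix =
    new BulkInserter.WorkerList(gpudb, "172.16.");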
There is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:
<relocation>
    <pattern>org.apache.avro</pattern>
    <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>
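For context, the relocation shown above belongs inside the maven-shade-plugin configuration of the project's pom.xml. The following is a sketch of that configuration; the plugin version and phase binding are assumptions:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <!-- Version is an assumption; use the version appropriate for your build -->
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>org.apache.avro</pattern>
                        <shadedPattern>org.shaded.apache.avro</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>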