Distributed Ingest (Multi-Head Ingest) is a mechanism that allows
sharded data to be ingested directly into cluster
nodes, bypassing the overhead of pushing the data through the head node to the
cluster nodes. This greatly increases the speed of ingest by spreading
the network traffic across multiple nodes.
Operationally, the ingest mechanism calculates the target
shard key of each record to insert, and sends batches
of co-located records to their respective target nodes.
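The routing step described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the database's implementation: the function name `route_records`, the sample column names, and the use of CRC32 as the hash are all assumptions made for the example, and the real shard key calculation is internal to the API.

```python
import zlib
from collections import defaultdict


def route_records(records, shard_key_columns, num_workers):
    """Group records into one batch per target worker (illustrative only)."""
    batches = defaultdict(list)
    for record in records:
        # Build a stable byte representation of the shard key values
        key = "|".join(str(record[col]) for col in shard_key_columns).encode("utf-8")
        # CRC32 is a stand-in for the database's actual hash function
        worker = zlib.crc32(key) % num_workers
        batches[worker].append(record)
    return dict(batches)


# Hypothetical records, sharded on the group_id column
records = [{"id": i, "group_id": "Group %d" % (i % 3)} for i in range(9)]
batches = route_records(records, ["group_id"], num_workers=4)
```

Records that share a shard key value always land in the same batch, which is what allows each batch to be sent straight to the node that owns those records.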
API Support
Language     Distributed Ingest Mechanism
C++          GPUdbIngestor
C#           KineticaIngestor
Java         BulkInserter
JavaScript   X
Node.js      X
Python       GPUdbIngestor
REST         X
SQL          <automatic>
Configuration
Distributed operations are enabled through configuration:
Distributed operations need to be enabled on the server (the default)
Client-accessible URLs need to be set for each node in the cluster
Server-Side
In order for the cluster nodes to receive data directly from an ingest client,
the configuration on each node needs to be set to allow the incoming HTTP
requests for that data. The /opt/gpudb/core/etc/gpudb.conf file
needs to have the following property set for distributed ingest to work
properly:
Each node can be configured with a public URL for the API to use to connect to
it, in this section of the gpudb.conf:
# Optionally, specify a public URL for each worker HTTP server that clients
# should use to connect for multi-head operations.
#
# NOTE: If specified for any ranks, a public URL must be specified for all
# ranks. Cannot be used in conjunction with nplus1.
rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194
If a public_url is not defined, each node can be connected to on any of its
available interfaces, taking HA and
HTTPD configurations, as well as any
general network security restrictions into account.
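The note in the configuration comments (a public URL must be given for all ranks if it is given for any) can be checked before restarting the server. The sketch below is a hypothetical helper, not part of any Kinetica tool: the function names and the idea of passing in the expected rank count are assumptions for illustration.

```python
import re

# Matches lines such as: rank1.public_url = http://192.168.0.10:9192
PUBLIC_URL = re.compile(r"^\s*rank(\d+)\.public_url\s*=\s*(\S+)\s*$")


def public_urls(conf_text):
    """Return {rank number: URL} for every uncommented public_url setting."""
    urls = {}
    for line in conf_text.splitlines():
        if line.lstrip().startswith("#"):
            continue  # skip commented-out settings
        match = PUBLIC_URL.match(line)
        if match:
            urls[int(match.group(1))] = match.group(2)
    return urls


def missing_ranks(urls, num_ranks):
    """List ranks lacking a public URL; empty if none or all are configured."""
    if not urls:
        return []
    return [rank for rank in range(num_ranks) if rank not in urls]


conf_text = """
# rank9.public_url = http://example.invalid:9999
rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194
"""
urls = public_urls(conf_text)
problems = missing_ranks(urls, num_ranks=4)
```

An empty `problems` list means the settings are either absent entirely or present for every rank.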
Client-Side
The list of URLs for connecting to each worker node is automatically created
when using the following:
Java API default BulkInserter
Python API GPUdbTable configured for background distributed operations
The list will need to be manually configured in all other cases by using the
connection object to retrieve the list of available cluster nodes from the
database itself. Below is a code snippet showing an automatically populated
worker list and subsequent creation of a distributed ingest object with it:
Note that in cases where no public_url was configured on each server node,
workers may have more than one IP address, not all of which may be accessible to
the client. The API worker list constructor uses the first IP in the list
returned by the server for each worker, by default. To override this behavior,
a regular expression Pattern or prefix String can be used to match the
correct worker IP addresses:
Java
// Match 172.X.Y.Z addresses
new WorkerList(db, Pattern.compile("172\\..*"));
// or
new WorkerList(db, "172.");
Python
GPUdbWorkerList(db, "172.")
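To make the selection rule concrete, here is a small stand-alone sketch of how a worker list might choose one address per worker. It does not call the Kinetica API; `pick_worker_ips` and the sample addresses are hypothetical, and raising an error when nothing matches is an assumption, since the behavior in that case is not described here.

```python
import re


def pick_worker_ips(worker_ips, ip_filter=None):
    """Choose one IP per worker: the first listed, or the first that matches.

    worker_ips is a list with one list of candidate IPs per worker.
    ip_filter may be None, a prefix string, or a compiled regular expression.
    """
    chosen = []
    for ips in worker_ips:
        if ip_filter is None:
            # Default behavior: take the first IP returned for the worker
            chosen.append(ips[0])
            continue
        if isinstance(ip_filter, str):
            matches = [ip for ip in ips if ip.startswith(ip_filter)]
        else:
            matches = [ip for ip in ips if ip_filter.match(ip)]
        if not matches:
            # Assumed behavior for this sketch only
            raise ValueError("no address matches the filter for worker %r" % (ips,))
        chosen.append(matches[0])
    return chosen


# Hypothetical workers, each reachable on two interfaces
workers = [["10.0.0.5", "172.30.20.5"], ["10.0.0.6", "172.30.20.6"]]
default_ips = pick_worker_ips(workers)
prefix_ips = pick_worker_ips(workers, "172.")
regex_ips = pick_worker_ips(workers, re.compile(r"172\..*"))
```

With no filter, the first address of each worker is used; with either the prefix or the pattern, the 172.X.Y.Z address is selected instead.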
Considerations
There are several factors to consider when using distributed ingest:
There is a small performance penalty for calculating the
shard key of each record to be inserted into a
sharded table.
There is an additional per-record performance penalty for
primary key collision checks for any target table whose
primary key is not the same as its shard key (that is, the shard key
columns are a proper subset of the primary key columns).
Randomly-sharded tables benefit more from
distributed ingest than sharded tables do, as there is no target
shard key to calculate for each record and, therefore, no associated
performance penalty for that calculation.
The batch size used to configure the bulk ingest object determines the
record threshold for each of the insert queues targeting the worker nodes, not
the record threshold for the bulk ingest object itself. Thus, ingestion into
tables with non-uniform record distribution may require periodic flushes of
the bulk ingest queues to ensure timely insertion of records in queues that
infrequently reach their threshold.
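The last consideration is easier to see with a toy model. The class below is a simplified simulation written for this explanation, not the API's bulk ingest object: `QueueingIngestor`, its methods, and the skewed distribution are all assumptions. It keeps one queue per worker and sends a queue only when that queue alone reaches the batch size.

```python
class QueueingIngestor:
    """Toy model of per-worker insert queues with a per-queue threshold."""

    def __init__(self, num_workers, batch_size):
        self.batch_size = batch_size
        self.queues = [[] for _ in range(num_workers)]
        self.inserted = 0  # records "sent" to workers so far

    def insert(self, worker, record):
        queue = self.queues[worker]
        queue.append(record)
        # The threshold applies to each queue, not to the total queued
        if len(queue) >= self.batch_size:
            self._send(worker)

    def _send(self, worker):
        self.inserted += len(self.queues[worker])
        self.queues[worker] = []

    def flush(self):
        # Send every non-empty queue, regardless of how full it is
        for worker in range(len(self.queues)):
            if self.queues[worker]:
                self._send(worker)

    def pending(self):
        return sum(len(queue) for queue in self.queues)


ingestor = QueueingIngestor(num_workers=4, batch_size=100)

# Skewed distribution: 9 of every 10 records target worker 0,
# the rest are spread across workers 1 through 3
for i in range(1000):
    worker = 0 if i % 10 else 1 + (i // 10) % 3
    ingestor.insert(worker, i)

pending_before_flush = ingestor.pending()
ingestor.flush()
```

In this run, the records bound for workers 1 through 3 never fill their queues, so they stay pending until `flush()` is called, even though the total number of records offered is ten times the batch size.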
Example
All the functionality for distributed ingestion is encapsulated in the bulk
ingest object. See API Support
for a chart listing the API-specific object to use for bulk ingestion.
The following is a Java API code block that demonstrates the use of the
BulkInserter for ingesting data.
// Create a bulk inserter for batch data ingestion, using a
// try-with-resources to invoke auto closing of the bulk inserter object.
try (BulkInserter<MyType> bulkInserter = new BulkInserter<MyType>(gpudb, tableName, type, batchSize))
{
    // Generate data to be inserted into the table, automatically inserting
    // records whenever the batchSize limit is reached
    for (int i = 0; i < numRecords; i++)
    {
        MyType record = new MyType();
        record.put(0, (i + 0.1));                        // col1
        record.put(1, ("string " + String.valueOf(i)));  // col2
        record.put(2, "Group 1");                        // group_id
        bulkInserter.insert(record);
    }

    // To ensure all records are inserted, flush the bulk inserter object.
    bulkInserter.flush();
}
Note
If the BulkInserter is not declared as the resource in a
try-with-resources block, the close() method needs to be called
explicitly after use to release its resources.
However, for a one-time use, as in this example, the flush() call can be
removed, as the close() method will automatically call flush().
Java Distributed Ingest
There are several options for distributed ingest using the Java
BulkInserter.
Ingest Modes
While all ingest schemes using the BulkInserter provide automatic batching
of records and reduction in network traffic to, and processing load on, the head
node, many also provide the benefit of parallelizing the batch inserts.
Ingest Scheme        Parallelism of Inserts
By Record            None; each given record is queued and any full queue is inserted serially
By List              All records in the given record list are queued and then inserted in parallel
Manual Flush         All queued records are inserted in parallel, at the user's direction
Timed Flush          All queued records are inserted in parallel, at a user-specified interval
External Threading   Multiple threads use a single BulkInserter to insert records in parallel;
                     note that the BulkInserter itself can be configured to use any of the other
                     ingest schemes for additional parallelism
By Record
// Acquire a type for the target table
final Type recordType = Type.fromTable(kdb, tableName);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
{
    // Insert records one at a time; worker queues should fill up and
    // automatically flush, serially.
    for (GenericRecord record : records)
        bulkInserter.insert(record);

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}
By List
// Acquire a type for the target table
final Type recordType = Type.fromTable(kdb, tableName);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
{
    // Insert records; any worker queues that are full after this list
    // is processed will be inserted in parallel
    bulkInserter.insert(records);

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}
Manual Flush
// Acquire a type for the target table
final Type recordType = Type.fromTable(kdb, tableName);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
{
    int recordNum = 0;

    // Insert records; worker queues should fill up, but never reach the
    // batch size and automatically flush. With more than one queue and
    // an even distribution of records, by the time a batch-size number
    // of records has been processed, there should only be batchSize/n
    // records per queue, at which point a manual flush will be invoked.
    for (GenericRecord record : records)
    {
        bulkInserter.insert(record);

        // Insert records in parallel by flushing the BulkInserter object
        // every batch-size number of records
        if (++recordNum % batchSize == 0)
            bulkInserter.flush();
    }
}
Timed Flush
// Acquire a type for the target table
final Type recordType = Type.fromTable(kdb, tableName);

// Create timed flush options: all queues, regardless of fullness, every second
BulkInserter.FlushOptions flushOptions = new BulkInserter.FlushOptions(false, 1);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize, null, null, flushOptions))
{
    // Insert records; no worker queues should fill up and automatically
    // flush, as the batch size equals the total record count. This
    // will allow the timed flush to insert queued records every second.
    for (GenericRecord record : records)
        bulkInserter.insert(record);

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}
External Threading
// Acquire a type for the target table
final Type recordType = Type.fromTable(kdb, tableName);

// Construct a bulk inserter to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
{
    ExecutorService executorService = null;

    try
    {
        // Create a thread pool for inserting data
        executorService = Executors.newFixedThreadPool(totalThreads);

        // Round up so that every record is assigned to a task, and never
        // step by zero when there are fewer records than threads
        final int recordsPerThread = Math.max(1, (records.size() + totalThreads - 1) / totalThreads);

        for (int startIndex = 0; startIndex < records.size(); startIndex += recordsPerThread)
        {
            // Pass the shared BulkInserter & record list into each task
            // with an offset for each to start inserting from; the last
            // task may receive fewer records than the others
            int recordCount = Math.min(recordsPerThread, records.size() - startIndex);
            executorService.execute(new BatchInsert(bulkInserter, records, startIndex, recordCount));
        }
    }
    finally
    {
        // Shut down thread pool
        if (executorService != null)
        {
            executorService.shutdown();
            try
            {
                if (!executorService.awaitTermination(30, TimeUnit.SECONDS))
                    executorService.shutdownNow();
            }
            catch (@SuppressWarnings("unused") InterruptedException e)
            {
                executorService.shutdownNow();
            }
        }
    }

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}

/**
 * Helper class for parallelizing usage of the bulk ingest object.
 *
 * This class will take a batch of records of a given size, starting at
 * a given index, and then add them to a given BulkInserter
 */
private static class BatchInsert implements Runnable
{
    /* Bulk ingestion object to use for inserting records */
    BulkInserter<GenericRecord> bulkInserter;
    /* Full set of records being inserted */
    List<GenericRecord> records;
    /* Index of first record to insert within the full set */
    int startIndex;
    /* Index just past the last record to insert within the full set */
    int endIndex;

    /**
     * Creates a new batch insert object, which will add a slice of the
     * given record list to the specified bulk ingest object
     *
     * @param bulkInserter  bulk ingest object to which records will be added
     * @param records  full list of records, shared by all tasks
     * @param startIndex  index of the first record to insert in this batch
     * @param recordCount  number of records to insert in this batch
     */
    public BatchInsert(BulkInserter<GenericRecord> bulkInserter, List<GenericRecord> records, int startIndex, int recordCount)
    {
        this.bulkInserter = bulkInserter;
        this.records = records;
        this.startIndex = startIndex;
        this.endIndex = startIndex + recordCount;
    }

    public void run()
    {
        try
        {
            for (int recIndex = this.startIndex; recIndex < this.endIndex; recIndex++)
            {
                this.bulkInserter.insert(this.records.get(recIndex));
            }
        }
        catch (Exception e)
        {
            System.out.println("Error inserting record: " + e);
        }
    }
}
Ingest Object Types
The BulkInserter supports three types of record objects, each offering a
different balance of type safety, flexibility, and schema migration support.
Object Type       Description
Annotated Class   Uses a RecordObject-annotated Java class whose instances serve as both the
                  schema definition and data container; offers compile-time type safety but cannot
                  adapt to schema migrations that add or remove columns, as the class definition is
                  fixed at compile time.
GenericRecord     Uses a dynamic record constructed from a Type retrieved from an existing
                  table via Type.fromTable(); field values are set by column index or name, and
                  a new Type can be acquired after schema changes to accommodate structural
                  changes like column additions.
JSON String       Records are represented as JSON strings; the BulkInserter is constructed
                  without a Type, making this the most flexible option, as it naturally
                  accommodates schema changes without updating the record format or
                  re-acquiring the table type.
Annotated Class
// Construct a bulk inserter from the class type to perform the distributed ingest
try (BulkInserter<Product> bulkInserter = new BulkInserter<>(kdb, tableName, RecordObject.getType(Product.class), batchSize))
{
    // Insert populated instances of the class
    for (Product product : products)
        bulkInserter.insert(product);

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}

/**
 * Create a class whose members map, by name and type, to the target
 * table it represents. Annotate each column with its ordering in the
 * target table and any column properties.
 */
public static class Product extends RecordObject
{
    @RecordObject.Column(order = 0, properties = { "primary_key" })
    public Integer id;
    @RecordObject.Column(order = 1, properties = { "char64" })
    public String name;
    @RecordObject.Column(order = 2)
    public String description;
    @RecordObject.Column(order = 3)
    public Long ts;

    public Product() {}

    /* Create a constructor for the class that will take parameters so that
     * assigning values is easier */
    public Product(Integer id, String name, String description)
    {
        this.id = id;
        this.name = name;
        this.description = description;
        this.ts = Long.MIN_VALUE;  // Invalid timestamp to trigger INIT_WITH_NOW
    }
}
GenericRecord
// Acquire a type for the order history table
final Type recordType = Type.fromTable(kdb, tableName);

// Construct a bulk inserter from the table type to perform the distributed ingest
try (BulkInserter<GenericRecord> bulkInserter = new BulkInserter<>(kdb, tableName, recordType, batchSize))
{
    for (Map<String,Object> product : products)
    {
        // Create record from the table type and populate
        GenericRecord record = new GenericRecord(recordType);
        record.put(0, product.get("id"));
        record.put(1, product.get("name"));
        record.put(2, product.get("description"));
        record.put(3, Long.MIN_VALUE);  // Invalid timestamp to trigger INIT_WITH_NOW

        bulkInserter.insert(record);
    }

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}
JSON
// Construct a bulk inserter with no type specified to perform JSON distributed ingest
try (BulkInserter<String> bulkInserter = new BulkInserter<>(kdb, tableName, batchSize))
{
    // Insert records as JSON strings
    for (String product : products)
        bulkInserter.insert(product);

    // Insert any remaining records by flushing the BulkInserter object
    bulkInserter.flush();
}
Python Distributed Ingest
There are two options for distributed ingest using Python.
Both objects provide automatic batching of records and reduction in network
traffic to, and processing load on, the head node, though neither offers the
benefit of any inherent parallelism.
Ingest Object   Description
GPUdbTable      When constructed with the use_multihead_io option set to True, a
                GPUdbTable object automatically uses a GPUdbIngestor object for inserts
GPUdbIngestor   Handles distributed ingest, and may be faster than using a GPUdbTable
                object as there will be less overhead in the direct use of a GPUdbIngestor
Note that the list insert functions are effectively wrappers for the single
insert functions in the examples below; they serve as a convenience and do not
provide any inherent performance benefit.
GPUdbTable Single Insert
# Get a handle to the ingest target table
t = gpudb.GPUdbTable(name=table_name, db=kinetica, use_multihead_io=True)

for record in records:
    t.insert_records(record)

t.flush_data_to_server()
GPUdbTable List Insert
# Get a handle to the ingest target table
t = gpudb.GPUdbTable(name=table_name, db=kinetica, use_multihead_io=True)

t.insert_records(records)

t.flush_data_to_server()
GPUdbIngestor Single Insert
# Create a GPUdbIngestor to do the multi-head ingest, using a table object
# to get the type of record being ingested
table_type = gpudb.GPUdbTable(name=table_name, db=kinetica).get_table_type()
gi = gpudb.GPUdbIngestor(kinetica, table_name, table_type, BATCH_SIZE)

for record in records:
    gi.insert_record(record)

gi.flush()
GPUdbIngestor List Insert
# Create a GPUdbIngestor to do the distributed ingest, using a table object
# to get the type of record being ingested
table_type = gpudb.GPUdbTable(name=table_name, db=kinetica).get_table_type()
gi = gpudb.GPUdbIngestor(kinetica, table_name, table_type, BATCH_SIZE)

gi.insert_records(records)

gi.flush()
Cautions
If using the Java API and MapReduce, there is a conflict between the version
of Avro used by Kinetica and the one used by MapReduce. This conflict
can be overcome by using the Maven shade plug-in with the relocation tag: