2 using System.Collections.Generic;
/// <summary>
/// URL associated with the failed insert; readable by anyone, settable only
/// from inside this exception class.
/// NOTE(review): the constructor body is not visible in this view; presumably
/// this is set from its <c>url_</c> argument, which the ingestor's flush path
/// supplies as the endpoint the batch was being submitted to -- confirm.
/// </summary>
20 public Uri
url {
get;
private set; }
/// <summary>
/// Records associated with the failed insert; readable by anyone, settable
/// only from inside this exception class.
/// NOTE(review): the constructor body is not visible in this view; presumably
/// this is set from its <c>records_</c> argument, which the ingestor's flush
/// path supplies as the queue of records it was trying to submit -- confirm.
/// </summary>
21 public IList<T>
records {
get;
private set; }
// Text that ToString() appends after the "InsertException: " prefix.
// NOTE(review): where this field is assigned is not visible here; presumably the
// constructor copies its msg argument into it -- confirm.
22 private string message;
26 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
/// <summary>
/// Renders this exception as the literal prefix "InsertException: " followed
/// by the stored message text.
/// </summary>
/// <returns>The prefixed message string.</returns>
public override string ToString() => $"InsertException: {message}";
/// <summary>
/// Options dictionary exposed by the ingestor. Get-only, so it can be assigned
/// only during construction.
/// NOTE(review): the assignment is not visible in this view; presumably it
/// holds the <c>options</c> argument passed to the constructor -- confirm.
/// </summary>
44 public IDictionary<string, string>
options {
get; }
// Builds the primary key for each inserted record. The constructor creates it
// with a `true` first argument and may reset it to null; insert() only builds a
// primary key when this is non-null, and the worker queues are told whether a
// primary key exists based on the same null check.
49 private Utils.RecordKeyBuilder<T>? primaryKeyBuilder;
// Builds the shard key for each inserted record. The constructor creates it
// with a `false` first argument, may alias it to primaryKeyBuilder (when it has
// no key of its own or has the same key), and nulls it when it has no key.
// insert() only builds a shard key when this is non-null.
50 private Utils.RecordKeyBuilder<T>? shardKeyBuilder;
// Routing table handed to shardKey.route() to pick a worker queue index.
// When null, insert() always uses workerQueues[0].
51 private IList<int>? routingTable;
// Per-endpoint record queues: one per worker URL when workers are available,
// otherwise a single queue targeting the main server's /insert/records URL.
52 private IList<Utils.WorkerQueue<T>> workerQueues;
// Seeded from DateTime.Now.Ticks in the constructor; insert() uses it to pick a
// worker queue when a routing table exists but the record has no shard key.
53 private Random random;
67 Dictionary<string, string>?
options =
null,
68 Utils.WorkerList? workers =
null )
76 throw new KineticaException( $
"Batch size must be greater than one; given {batchSize}." );
83 this.primaryKeyBuilder =
new Utils.RecordKeyBuilder<T>(
true, this.ktype );
84 this.shardKeyBuilder =
new Utils.RecordKeyBuilder<T>(
false, this.ktype );
87 if ( this.primaryKeyBuilder.hasKey() )
90 if ( !this.shardKeyBuilder.hasKey()
91 || this.shardKeyBuilder.hasSameKey( this.primaryKeyBuilder ) )
92 this.shardKeyBuilder = this.primaryKeyBuilder;
96 this.primaryKeyBuilder =
null;
99 if ( !this.shardKeyBuilder.hasKey() )
100 this.shardKeyBuilder =
null;
108 bool updateOnExistingPk = ( (
options !=
null)
112 bool hasPrimaryKey = (this.primaryKeyBuilder !=
null);
113 this.workerQueues = [];
117 if ( ( workers ==
null ) || ( workers.Count == 0 ) )
119 workers =
new Utils.WorkerList( kdb );
124 if ( ( workers !=
null ) && ( workers.Count > 0 ) )
127 foreach ( System.Uri workerUrl in workers )
129 string strWorkerUrl = workerUrl.ToString();
130 strWorkerUrl = strWorkerUrl.EndsWith(
'/') ? strWorkerUrl[..^1] : strWorkerUrl;
131 string insert_records_worker_url_str = $
"{strWorkerUrl}/insert/records";
132 System.Uri url =
new( insert_records_worker_url_str );
133 Utils.WorkerQueue<T> worker_queue =
new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
134 this.workerQueues.Add( worker_queue );
140 for (
int i = 0; i < routingTable.Count; ++i )
142 if ( this.routingTable[i] > this.workerQueues.Count )
148 string strWorkerUrl = kdb.
URL.ToString();
149 strWorkerUrl = strWorkerUrl.EndsWith(
'/') ? strWorkerUrl[..^1] : strWorkerUrl;
150 string insertRecordsUrlStr = $
"{strWorkerUrl}/insert/records";
151 System.Uri url =
new( insertRecordsUrlStr );
152 Utils.WorkerQueue<T> worker_queue =
new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
153 this.workerQueues.Add( worker_queue );
154 this.routingTable =
null;
157 catch ( Exception ex )
163 this.random =
new( (int) DateTime.Now.Ticks );
174 return System.Threading.Interlocked.Read( ref this.count_inserted );
185 return System.Threading.Interlocked.Read( ref this.count_updated );
201 foreach ( Utils.WorkerQueue<T> workerQueue in
this.workerQueues )
204 IList<T> queue = workerQueue.flush();
206 flush( queue, workerQueue.url );
217 private void flush( IList<T> queue, System.Uri url )
219 if ( queue.Count == 0 )
227 List<byte[]> encodedQueue = [];
228 foreach ( var record
in queue ) encodedQueue.Add( this.
kineticaDB.AvroEncode( record ) );
239 response = this.
kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
243 System.Threading.Interlocked.Add( ref this.count_inserted, response.
count_inserted );
244 System.Threading.Interlocked.Add( ref this.count_updated, response.
count_updated );
246 catch ( Exception ex )
248 throw new InsertException<T>( url, queue, ex.Message );
268 Utils.RecordKey? primaryKey =
null;
269 Utils.RecordKey? shardKey =
null;
272 if ( this.primaryKeyBuilder !=
null )
273 primaryKey = this.primaryKeyBuilder.build( record );
276 if ( this.shardKeyBuilder !=
null )
277 shardKey = this.shardKeyBuilder.build( record );
281 Utils.WorkerQueue<T> workerQueue;
282 if ( this.routingTable ==
null )
284 workerQueue = this.workerQueues[0];
286 else if ( shardKey ==
null )
288 workerQueue = this.workerQueues[ random.Next( this.workerQueues.Count ) ];
292 int worker_index = shardKey.route( this.routingTable );
293 workerQueue = this.workerQueues[worker_index];
297 IList<T> queue = workerQueue.insert( record, primaryKey );
303 this.
flush( queue, workerQueue.url );
326 for (
int i = 0; i < records.Count; ++i )
330 this.
insert( records[ i ] );
338 for (
int j = i + 1; j < records.Count; ++j )
340 queue.Add( records[ j ] );
int count_inserted
The number of records inserted.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(T record)
Queues a record for insertion into Kinetica.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
InsertException(string msg)
Uri URL
URL for Kinetica Server (including "http:" and port)
int count_updated
The number of records updated.
override string ToString()
A set of parameters for Kinetica.insertRecords.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
A set of parameters for Kinetica.insertRecordsRaw.
IList< int > rank
Array of ranks indexed by the shard number.
KineticaIngestor(Kinetica kdb, string tableName, int batchSize, KineticaType ktype, Dictionary< string, string >? options=null, Utils.WorkerList? workers=null)
A set of results returned by Kinetica.insertRecords.
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
A set of string constants for the parameter options.
API to talk to Kinetica Database
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.