2 using System.Collections.Generic;
19 [Obsolete(
"Use BulkInserter<T> instead. KineticaIngestor will be removed in a future version.")]
/// <summary>
/// The URL carried by this exception. Publicly readable; the setter is
/// private, so only the declaring class can assign it.
/// </summary>
/// <remarks>
/// NOTE(review): presumably populated from the constructor's <c>url_</c>
/// argument (the endpoint the failed batch was sent to) - confirm against
/// the constructor body.
/// </remarks>
public Uri url { get; private set; }
/// <summary>
/// The list of records carried by this exception. Publicly readable; the
/// setter is private, so only the declaring class can assign it.
/// </summary>
/// <remarks>
/// NOTE(review): presumably populated from the constructor's
/// <c>records_</c> argument (the batch that could not be inserted) -
/// confirm against the constructor body.
/// </remarks>
public IList<T> records { get; private set; }
30 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
/// <summary>
/// Builds the textual form of this exception: the fixed prefix
/// "InsertException: " followed by the exception's <c>Message</c>.
/// </summary>
/// <returns>The prefixed message string.</returns>
public override string ToString() => "InsertException: " + Message;
/// <summary>
/// String-to-string option map exposed by this object. The property is
/// getter-only, so it can be assigned only while the object is being
/// constructed.
/// </summary>
/// <remarks>
/// NOTE(review): presumably holds the <c>options</c> argument passed to
/// the constructor - confirm against the constructor body.
/// </remarks>
public IDictionary<string, string> options { get; }
52 private Utils.RecordKeyBuilder<T>? primaryKeyBuilder;
53 private Utils.RecordKeyBuilder<T>? shardKeyBuilder;
54 private IList<int>? routingTable;
55 private IList<Utils.WorkerQueue<T>> workerQueues;
70 Dictionary<string, string>?
options =
null,
71 Utils.WorkerList? workers =
null )
79 throw new KineticaException( $
"Batch size must be greater than one; given {batchSize}." );
86 this.primaryKeyBuilder =
new Utils.RecordKeyBuilder<T>(
true, this.ktype );
87 this.shardKeyBuilder =
new Utils.RecordKeyBuilder<T>(
false, this.ktype );
90 if ( this.primaryKeyBuilder.hasKey() )
93 if ( !this.shardKeyBuilder.hasKey()
94 || this.shardKeyBuilder.hasSameKey( this.primaryKeyBuilder ) )
95 this.shardKeyBuilder = this.primaryKeyBuilder;
99 this.primaryKeyBuilder =
null;
102 if ( !this.shardKeyBuilder.hasKey() )
103 this.shardKeyBuilder =
null;
111 bool updateOnExistingPk = ( (
options !=
null)
115 bool hasPrimaryKey = (this.primaryKeyBuilder !=
null);
116 this.workerQueues = [];
120 if ( ( workers ==
null ) || ( workers.Count == 0 ) )
122 workers =
new Utils.WorkerList( kdb );
127 if ( ( workers !=
null ) && ( workers.Count > 0 ) )
130 foreach ( var workerUrl
in workers )
133 if ( workerUrl ==
null )
continue;
135 string strWorkerUrl = workerUrl.ToString();
136 strWorkerUrl = strWorkerUrl.EndsWith(
'/') ? strWorkerUrl[..^1] : strWorkerUrl;
137 string insert_records_worker_url_str = $
"{strWorkerUrl}/insert/records";
138 System.Uri url =
new( insert_records_worker_url_str );
139 Utils.WorkerQueue<T> worker_queue =
new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
140 this.workerQueues.Add( worker_queue );
146 for (
int i = 0; i < routingTable.Count; ++i )
148 if ( this.routingTable[i] > this.workerQueues.Count )
154 string strWorkerUrl = kdb.
URL.ToString();
155 strWorkerUrl = strWorkerUrl.EndsWith(
'/') ? strWorkerUrl[..^1] : strWorkerUrl;
156 string insertRecordsUrlStr = $
"{strWorkerUrl}/insert/records";
157 System.Uri url =
new( insertRecordsUrlStr );
158 Utils.WorkerQueue<T> worker_queue =
new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
159 this.workerQueues.Add( worker_queue );
160 this.routingTable =
null;
163 catch ( Exception ex )
169 this.random =
new( (int)
DateTime.Now.Ticks );
180 return System.Threading.Interlocked.Read( ref this.count_inserted );
191 return System.Threading.Interlocked.Read( ref this.count_updated );
207 foreach ( Utils.WorkerQueue<T> workerQueue in
this.workerQueues )
210 IList<T> queue = workerQueue.flush();
212 flush( queue, workerQueue.url );
223 private void flush( IList<T> queue, System.Uri url )
225 if ( queue.Count == 0 )
233 List<byte[]> encodedQueue = [];
234 foreach ( var record
in queue ) encodedQueue.Add( this.
kineticaDB.AvroEncode( record ) );
250 System.Threading.Interlocked.Add( ref this.count_inserted, response.
count_inserted );
251 System.Threading.Interlocked.Add( ref this.count_updated, response.
count_updated );
253 catch ( Exception ex )
255 throw new InsertException( url, queue, ex.Message );
275 Utils.RecordKey? primaryKey =
null;
276 Utils.RecordKey? shardKey =
null;
279 if ( this.primaryKeyBuilder !=
null )
280 primaryKey = this.primaryKeyBuilder.build( record );
283 if ( this.shardKeyBuilder !=
null )
284 shardKey = this.shardKeyBuilder.build( record );
288 Utils.WorkerQueue<T> workerQueue;
289 if ( this.routingTable ==
null )
291 workerQueue = this.workerQueues[0];
293 else if ( shardKey ==
null )
295 workerQueue = this.workerQueues[ random.Next( this.workerQueues.Count ) ];
299 int worker_index = shardKey.route( this.routingTable );
300 workerQueue = this.workerQueues[worker_index];
304 IList<T> queue = workerQueue.insert( record, primaryKey );
310 this.
flush( queue, workerQueue.url );
333 for (
int i = 0; i < records.Count; ++i )
337 this.
insert( records[ i ] );
345 for (
int j = i + 1; j < records.Count; ++j )
347 queue.Add( records[ j ] );
A set of string constants for the parameter options.
A set of parameters for Kinetica.insertRecordsRaw.
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
int count_inserted
The number of records inserted.
void insert(T record)
Queues a record for insertion into Kinetica.
Manages the insertion into GPUdb of large numbers of records in bulk.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
A set of results returned by Kinetica.insertRecords.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
Int64 getCountUpdated()
Returns the count of records updated so far.
InsertException(string msg)
A set of parameters for Kinetica.insertRecords.
void flush()
Ensures that all queued records are inserted into Kinetica.
int count_updated
The number of records updated.
IList< int > rank
Array of ranks indexed by the shard number.
override string ToString()
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
IDictionary< string, string > options
KineticaIngestor(Kinetica kdb, string tableName, int batchSize, KineticaType ktype, Dictionary< string, string >? options=null, Utils.WorkerList? workers=null)
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
Uri URL
URL for Kinetica Server (including "http:" and port)