2 using System.Collections.Generic;
15 public class KineticaIngestor<T>
20 public Uri url {
get;
private set; }
21 public IList<T> records {
get;
private set; }
22 private string message;
26 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
30 this.records = records_;
33 public override string ToString() {
return "InsertException: " + message; }
42 public string table_name {
get; }
43 public int batch_size {
get; }
44 public IDictionary<string, string> options {
get; }
49 private Utils.RecordKeyBuilder<T> primary_key_builder;
50 private Utils.RecordKeyBuilder<T> shard_key_builder;
51 private IList<int> routing_table;
52 private IList<Utils.WorkerQueue<T>> worker_queues;
53 private Random random;
67 Dictionary<string, string> options = null,
68 Utils.WorkerList workers = null )
70 this.kineticaDB = kdb;
71 this.table_name = table_name;
76 throw new KineticaException( $
"Batch size must be greater than one; given {batch_size}." );
77 this.batch_size = batch_size;
80 if ( options != null )
82 this.options = options;
92 this.primary_key_builder =
new Utils.RecordKeyBuilder<T>(
true, this.ktype );
93 this.shard_key_builder =
new Utils.RecordKeyBuilder<T>(
false, this.ktype );
96 if ( this.primary_key_builder.hasKey() )
99 if ( !this.shard_key_builder.hasKey()
100 || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
101 this.shard_key_builder = this.primary_key_builder;
105 this.primary_key_builder = null;
108 if ( !this.shard_key_builder.hasKey() )
109 this.shard_key_builder = null;
117 bool update_on_existing_pk = ( (options != null)
121 bool has_primary_key = (this.primary_key_builder != null);
122 this.worker_queues =
new List<Utils.WorkerQueue<T>>();
126 if ( ( workers == null ) || ( workers.Count == 0 ) )
133 if ( ( workers != null ) && ( workers.Count > 0 ) )
136 foreach ( System.Uri worker_url in workers )
138 string insert_records_worker_url_str = (worker_url.ToString() +
"insert/records");
139 System.Uri url =
new System.Uri( insert_records_worker_url_str );
140 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>( url, batch_size,
142 update_on_existing_pk );
143 this.worker_queues.Add( worker_queue );
147 this.routing_table = kdb.adminShowShards().rank;
149 for (
int i = 0; i < routing_table.Count; ++i )
151 if ( this.routing_table[i] > this.worker_queues.Count )
157 string insert_records_url_str = ( kdb.URL.ToString() +
"insert/records" );
158 System.Uri url =
new System.Uri( insert_records_url_str );
159 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk );
160 this.worker_queues.Add( worker_queue );
161 this.routing_table = null;
164 catch ( Exception ex )
170 this.random =
new Random( (
int) DateTime.Now.Ticks );
181 return System.Threading.Interlocked.Read( ref this.count_inserted );
192 return System.Threading.Interlocked.Read( ref this.count_updated );
207 foreach ( Utils.WorkerQueue<T> worker_queue in
this.worker_queues )
210 IList<T> queue = worker_queue.flush();
212 flush( queue, worker_queue.url );
223 private void flush( IList<T> queue, System.Uri url )
225 if ( queue.Count == 0 )
233 IList<byte[]> encoded_queue =
new List<byte[]>();
234 foreach ( var record
in queue ) encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
243 response = this.kineticaDB.insertRecordsRaw( request );
247 response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
251 System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
252 System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
254 catch ( Exception ex )
256 throw new InsertException<T>( url, queue, ex.Message );
275 Utils.RecordKey primary_key = null;
276 Utils.RecordKey shard_key = null;
279 if ( this.primary_key_builder != null )
280 primary_key = this.primary_key_builder.build( record );
283 if ( this.shard_key_builder != null )
284 shard_key = this.shard_key_builder.build( record );
288 Utils.WorkerQueue<T> worker_queue;
289 if ( this.routing_table == null )
291 worker_queue = this.worker_queues[0];
293 else if ( shard_key == null )
295 worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
299 int worker_index = shard_key.route( this.routing_table );
300 worker_queue = this.worker_queues[worker_index];
304 IList<T> queue = worker_queue.insert( record, primary_key );
310 this.flush( queue, worker_queue.url );
332 for (
int i = 0; i < records.Count; ++i )
336 this.insert( records[ i ] );
338 catch ( InsertException<T> ex )
342 IList<T> queue = ex.records;
344 for (
int j = i + 1; j < records.Count; ++j )
346 queue.Add( records[ j ] );
InsertException(string msg)
A list of worker URLs to use for multi-head ingest.
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, Utils.WorkerList workers=null)
A set of parameters for Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
void insert(T record)
Queues a record for insertion into Kinetica.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
void flush()
Ensures that all queued records are inserted into Kinetica.
A set of parameters for Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
Int64 getCountUpdated()
Returns the count of records updated so far.
A set of results returned by Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
API to talk to Kinetica Database
override string ToString()