2 using System.Collections.Generic;
20 public Uri
url {
get;
private set; }
21 public IList<T>
records {
get;
private set; }
22 private string message;
26 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
33 public override string ToString() {
return "InsertException: " + message; }
44 public IDictionary<string, string>
options {
get; }
49 private Utils.RecordKeyBuilder<T> primary_key_builder;
50 private Utils.RecordKeyBuilder<T> shard_key_builder;
51 private IList<int> routing_table;
52 private IList<Utils.WorkerQueue<T>> worker_queues;
53 private Random random;
67 Dictionary<string, string>
options = null,
68 Utils.WorkerList workers = null )
76 throw new KineticaException( $
"Batch size must be greater than one; given {batch_size}." );
92 this.primary_key_builder =
new Utils.RecordKeyBuilder<T>(
true, this.ktype );
93 this.shard_key_builder =
new Utils.RecordKeyBuilder<T>(
false, this.ktype );
96 if ( this.primary_key_builder.hasKey() )
99 if ( !this.shard_key_builder.hasKey()
100 || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
101 this.shard_key_builder = this.primary_key_builder;
105 this.primary_key_builder = null;
108 if ( !this.shard_key_builder.hasKey() )
109 this.shard_key_builder = null;
117 bool update_on_existing_pk = ( (
options != null)
121 bool has_primary_key = (this.primary_key_builder != null);
122 this.worker_queues =
new List<Utils.WorkerQueue<T>>();
126 if ( ( workers == null ) || ( workers.Count == 0 ) )
128 workers =
new Utils.WorkerList( kdb );
133 if ( ( workers != null ) && ( workers.Count > 0 ) )
136 foreach (
System.Uri worker_url in workers )
138 string insert_records_worker_url_str = (worker_url.ToString() +
"insert/records");
140 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>(
url,
batch_size,
142 update_on_existing_pk );
143 this.worker_queues.Add( worker_queue );
149 for (
int i = 0; i < routing_table.Count; ++i )
151 if ( this.routing_table[i] > this.worker_queues.Count )
157 string insert_records_url_str = ( kdb.
URL.ToString() +
"insert/records" );
159 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>(
url,
batch_size, has_primary_key, update_on_existing_pk );
160 this.worker_queues.Add( worker_queue );
161 this.routing_table = null;
164 catch ( Exception ex )
170 this.random =
new Random( (
int) DateTime.Now.Ticks );
181 return System.Threading.Interlocked.Read( ref this.count_inserted );
192 return System.Threading.Interlocked.Read( ref this.count_updated );
207 foreach ( Utils.WorkerQueue<T> worker_queue in
this.worker_queues )
210 IList<T> queue = worker_queue.flush();
212 flush( queue, worker_queue.url );
225 if ( queue.Count == 0 )
233 IList<byte[]> encoded_queue =
new List<byte[]>();
234 foreach ( var record
in queue ) encoded_queue.Add( this.
kineticaDB.AvroEncode( record ) );
254 catch ( Exception ex )
275 Utils.RecordKey primary_key = null;
276 Utils.RecordKey shard_key = null;
279 if ( this.primary_key_builder != null )
280 primary_key = this.primary_key_builder.build( record );
283 if ( this.shard_key_builder != null )
284 shard_key = this.shard_key_builder.build( record );
288 Utils.WorkerQueue<T> worker_queue;
289 if ( this.routing_table == null )
291 worker_queue = this.worker_queues[0];
293 else if ( shard_key == null )
295 worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
299 int worker_index = shard_key.route( this.routing_table );
300 worker_queue = this.worker_queues[worker_index];
304 IList<T> queue = worker_queue.insert( record, primary_key );
310 this.
flush( queue, worker_queue.url );
332 for (
int i = 0; i < records.Count; ++i )
336 this.
insert( records[ i ] );
344 for (
int j = i + 1; j < records.Count; ++j )
346 queue.Add( records[ j ] );
int count_inserted
The number of records inserted.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(T record)
Queues a record for insertion into Kinetica.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
InsertException(string msg)
Uri URL
URL for Kinetica Server (including "http:" and port)
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, Utils.WorkerList workers=null)
int count_updated
The number of records updated.
override string ToString()
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IList< int > rank
Array of ranks indexed by the shard number.
A set of results returned by Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
API to talk to Kinetica Database
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.