Kinetica C# API Version 7.2.3.0
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 
4 
5 namespace kinetica
6 {
15  public class KineticaIngestor<T>
16  {
17  [Serializable]
19  {
20  public Uri url { get; private set; }
21  public IList<T> records { get; private set; }
22  private string message;
23 
24  public InsertException( string msg ) : base( msg ) { }
25 
26  internal InsertException( Uri url_, IList<T> records_, string msg ) : base ( msg )
27  {
28  this.message = msg;
29  this.url = url_;
30  this.records = records_;
31  }
32 
33  public override string ToString() { return "InsertException: " + message; }
34  } // end class InsertException
35 
36 
37 
38 
39  // KineticaIngestor Members:
40  // =========================
41  public Kinetica kineticaDB { get; }
42  public string table_name { get; }
43  public int batch_size { get; }
44  public IDictionary<string, string> options { get; }
45  //public IReadOnlyDictionary<string, string> options { get; }
46  public Int64 count_inserted;
47  public Int64 count_updated;
48  private KineticaType ktype;
49  private Utils.RecordKeyBuilder<T>? primaryKeyBuilder;
50  private Utils.RecordKeyBuilder<T>? shardKeyBuilder;
51  private IList<int>? routingTable;
52  private IList<Utils.WorkerQueue<T>> workerQueues;
53  private Random random;
54 
55 
65  public KineticaIngestor( Kinetica kdb, string tableName,
66  int batchSize, KineticaType ktype,
67  Dictionary<string, string>? options = null,
68  Utils.WorkerList? workers = null )
69  {
70  this.kineticaDB = kdb;
71  this.table_name = tableName;
72  this.ktype = ktype;
73 
74  // Validate and save the batch size
75  if ( batchSize < 1 )
76  throw new KineticaException( $"Batch size must be greater than one; given {batchSize}." );
77  this.batch_size = batchSize;
78 
79  this.options = options;
80 
81  // Set up the primary and shard key builders
82  // -----------------------------------------
83  this.primaryKeyBuilder = new Utils.RecordKeyBuilder<T>( true, this.ktype );
84  this.shardKeyBuilder = new Utils.RecordKeyBuilder<T>( false, this.ktype );
85 
86  // Based on the Java implementation
87  if ( this.primaryKeyBuilder.hasKey() )
88  { // There is a primary key for the given T
89  // Now check if there is a distinct shard key
90  if ( !this.shardKeyBuilder.hasKey()
91  || this.shardKeyBuilder.hasSameKey( this.primaryKeyBuilder ) )
92  this.shardKeyBuilder = this.primaryKeyBuilder; // no distinct shard key
93  }
94  else // there is no primary key for the given T
95  {
96  this.primaryKeyBuilder = null;
97 
98  // Check if there is shard key for T
99  if ( !this.shardKeyBuilder.hasKey() )
100  this.shardKeyBuilder = null;
101  } // done setting up the key builders
102 
103 
104  // Set up the worker queues
105  // -------------------------
106  // Do we update records if there are matching primary keys in the
107  // database already?
108  bool updateOnExistingPk = ( (options != null)
111  // Do T type records have a primary key?
112  bool hasPrimaryKey = (this.primaryKeyBuilder != null);
113  this.workerQueues = [];
114  try
115  {
116  // If no workers are given, try to get them from Kinetica
117  if ( ( workers == null ) || ( workers.Count == 0 ) )
118  {
119  workers = new Utils.WorkerList( kdb );
120  }
121 
122  // If we end up with multiple workers, either given by the
123  // user or obtained from Kinetica, then use those
124  if ( ( workers != null ) && ( workers.Count > 0 ) )
125  {
126  // Add worker queues per worker
127  foreach ( System.Uri workerUrl in workers )
128  {
129  string strWorkerUrl = workerUrl.ToString();
130  strWorkerUrl = strWorkerUrl.EndsWith('/') ? strWorkerUrl[..^1] : strWorkerUrl;
131  string insert_records_worker_url_str = $"{strWorkerUrl}/insert/records";
132  System.Uri url = new( insert_records_worker_url_str );
133  Utils.WorkerQueue<T> worker_queue = new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
134  this.workerQueues.Add( worker_queue );
135  }
136 
137  // Get the worker rank information from Kinetica
138  this.routingTable = kdb.adminShowShards().rank;
139  // Check that enough worker URLs are specified
140  for ( int i = 0; i < routingTable.Count; ++i )
141  {
142  if ( this.routingTable[i] > this.workerQueues.Count )
143  throw new KineticaException( "Not enough worker URLs specified." );
144  }
145  }
146  else // multihead-ingest is NOT turned on; use the regular Kinetica IP address
147  {
148  string strWorkerUrl = kdb.URL.ToString();
149  strWorkerUrl = strWorkerUrl.EndsWith('/') ? strWorkerUrl[..^1] : strWorkerUrl;
150  string insertRecordsUrlStr = $"{strWorkerUrl}/insert/records";
151  System.Uri url = new( insertRecordsUrlStr );
152  Utils.WorkerQueue<T> worker_queue = new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
153  this.workerQueues.Add( worker_queue );
154  this.routingTable = null;
155  }
156  }
157  catch ( Exception ex )
158  {
159  throw new KineticaException( ex.ToString() );
160  }
161 
162  // Create the random number generator
163  this.random = new( (int) DateTime.Now.Ticks );
164  } // end constructor KineticaIngestor
165 
166 
/// <summary>
/// Gets the running total of records the server reported as inserted by
/// this ingestor's batch submissions.
/// </summary>
/// <returns>The inserted-record count, read atomically so a concurrent
/// <c>Interlocked.Add</c> cannot produce a torn 64-bit value.</returns>
public Int64 getCountInserted() => System.Threading.Interlocked.Read( ref count_inserted );
176 
177 
/// <summary>
/// Gets the running total of records the server reported as updated
/// (primary-key matches) by this ingestor's batch submissions.
/// </summary>
/// <returns>The updated-record count, read atomically so a concurrent
/// <c>Interlocked.Add</c> cannot produce a torn 64-bit value.</returns>
public Int64 getCountUpdated() => System.Threading.Interlocked.Read( ref count_updated );
187 
188 
/// <summary>
/// Pushes every record still sitting in any worker queue to Kinetica.
/// Worker queues are processed in order. Each one is drained via
/// <c>WorkerQueue.flush()</c> and the resulting batch is submitted to that
/// worker's URL.
/// </summary>
/// <remarks>
/// If submitting a batch fails, the InsertException raised by the private
/// overload propagates immediately. Queues after the failing one are not
/// drained by this call.
/// </remarks>
public void flush()
{
    foreach ( var queueForWorker in workerQueues )
    {
        // Pull out this worker's pending records, then send them to its endpoint
        IList<T> pending = queueForWorker.flush();
        this.flush( pending, queueForWorker.url );
    }
}
209 
210 
/// <summary>
/// Avro-encodes a batch of records and submits it as a single
/// /insert/records request. The server-reported inserted and updated
/// counts are added to the ingestor's running totals.
/// </summary>
/// <param name="queue">The records to send. A null or empty batch is a no-op.</param>
/// <param name="url">The endpoint to post the batch to. When null, the batch is
/// sent through the client's regular <c>insertRecordsRaw</c> call instead.</param>
/// <remarks>
/// Any failure (encoding or submission) is rethrown as an InsertException
/// carrying <paramref name="url"/> and the unsent <paramref name="queue"/>, so
/// the caller can see which records were not confirmed.
/// </remarks>
private void flush( IList<T> queue, System.Uri url )
{
    if ( queue == null || queue.Count == 0 )
        return; // nothing to send

    try
    {
        // Encode the records into binary; the final size is known up front
        List<byte[]> encodedQueue = new( queue.Count );
        foreach ( var record in queue )
            encodedQueue.Add( this.kineticaDB.AvroEncode( record ) );

        RawInsertRecordsRequest request = new( this.table_name, encodedQueue, this.options );

        // No explicit endpoint: use the client's insertRecordsRaw.
        // Otherwise post the request directly to the given URL.
        InsertRecordsResponse response = ( url == null )
            ? this.kineticaDB.insertRecordsRaw( request )
            : this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );

        // Save the counts of inserted and updated records (atomically, so the
        // public getters never observe a torn 64-bit value)
        System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
        System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
    }
    catch ( Exception ex )
    {
        // Return the failed batch to the caller along with the target URL
        throw new InsertException<T>( url, queue, ex.Message );
    }
} // end private flush()
251 
252 
253 
/// <summary>
/// Queues a single record for insertion into Kinetica.
/// </summary>
/// <remarks>
/// The target worker queue is chosen as follows:
/// - no routing table: the first (only) worker queue;
/// - routing table but no shard key for T: a randomly chosen worker queue;
/// - otherwise: the queue picked by routing the record's shard key.
/// If <c>WorkerQueue.insert</c> hands back a batch (meaning the queue was
/// flushed), that batch is submitted immediately to the queue's URL.
/// </remarks>
/// <param name="record">The record to queue.</param>
public void insert( T record )
{
    // Primary key: passed to the worker queue for uniqueness checks.
    // Shard key: decides which worker receives the record.
    Utils.RecordKey? primaryKey = primaryKeyBuilder?.build( record );
    Utils.RecordKey? shardKey = shardKeyBuilder?.build( record );

    Utils.WorkerQueue<T> target =
        routingTable == null ? workerQueues[0]
        : shardKey == null   ? workerQueues[random.Next( workerQueues.Count )]
        :                      workerQueues[shardKey.route( routingTable )];

    IList<T> flushedBatch = target.insert( record, primaryKey );
    if ( flushedBatch == null )
        return; // still below the batch threshold; nothing to send yet

    this.flush( flushedBatch, target.url );
}
306 
307 
308 
/// <summary>
/// Queues a list of records for insertion into Kinetica, one record at a time.
/// </summary>
/// <param name="records">The records to queue. Must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="records"/> is null.</exception>
/// <remarks>
/// If queuing a record triggers a batch submission that fails, the resulting
/// InsertException is rethrown with its original stack trace. Before that, every
/// record after the failing position is appended to the exception's
/// <c>records</c> list, so the caller receives both the failed batch and the
/// records that were never queued.
/// </remarks>
public void insert( IList<T> records )
{
    ArgumentNullException.ThrowIfNull( records );

    // Insert one record at a time
    for ( int i = 0; i < records.Count; ++i )
    {
        try
        {
            this.insert( records[i] );
        }
        catch ( InsertException<T> ex )
        {
            // Add the remaining (never queued) records to the exception's
            // record list; guard against a null list so the handler cannot
            // mask the original failure with a NullReferenceException
            IList<T> queue = ex.records;
            if ( queue != null )
            {
                for ( int j = i + 1; j < records.Count; ++j )
                    queue.Add( records[j] );
            }

            // Rethrow without resetting the stack trace
            throw;
        } // end try-catch
    } // end outer for loop
} // end insert( records )
348 
349 
350 
351  } // end class KineticaIngestor<T>
352 
353 
354 
355 
356 } // end namespace kinetica
int count_inserted
The number of records inserted.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(T record)
Queues a record for insertion into Kinetica.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:91
int count_updated
The number of records updated.
A set of parameters for Kinetica.insertRecords.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
A set of parameters for Kinetica.insertRecordsRaw.
IList< int > rank
Array of ranks indexed by the shard number.
KineticaIngestor(Kinetica kdb, string tableName, int batchSize, KineticaType ktype, Dictionary< string, string >? options=null, Utils.WorkerList? workers=null)
A set of results returned by Kinetica.insertRecords.
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
A set of string constants for the parameter options.
API to talk to Kinetica Database
Definition: Kinetica.cs:40
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.