Kinetica   C#   API  Version 7.2.3.1
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 
4 
5 namespace kinetica;
6 
19  [Obsolete("Use BulkInserter<T> instead. KineticaIngestor will be removed in a future version.")]
20  public class KineticaIngestor<T>
21  {
22  [Serializable]
24  {
25  public Uri url { get; private set; }
26  public IList<T> records { get; private set; }
27 
28  public InsertException( string msg ) : base( msg ) { }
29 
30  internal InsertException( Uri url_, IList<T> records_, string msg ) : base ( msg )
31  {
32  this.url = url_;
33  this.records = records_;
34  }
35 
36  public override string ToString() { return "InsertException: " + Message; }
37  } // end class InsertException
38 
39 
40 
41 
42  // KineticaIngestor Members:
43  // =========================
44  public Kinetica kineticaDB { get; }
45  public string table_name { get; }
46  public int batch_size { get; }
47  public IDictionary<string, string> options { get; }
48  //public IReadOnlyDictionary<string, string> options { get; }
49  public Int64 count_inserted;
50  public Int64 count_updated;
51  private KineticaType ktype;
52  private Utils.RecordKeyBuilder<T>? primaryKeyBuilder;
53  private Utils.RecordKeyBuilder<T>? shardKeyBuilder;
54  private IList<int>? routingTable;
55  private IList<Utils.WorkerQueue<T>> workerQueues;
56  private Random random;
57 
58 
68  public KineticaIngestor( Kinetica kdb, string tableName,
69  int batchSize, KineticaType ktype,
70  Dictionary<string, string>? options = null,
71  Utils.WorkerList? workers = null )
72  {
73  this.kineticaDB = kdb;
74  this.table_name = tableName;
75  this.ktype = ktype;
76 
77  // Validate and save the batch size
78  if ( batchSize < 1 )
79  throw new KineticaException( $"Batch size must be greater than one; given {batchSize}." );
80  this.batch_size = batchSize;
81 
82  this.options = options;
83 
84  // Set up the primary and shard key builders
85  // -----------------------------------------
86  this.primaryKeyBuilder = new Utils.RecordKeyBuilder<T>( true, this.ktype );
87  this.shardKeyBuilder = new Utils.RecordKeyBuilder<T>( false, this.ktype );
88 
89  // Based on the Java implementation
90  if ( this.primaryKeyBuilder.hasKey() )
91  { // There is a primary key for the given T
92  // Now check if there is a distinct shard key
93  if ( !this.shardKeyBuilder.hasKey()
94  || this.shardKeyBuilder.hasSameKey( this.primaryKeyBuilder ) )
95  this.shardKeyBuilder = this.primaryKeyBuilder; // no distinct shard key
96  }
97  else // there is no primary key for the given T
98  {
99  this.primaryKeyBuilder = null;
100 
101  // Check if there is shard key for T
102  if ( !this.shardKeyBuilder.hasKey() )
103  this.shardKeyBuilder = null;
104  } // done setting up the key builders
105 
106 
107  // Set up the worker queues
108  // -------------------------
109  // Do we update records if there are matching primary keys in the
110  // database already?
111  bool updateOnExistingPk = ( (options != null)
114  // Do T type records have a primary key?
115  bool hasPrimaryKey = (this.primaryKeyBuilder != null);
116  this.workerQueues = [];
117  try
118  {
119  // If no workers are given, try to get them from Kinetica
120  if ( ( workers == null ) || ( workers.Count == 0 ) )
121  {
122  workers = new Utils.WorkerList( kdb );
123  }
124 
125  // If we end up with multiple workers, either given by the
126  // user or obtained from Kinetica, then use those
127  if ( ( workers != null ) && ( workers.Count > 0 ) )
128  {
129  // Add worker queues per worker
130  foreach ( var workerUrl in workers )
131  {
132  // Skip removed ranks (null URLs)
133  if ( workerUrl == null ) continue;
134 
135  string strWorkerUrl = workerUrl.ToString();
136  strWorkerUrl = strWorkerUrl.EndsWith('/') ? strWorkerUrl[..^1] : strWorkerUrl;
137  string insert_records_worker_url_str = $"{strWorkerUrl}/insert/records";
138  System.Uri url = new( insert_records_worker_url_str );
139  Utils.WorkerQueue<T> worker_queue = new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
140  this.workerQueues.Add( worker_queue );
141  }
142 
143  // Get the worker rank information from Kinetica
144  this.routingTable = kdb.adminShowShards().rank;
145  // Check that enough worker URLs are specified
146  for ( int i = 0; i < routingTable.Count; ++i )
147  {
148  if ( this.routingTable[i] > this.workerQueues.Count )
149  throw new KineticaException( "Not enough worker URLs specified." );
150  }
151  }
152  else // multihead-ingest is NOT turned on; use the regular Kinetica IP address
153  {
154  string strWorkerUrl = kdb.URL.ToString();
155  strWorkerUrl = strWorkerUrl.EndsWith('/') ? strWorkerUrl[..^1] : strWorkerUrl;
156  string insertRecordsUrlStr = $"{strWorkerUrl}/insert/records";
157  System.Uri url = new( insertRecordsUrlStr );
158  Utils.WorkerQueue<T> worker_queue = new( url, batchSize, hasPrimaryKey, updateOnExistingPk );
159  this.workerQueues.Add( worker_queue );
160  this.routingTable = null;
161  }
162  }
163  catch ( Exception ex )
164  {
165  throw new KineticaException( ex.ToString() );
166  }
167 
168  // Create the random number generator
169  this.random = new( (int) DateTime.Now.Ticks );
170  } // end constructor KineticaIngestor
171 
172 
/// <summary>
/// Returns the number of records inserted so far through this ingestor.
/// </summary>
/// <returns>
/// The current value of <c>count_inserted</c>, obtained with an
/// interlocked 64-bit read.
/// </returns>
public Int64 getCountInserted() =>
    System.Threading.Interlocked.Read( ref this.count_inserted );
182 
183 
/// <summary>
/// Returns the number of records updated so far through this ingestor.
/// </summary>
/// <returns>
/// The current value of <c>count_updated</c>, obtained with an
/// interlocked 64-bit read.
/// </returns>
public Int64 getCountUpdated() =>
    System.Threading.Interlocked.Read( ref this.count_updated );
193 
194 
/// <summary>
/// Drains every worker queue and sends whatever each one was holding
/// to that queue's URL.
/// </summary>
/// <exception cref="InsertException">
/// Propagated from the private flush overload when sending a drained
/// queue fails; queues later in the list are not visited in that case.
/// </exception>
public void flush()
{
    foreach ( var pending in this.workerQueues )
    {
        // Drain first, then push the drained records out; the drained
        // list is evaluated before the URL, as argument order dictates.
        this.flush( pending.flush(), pending.url );
    }
} // end public flush
215 
216 
/// <summary>
/// Encodes the given records and submits them in a single
/// /insert/records request, then adds the response's inserted and
/// updated counts to this ingestor's running totals.
/// </summary>
/// <param name="queue">Records to send; an empty list is a no-op.</param>
/// <param name="url">
/// Endpoint to submit to. When null, the request goes through
/// <c>kineticaDB.insertRecordsRaw</c> instead of a direct URL call.
/// </param>
/// <exception cref="InsertException">
/// Thrown for any failure while encoding or submitting. It carries
/// <paramref name="url"/>, <paramref name="queue"/> and the message of
/// the underlying exception.
/// </exception>
private void flush( IList<T> queue, System.Uri url )
{
    if ( queue.Count == 0 )
        return; // nothing to do since the queue is empty

    try
    {
        // Encode the records into binary; the final size is known up
        // front, so size the list once instead of letting it regrow.
        List<byte[]> encodedQueue = new( queue.Count );
        foreach ( var record in queue )
            encodedQueue.Add( this.kineticaDB.AvroEncode( record ) );

        RawInsertRecordsRequest request = new( this.table_name, encodedQueue, this.options );

        // Assign the response exactly once (no placeholder instance).
        // Direct URL calls use SubmitRequestRaw (no HA failover - handled
        // by ingestor).
        InsertRecordsResponse response = ( url is null )
            ? this.kineticaDB.insertRecordsRaw( request )
            : this.kineticaDB.SubmitRequestRaw<InsertRecordsResponse>( url, request );

        // Save the counts of inserted and updated records
        System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
        System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
    }
    catch ( Exception ex )
    {
        // Hand the unsent records back to the caller via the exception.
        throw new InsertException( url, queue, ex.Message );
    }
} // end private flush()
258 
259 
260 
/// <summary>
/// Queues a single record on the worker queue chosen for it and, if
/// that queue hands back a batch, sends the batch immediately.
/// </summary>
/// <param name="record">The record to queue.</param>
/// <exception cref="InsertException">
/// Propagated from the private flush overload when a returned batch
/// cannot be sent.
/// </exception>
public void insert( T record )
{
    // Build whichever keys this type has; a missing builder yields null.
    Utils.RecordKey? primaryKey = this.primaryKeyBuilder?.build( record );
    Utils.RecordKey? shardKey   = this.shardKeyBuilder?.build( record );

    // Decide which worker queue receives the record.
    int queueIndex;
    if ( this.routingTable is null )
    {
        // No routing information: there is only the first queue to use.
        queueIndex = 0;
    }
    else if ( shardKey is null )
    {
        // Routing is available but the record has no shard key: pick a
        // queue at random.
        queueIndex = this.random.Next( this.workerQueues.Count );
    }
    else
    {
        // Let the shard key pick the queue from the routing table.
        queueIndex = shardKey.route( this.routingTable );
    }
    Utils.WorkerQueue<T> target = this.workerQueues[queueIndex];

    // Queue the record; a non-null result is a batch that must be sent.
    IList<T>? batch = target.insert( record, primaryKey );
    if ( batch is null )
        return;

    this.flush( batch, target.url );
} // end insert( record )
313 
314 
315 
/// <summary>
/// Queues each record of the list in order by calling the
/// single-record <c>insert</c>.
/// </summary>
/// <param name="records">The records to queue.</param>
/// <exception cref="InsertException">
/// Rethrown from the single-record insert. Before rethrowing, every
/// record that follows the failing one in <paramref name="records"/>
/// is appended to the exception's <c>records</c> list.
/// </exception>
public void insert( IList<T> records )
{
    // Kept outside the try so the handler knows where processing stopped.
    int position = 0;

    try
    {
        while ( position < records.Count )
        {
            this.insert( records[ position ] );
            ++position;
        }
    }
    catch ( InsertException ex )
    {
        // Append the records that were never attempted to the list
        // already carried by the exception.
        IList<T> unsent = ex.records;
        for ( int rest = position + 1; rest < records.Count; ++rest )
            unsent.Add( records[ rest ] );

        // Rethrow (preserving stack trace)
        throw;
    }
} // end insert( records )
355 
356 
357 
358  } // end class KineticaIngestor<T>
359 
360 
A set of string constants for the parameter options.
A set of parameters for Kinetica.insertRecordsRaw.
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
int count_inserted
The number of records inserted.
void insert(T record)
Queues a record for insertion into Kinetica.
Manages the insertion into GPUdb of large numbers of records in bulk.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
A set of results returned by Kinetica.insertRecords.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
Int64 getCountUpdated()
Returns the count of records updated so far.
A set of parameters for Kinetica.insertRecords.
void flush()
Ensures that all queued records are inserted into Kinetica.
int count_updated
The number of records updated.
IList< int > rank
Array of ranks indexed by the shard number.
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
IDictionary< string, string > options
KineticaIngestor(Kinetica kdb, string tableName, int batchSize, KineticaType ktype, Dictionary< string, string >? options=null, Utils.WorkerList? workers=null)
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:152