Kinetica C# API  Version 7.0.19.0
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Properties Pages
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 
4 
5 namespace kinetica
6 {
/// <summary>
/// Buffers records of type <typeparamref name="T"/> into per-worker queues and
/// sends them to Kinetica's /insert/records endpoint in batches.
/// Records are routed by shard key when a routing table is available.
/// Otherwise they go to a random worker queue, or to the single head-node queue.
/// </summary>
/// <typeparam name="T">The record type being ingested.</typeparam>
public class KineticaIngestor<T>
{
    /// <summary>
    /// Thrown when a batch of records could not be inserted.
    /// It carries the URL the batch was sent to and the records of that batch
    /// (plus, when thrown out of <c>insert(IList&lt;T&gt;)</c>, the not-yet-queued remainder).
    /// </summary>
    [Serializable]
    public class InsertException<T> : KineticaException
    {
        /// <summary>The URL the failed insert was sent to (may be null).</summary>
        public Uri url { get; private set; }

        /// <summary>The records that were not inserted (may be null).</summary>
        public IList<T> records { get; private set; }

        private string message;

        /// <summary>Creates an exception carrying only a message.</summary>
        public InsertException( string msg ) : base( msg )
        {
            // Store the message so ToString() reports it; previously it was left unset.
            this.message = msg;
        }

        /// <summary>Creates an exception carrying the target URL and the failed records.</summary>
        internal InsertException( Uri url_, IList<T> records_, string msg ) : base( msg )
        {
            this.message = msg;
            this.url = url_;
            this.records = records_;
        }

        public override string ToString() { return "InsertException: " + message; }
    } // end class InsertException


    // KineticaIngestor Members:
    // =========================
    public Kinetica kineticaDB { get; }
    public string table_name { get; }
    public int batch_size { get; }
    public IDictionary<string, string> options { get; }
    public Int64 count_inserted;
    public Int64 count_updated;
    private KineticaType ktype;
    private Utils.RecordKeyBuilder<T> primary_key_builder;
    private Utils.RecordKeyBuilder<T> shard_key_builder;
    private IList<int> routing_table;
    private IList<Utils.WorkerQueue<T>> worker_queues;
    private Random random;


    /// <summary>
    /// Creates an ingestor for <paramref name="table_name"/>.
    /// </summary>
    /// <param name="kdb">Kinetica connection used for encoding and submitting requests.</param>
    /// <param name="table_name">Name of the table to insert into.</param>
    /// <param name="batch_size">Number of records each worker queue holds before it is flushed; must be at least 1.</param>
    /// <param name="ktype">Type information used to build the primary and shard key builders.</param>
    /// <param name="options">Options passed to every insert request (may be null).</param>
    /// <param name="workers">Worker URLs for multi-head ingest. If null or empty, they are
    /// obtained via <c>new Utils.WorkerList(kdb)</c>.</param>
    /// <exception cref="KineticaException">The batch size is less than 1, or setting up
    /// the worker queues failed (in which case the underlying exception text is wrapped).</exception>
    public KineticaIngestor( Kinetica kdb, string table_name,
                             int batch_size, KineticaType ktype,
                             Dictionary<string, string> options = null,
                             Utils.WorkerList workers = null )
    {
        this.kineticaDB = kdb;
        this.table_name = table_name;
        this.ktype = ktype;

        // Validate and save the batch size
        if ( batch_size < 1 )
            throw new KineticaException( $"Batch size must be at least one; given {batch_size}." );
        this.batch_size = batch_size;

        // Save the options as given (null is allowed and means "no options")
        this.options = options;

        // Set up the primary and shard key builders
        // -----------------------------------------
        this.primary_key_builder = new Utils.RecordKeyBuilder<T>( true, this.ktype );
        this.shard_key_builder = new Utils.RecordKeyBuilder<T>( false, this.ktype );

        // Based on the Java implementation
        if ( this.primary_key_builder.hasKey() )
        {   // There is a primary key for the given T.
            // Reuse it for routing unless there is a distinct shard key.
            if ( !this.shard_key_builder.hasKey()
                 || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
                this.shard_key_builder = this.primary_key_builder;
        }
        else
        {   // There is no primary key for the given T
            this.primary_key_builder = null;

            // Drop the shard key builder too if T has no shard key
            if ( !this.shard_key_builder.hasKey() )
                this.shard_key_builder = null;
        }

        // Set up the worker queues
        // -------------------------
        // Should records with matching primary keys already in the database be updated?
        bool update_on_existing_pk = ( ( options != null )
                                       && options.ContainsKey( InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK )
                                       && options[ InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK ].Equals( InsertRecordsRequest<T>.Options.TRUE ) );
        // Do T type records have a primary key?
        bool has_primary_key = ( this.primary_key_builder != null );
        this.worker_queues = new List<Utils.WorkerQueue<T>>();
        try
        {
            // If no workers are given, try to get them from Kinetica
            if ( ( workers == null ) || ( workers.Count == 0 ) )
            {
                workers = new Utils.WorkerList( kdb );
            }

            // If we end up with one or more workers, either given by the
            // user or obtained from Kinetica, then use those
            if ( ( workers != null ) && ( workers.Count > 0 ) )
            {
                // Add one worker queue per worker
                foreach ( System.Uri worker_url in workers )
                {
                    System.Uri url = new System.Uri( worker_url.ToString() + "insert/records" );
                    this.worker_queues.Add( new Utils.WorkerQueue<T>( url, batch_size,
                                                                      has_primary_key,
                                                                      update_on_existing_pk ) );
                }

                // Get the worker rank information from Kinetica
                this.routing_table = kdb.adminShowShards().rank;
                // Check that enough worker URLs are specified for every rank in the table
                for ( int i = 0; i < this.routing_table.Count; ++i )
                {
                    if ( this.routing_table[i] > this.worker_queues.Count )
                        throw new KineticaException( "Not enough worker URLs specified." );
                }
            }
            else // multihead-ingest is NOT turned on; use the regular Kinetica IP address
            {
                System.Uri url = new System.Uri( kdb.URL.ToString() + "insert/records" );
                this.worker_queues.Add( new Utils.WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk ) );
                this.routing_table = null;
            }
        }
        catch ( Exception ex )
        {
            throw new KineticaException( ex.ToString() );
        }

        // Create the random number generator (used to pick a worker when there is no shard key)
        this.random = new Random( (int) DateTime.Now.Ticks );
    } // end constructor KineticaIngestor


    /// <summary>
    /// Returns the count of records inserted so far.
    /// </summary>
    public Int64 getCountInserted()
    {
        return System.Threading.Interlocked.Read( ref this.count_inserted );
    }


    /// <summary>
    /// Returns the count of records updated so far.
    /// </summary>
    public Int64 getCountUpdated()
    {
        return System.Threading.Interlocked.Read( ref this.count_updated );
    }


    /// <summary>
    /// Ensures that all queued records are inserted into Kinetica.
    /// Each worker queue is flushed and its records are sent to that worker's URL.
    /// </summary>
    /// <exception cref="InsertException{T}">A batch could not be inserted.</exception>
    public void flush()
    {
        foreach ( Utils.WorkerQueue<T> worker_queue in this.worker_queues )
        {
            // Drain the queue, then actually insert the drained records
            IList<T> queue = worker_queue.flush();
            flush( queue, worker_queue.url );
        }
    } // end public flush


    /// <summary>
    /// Avro-encodes <paramref name="queue"/> and sends it as a single /insert/records request.
    /// On success, the inserted/updated counters are updated atomically.
    /// </summary>
    /// <param name="queue">Records to insert; null or empty is a no-op.</param>
    /// <param name="url">Worker URL to submit to. If null, the request goes through
    /// <c>kineticaDB.insertRecordsRaw</c>.</param>
    /// <exception cref="InsertException{T}">Any failure while encoding or submitting;
    /// it carries <paramref name="url"/> and <paramref name="queue"/>.</exception>
    private void flush( IList<T> queue, System.Uri url )
    {
        if ( ( queue == null ) || ( queue.Count == 0 ) )
            return; // nothing to do since the queue is empty

        try
        {
            // Encode the records into binary and build the /insert/records request
            IList<byte[]> encoded_queue = new List<byte[]>( queue.Count );
            foreach ( var record in queue )
                encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
            RawInsertRecordsRequest request = new RawInsertRecordsRequest( this.table_name, encoded_queue, this.options );

            // Make the /insert/records call
            InsertRecordsResponse response;
            if ( url == null )
            {
                response = this.kineticaDB.insertRecordsRaw( request );
            }
            else
            {
                response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
            }

            // Save the counts of inserted and updated records
            System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
            System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
        }
        catch ( Exception ex )
        {
            throw new InsertException<T>( url, queue, ex.Message );
        }
    } // end private flush()


    /// <summary>
    /// Queues a record for insertion into Kinetica.
    /// If this fills the target worker's queue, that queue is flushed immediately.
    /// </summary>
    /// <param name="record">The record to queue.</param>
    /// <exception cref="InsertException{T}">An automatic flush failed.</exception>
    public void insert( T record )
    {
        // Build the keys: primary for uniqueness checks, shard for worker routing
        Utils.RecordKey primary_key = null;
        Utils.RecordKey shard_key = null;

        if ( this.primary_key_builder != null )
            primary_key = this.primary_key_builder.build( record );

        if ( this.shard_key_builder != null )
            shard_key = this.shard_key_builder.build( record );

        // Find out which worker to send the record to
        Utils.WorkerQueue<T> worker_queue;
        if ( this.routing_table == null )
        {   // no information regarding multiple workers, so use the first/only one
            worker_queue = this.worker_queues[0];
        }
        else if ( shard_key == null )
        {   // there is no shard/routing key, so pick a random worker
            worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
        }
        else
        {   // pick the worker based on the sharding/routing key
            int worker_index = shard_key.route( this.routing_table );
            worker_queue = this.worker_queues[worker_index];
        }

        // Queue the record; a non-null result means the queue was drained and must be sent
        IList<T> queue = worker_queue.insert( record, primary_key );
        if ( queue != null )
        {
            this.flush( queue, worker_queue.url );
        }
    } // end insert( record )


    /// <summary>
    /// Queues a list of records for insertion into Kinetica, one record at a time.
    /// </summary>
    /// <param name="records">The records to queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="records"/> is null.</exception>
    /// <exception cref="InsertException{T}">An automatic flush failed. The records after the
    /// failing one are appended to the exception's <c>records</c> list before it is rethrown.</exception>
    public void insert( IList<T> records )
    {
        if ( records == null )
            throw new ArgumentNullException( nameof( records ) );

        for ( int i = 0; i < records.Count; ++i )
        {
            try
            {
                this.insert( records[ i ] );
            }
            catch ( InsertException<T> ex )
            {
                // Add the remaining (not yet queued) records to the exception's record list
                IList<T> queue = ex.records;
                if ( queue != null )
                {
                    for ( int j = i + 1; j < records.Count; ++j )
                    {
                        queue.Add( records[ j ] );
                    }
                }

                // Rethrow while preserving the original stack trace
                throw;
            }
        }
    } // end insert( records )
} // end class KineticaIngestor<T>
358 
359 
360 
361 
362 } // end namespace kinetica
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, Utils.WorkerList workers=null)
A set of parameters for Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
void insert(T record)
Queues a record for insertion into Kinetica.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
void flush()
Ensures that all queued records are inserted into Kinetica.
A set of parameters for Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
Int64 getCountUpdated()
Returns the count of records updated so far.
A set of results returned by Kinetica.insertRecords{T}(string,IList{T},IDictionary{string, string}).
API to talk to Kinetica Database
Definition: Kinetica.cs:40