Kinetica C# API  Version 6.2.0.1
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 
4 
5 namespace kinetica
6 {
15  public class KineticaIngestor<T>
16  {
/// <summary>
/// Exception describing a failed insertion. When raised by the ingestor's flush
/// logic it carries the URL the request was sent to and the records of the batch.
/// </summary>
// NOTE(review): the declaration line of this class is missing from the listing.
// The name and type parameter are taken from the usages `new InsertException<T>(...)`
// and `catch ( InsertException<T> ex )` elsewhere in this file; the base type is
// presumed to be KineticaException -- confirm against the original source.
[Serializable]
public class InsertException<T> : KineticaException
{
    /// <summary>URL the failed request was sent to; not set by the message-only constructor.</summary>
    public Uri url { get; private set; }

    /// <summary>Records of the failed batch; not set by the message-only constructor.</summary>
    public IList<T> records { get; private set; }

    // Text reported by ToString()
    private string message;

    /// <summary>Creates an exception that carries only a message.</summary>
    /// <param name="msg">Description of the failure.</param>
    public InsertException( string msg ) : base( msg )
    {
        // Previously left unset here, which made ToString() print an empty description
        this.message = msg;
    }

    /// <summary>Creates an exception that also records the target URL and the affected records.</summary>
    /// <param name="url_">URL the insert request was sent to.</param>
    /// <param name="records_">Records that were part of the failed batch.</param>
    /// <param name="msg">Description of the failure.</param>
    internal InsertException( Uri url_, IList<T> records_, string msg ) : base( msg )
    {
        this.message = msg;
        this.url = url_;
        this.records = records_;
    }

    /// <summary>Returns the message prefixed with "InsertException: ".</summary>
    public override string ToString() { return "InsertException: " + message; }
}  // end class InsertException
35 
36 
37 
38 
// ---------------------------------------------------------------
// State of a KineticaIngestor instance
// ---------------------------------------------------------------

/// <summary>Client used to encode records and to submit insert requests.</summary>
public Kinetica kineticaDB { get; }

/// <summary>Name of the table that records are inserted into.</summary>
public string table_name { get; }

/// <summary>Batch size given at construction and handed to every worker queue.</summary>
public int batch_size { get; }

/// <summary>Options sent with every insert request; null when none were given.</summary>
public IDictionary<string, string> options { get; }

// Running totals; they are plain fields because they are passed by `ref`
// to System.Threading.Interlocked in flush() and in the getCount*() methods.
public long count_inserted;
public long count_updated;

// Type description the key builders are created from
private KineticaType ktype;

// Key builders; either may be null when the record type has no such key
private Utils.RecordKeyBuilder<T> primary_key_builder;
private Utils.RecordKeyBuilder<T> shard_key_builder;

// Rank list obtained from adminShowShards(); null when a single queue is used
private IList<int> routing_table;

// One queue per insertion endpoint
private IList<Utils.WorkerQueue<T>> worker_queues;

// Used to pick a worker queue for records that have no shard key
private Random random;
54 
55 
/// <summary>
/// Creates an ingestor that queues records of type T and inserts them in batches.
/// </summary>
/// <param name="kdb">Client used to look up workers and shard ranks and to submit requests.</param>
/// <param name="table_name">Name of the table to insert into.</param>
/// <param name="batch_size">Batch size handed to every worker queue; must be at least 1.</param>
/// <param name="ktype">Type description used to build the primary and shard keys.</param>
/// <param name="options">Optional insert options sent with every request; may be null.</param>
/// <param name="workers">Optional worker URLs; when null or empty they are obtained through <paramref name="kdb"/>.</param>
/// <exception cref="KineticaException">
/// The batch size is below 1, or setting up the worker queues failed (the original
/// failure is reported through its string representation).
/// </exception>
public KineticaIngestor( Kinetica kdb, string table_name,
                         int batch_size, KineticaType ktype,
                         Dictionary<string, string> options = null,
                         Utils.WorkerList workers = null )
{
    this.kineticaDB = kdb;
    this.table_name = table_name;
    this.ktype      = ktype;

    // Validate and save the batch size; a batch of exactly one record is accepted,
    // so the message must not claim that the size has to exceed one.
    if ( batch_size < 1 )
        throw new KineticaException( $"Batch size must be greater than zero; given {batch_size}." );
    this.batch_size = batch_size;

    // Save the options; the caller's dictionary is kept by reference (null if absent)
    this.options = options;

    // Set up the primary and shard key builders
    // -----------------------------------------
    this.primary_key_builder = new Utils.RecordKeyBuilder<T>( true,  this.ktype );
    this.shard_key_builder   = new Utils.RecordKeyBuilder<T>( false, this.ktype );

    // Based on the Java implementation
    if ( this.primary_key_builder.hasKey() )
    {   // There is a primary key for the given T
        // Re-use the primary key builder when there is no distinct shard key
        if ( !this.shard_key_builder.hasKey()
             || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
            this.shard_key_builder = this.primary_key_builder;
    }
    else  // there is no primary key for the given T
    {
        this.primary_key_builder = null;

        // Drop the shard key builder as well if T has no shard key
        if ( !this.shard_key_builder.hasKey() )
            this.shard_key_builder = null;
    }  // done setting up the key builders


    // Set up the worker queues
    // -------------------------
    // Do we update records if there are matching primary keys in the
    // database already?
    // NOTE(review): the tail of this expression is truncated in the listing; the
    // option lookup below is reconstructed (constant names presumed) -- confirm
    // against the original source.
    bool update_on_existing_pk = ( (options != null)
                                   && options.ContainsKey( InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK )
                                   && options[ InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK ].Equals( InsertRecordsRequest<T>.Options.TRUE ) );
    // Do T type records have a primary key?
    bool has_primary_key = (this.primary_key_builder != null);
    this.worker_queues = new List<Utils.WorkerQueue<T>>();
    try
    {
        // If no workers are given, try to get them from Kinetica
        if ( ( workers == null ) || ( workers.Count == 0 ) )
        {
            workers = new Utils.WorkerList( kdb );
        }

        // If we end up with workers, either given by the user or obtained
        // from Kinetica, then use one queue per worker
        if ( ( workers != null ) && ( workers.Count > 0 ) )
        {
            foreach ( System.Uri worker_url in workers )
            {
                string insert_records_worker_url_str = (worker_url.ToString() + "insert/records");
                System.Uri url = new System.Uri( insert_records_worker_url_str );
                Utils.WorkerQueue<T> worker_queue = new Utils.WorkerQueue<T>( url, batch_size,
                                                                              has_primary_key,
                                                                              update_on_existing_pk );
                this.worker_queues.Add( worker_queue );
            }

            // Get the worker rank information from Kinetica
            this.routing_table = kdb.adminShowShards().rank;
            // Check that enough worker URLs are specified: no rank may exceed
            // the number of queues that were created
            for ( int i = 0; i < routing_table.Count; ++i )
            {
                if ( this.routing_table[i] > this.worker_queues.Count )
                    throw new KineticaException( "Not enough worker URLs specified." );
            }
        }
        else // multihead-ingest is NOT turned on; use the regular Kinetica IP address
        {
            string insert_records_url_str = ( kdb.URL.ToString() + "insert/records" );
            System.Uri url = new System.Uri( insert_records_url_str );
            Utils.WorkerQueue<T> worker_queue = new Utils.WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk );
            this.worker_queues.Add( worker_queue );
            this.routing_table = null;
        }
    }
    catch ( Exception ex )
    {
        // Any failure above is reported as a KineticaException carrying the
        // original exception's string representation
        throw new KineticaException( ex.ToString() );
    }

    // Create the random number generator, seeded from the current tick count
    this.random = new Random( (int) DateTime.Now.Ticks );
}   // end constructor KineticaIngestor
172 
173 
/// <summary>
/// Atomically reads the running total kept in <c>count_inserted</c>.
/// </summary>
/// <returns>The current value of <c>count_inserted</c>.</returns>
public Int64 getCountInserted() => System.Threading.Interlocked.Read( ref this.count_inserted );
183 
184 
/// <summary>
/// Atomically reads the running total kept in <c>count_updated</c>.
/// </summary>
/// <returns>The current value of <c>count_updated</c>.</returns>
public Int64 getCountUpdated() => System.Threading.Interlocked.Read( ref this.count_updated );
194 
195 
/// <summary>
/// Drains every worker queue and sends whatever each one held to that queue's URL.
/// </summary>
/// <remarks>
/// A failure while sending one queue's records propagates immediately, so the
/// queues after it are not drained by this call.
/// </remarks>
public void flush()
{
    for ( int q = 0; q < this.worker_queues.Count; ++q )
    {
        Utils.WorkerQueue<T> current = this.worker_queues[ q ];

        // Drain the queue first, then insert the drained records
        this.flush( current.flush(), current.url );
    }
}   // end public flush
215 
216 
/// <summary>
/// Encodes the given records and submits them in a single insert request, then
/// adds the reported counts to <c>count_inserted</c> and <c>count_updated</c>.
/// </summary>
/// <param name="queue">Records to insert; nothing happens when it is empty.</param>
/// <param name="url">
/// Endpoint to submit the request to; when null the request goes through
/// <c>kineticaDB.insertRecordsRaw</c> instead.
/// </param>
/// <exception cref="InsertException{T}">
/// Encoding or submitting failed; the exception carries <paramref name="url"/>,
/// <paramref name="queue"/> and the original message.
/// </exception>
private void flush( IList<T> queue, System.Uri url )
{
    if ( queue.Count == 0 )
        return; // nothing to do since the queue is empty

    try
    {
        // Create the /insert/records request and response objects
        // -------------------------------------------------------
        // Encode the records into binary
        IList<byte[]> encoded_queue = new List<byte[]>( queue.Count );
        foreach ( var record in queue )
            encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
        RawInsertRecordsRequest request = new RawInsertRecordsRequest( this.table_name, encoded_queue, this.options );

        // The declaration of `response` is not visible in the listing; its type is
        // fixed by the SubmitRequest<InsertRecordsResponse> call below.
        InsertRecordsResponse response;

        // Make the /insert/records call
        if ( url == null )
        {
            response = this.kineticaDB.insertRecordsRaw( request );
        }
        else
        {
            response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
        }

        // Save the counts of inserted and updated records
        System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
        System.Threading.Interlocked.Add( ref this.count_updated,  response.count_updated );
    }
    catch ( Exception ex )
    {
        // Hand the unsent records back to the caller together with the failure text
        throw new InsertException<T>( url, queue, ex.Message );
    }
}   // end private flush()
259 
260 
261 
/// <summary>
/// Adds one record to the worker queue it belongs to and, if the queue hands back
/// a batch in response, sends that batch right away.
/// </summary>
/// <param name="record">The record to queue.</param>
public void insert( T record )
{
    // Primary key: passed to the queue along with the record (null if T has none)
    Utils.RecordKey pk = ( this.primary_key_builder == null )
                         ? null
                         : this.primary_key_builder.build( record );

    // Shard key: decides which worker receives the record (null if T has none)
    Utils.RecordKey routing_key = ( this.shard_key_builder == null )
                                  ? null
                                  : this.shard_key_builder.build( record );

    // Pick the index of the destination queue
    int target;
    if ( this.routing_table == null )
    {
        // No routing information, so there is only the first queue to use
        target = 0;
    }
    else if ( routing_key == null )
    {
        // Nothing to route on: pick any worker at random
        target = random.Next( this.worker_queues.Count );
    }
    else
    {
        // Let the shard key choose the worker from the routing table
        target = routing_key.route( this.routing_table );
    }
    Utils.WorkerQueue<T> destination = this.worker_queues[ target ];

    // Queue the record; a non-null result is a batch that has to be sent now
    IList<T> pending = destination.insert( record, pk );
    if ( pending == null )
        return;

    this.flush( pending, destination.url );
}   // end insert( record )
313 
314 
315 
/// <summary>
/// Queues the given records one at a time, in list order.
/// </summary>
/// <param name="records">The records to queue.</param>
/// <exception cref="InsertException{T}">
/// Queuing a record failed. Before the exception is rethrown, every record that
/// comes after the failing one is appended to the exception's record list, so
/// the caller can see everything that was not processed.
/// </exception>
public void insert( IList<T> records )
{
    // Insert one record at a time
    for ( int i = 0; i < records.Count; ++i )
    {
        try
        {
            this.insert( records[ i ] );
        }
        catch ( InsertException<T> ex )
        {
            // Add the remaining records to the insertion exception's record
            // list; the list is absent when the exception carries only a message
            IList<T> queue = ex.records;
            if ( queue != null )
            {
                for ( int j = i + 1; j < records.Count; ++j )
                {
                    queue.Add( records[ j ] );
                }
            }

            // Rethrow with `throw;` so the original stack trace is preserved
            // (`throw ex;` would reset it to this frame)
            throw;
        }  // end try-catch
    }  // end outer for loop
}   // end insert( records )
354 
355 
356 
357  } // end class KineticaIngestor<T>
358 
359 
360 
361 
362 } // end namespace kinetica
int count_inserted
The number of records inserted.
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
void insert(T record)
Queues a record for insertion into Kinetica.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:87
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, Utils.WorkerList workers=null)
int count_updated
The number of records updated.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IList< int > rank
Array of ranks indexed by the shard number.
A set of results returned by Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
API to talk to Kinetica Database
Definition: Kinetica.cs:40
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.