Kinetica C# API  Version 6.1.0.0
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Text.RegularExpressions;
4 
5 
6 namespace kinetica
7 {
16  public class KineticaIngestor<T>
17  {
18  [Serializable]
20  {
21  public Uri url { get; private set; }
22  public IList<T> records { get; private set; }
23  private string message;
24 
/// <summary>
/// Creates an exception that carries only a message (no URL and no records).
/// </summary>
/// <param name="msg">Description of the failure.</param>
public InsertException( string msg ) : base( msg )
{
    // ToString() reports the private 'message' field, so it has to be set
    // here as well; otherwise the text given by the caller is dropped.
    this.message = msg;
}
26 
/// <summary>
/// Creates an exception that remembers which URL was being used and which
/// records were involved, in addition to the message.
/// </summary>
/// <param name="url_">Stored in <c>url</c>.</param>
/// <param name="records_">Stored in <c>records</c>.</param>
/// <param name="msg">Description of the failure; also passed to the base class.</param>
internal InsertException( Uri url_, IList<T> records_, string msg ) : base ( msg )
{
    url     = url_;
    records = records_;
    message = msg;
}
33 
/// <summary>
/// Returns the stored message prefixed with "InsertException: ".
/// </summary>
public override string ToString() => "InsertException: " + message;
35  } // end class InsertException
36 
37 
/// <summary>
/// A list of worker URLs ("insert/records" endpoints) to use for
/// multi-head ingest.
/// </summary>
public sealed class WorkerList : List<System.Uri>
{
    /// <summary>
    /// Creates an empty list; URLs can then be added by the caller.
    /// </summary>
    public WorkerList() { }

    /// <summary>
    /// Builds the list from the system properties reported by the server.
    /// If the server reports that worker HTTP servers are not enabled, the
    /// list is left empty.  Rank 0 (the first entry reported) is skipped.
    /// </summary>
    /// <param name="db">Connection used to fetch the system properties.</param>
    /// <param name="ip_regex">Optional filter; when given, the first IP of
    /// each worker that matches it is used.  When null, the first IP listed
    /// for each worker is used.</param>
    /// <exception cref="KineticaException">If a required property is
    /// missing or empty, the IP and port counts differ, a URL cannot be
    /// built, a worker has no matching IP, or no worker URL results.</exception>
    public WorkerList( Kinetica db, Regex ip_regex = null )
    {
        // Get the system properties from the database server
        IDictionary<string, string> system_properties = db.showSystemProperties().property_map;

        // Find out if multi-head ingest is turned on or not
        string multi_head_ingestion_param;
        system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out multi_head_ingestion_param );
        if ( multi_head_ingestion_param == null )
            throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS );
        bool is_multi_head_ingest_enabled = multi_head_ingestion_param.Equals( ShowSystemPropertiesResponse.PropertyMap.TRUE );

        // Nothing to do if multi-head ingestion is disabled
        if ( !is_multi_head_ingest_enabled )
            return;

        // Get the worker IPs and ports
        string worker_ips_str, worker_ports_str;
        system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out worker_ips_str );
        system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out worker_ports_str );

        // Check that we got them.  TryGetValue leaves the out-value null
        // when the key is absent, so test for null as well as for empty.
        if ( string.IsNullOrEmpty( worker_ips_str ) )
            throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS );
        if ( string.IsNullOrEmpty( worker_ports_str ) )
            throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS );

        // Parse the IPs and the ports
        // ---------------------------
        // Split the strings (one ';'-separated entry per rank)
        string[] worker_ip_lists = worker_ips_str.Split( ';' );
        string[] worker_ports    = worker_ports_str.Split( ';' );

        // Check that there are the same number of IPs and ports supplied
        if ( worker_ip_lists.Length != worker_ports.Length )
            throw new KineticaException( "Inconsistent number of values for "
                                         + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS
                                         + " and "
                                         + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS );

        // Create the URLs using the IPs and the ports, but
        // ignore the very first rank (rank-0)
        for ( int i = 1; i < worker_ip_lists.Length; ++i )
        {
            string ip_list = worker_ip_lists[ i ];

            // Need to split each of the IP lists on a comma
            string[] ips = ip_list.Split( ',' );

            bool matching_ip_found = false;

            // Find at least one IP to work with
            foreach ( string ip in ips )
            {
                // Try to create the URL
                try
                {
                    // If a regular expression is given, then see if this one is a match
                    if ( ip_regex != null )
                        matching_ip_found = ip_regex.IsMatch( ip );
                    else  // no regex given, so take the first IP encountered for this worker
                        matching_ip_found = true;

                    if ( matching_ip_found )
                    {
                        UriBuilder uri_builder = new UriBuilder( "http", ip, Int32.Parse( worker_ports[ i ] ), "insert/records" );
                        Uri url = uri_builder.Uri;

                        // Add the URL to this WorkerList
                        this.Add( url );
                        break;  // don't keep trying to match IPs in this group
                    }  // end inner if
                }  // end try
                catch ( Exception ex )
                {
                    // Surface any matching/parsing/URL failure as a KineticaException
                    throw new KineticaException( ex.Message );
                }
            }  // end inner foreach

            if ( !matching_ip_found )
                throw new KineticaException( $"No matching IP found for worker #{i}." );
        }  // end outer for

        // Check that this list is not empty
        if ( this.Count == 0 )
            throw new KineticaException( "No worker HTTP servers found." );
    }  // end constructor

}  // end class WorkerList
153 
154 
155 
161  private sealed class RecordKey
162  {
// Matches a date written as four digits, '-', two digits, '-', two digits.
166  private static readonly Regex DATE_REGEX = new Regex( "\\A(\\d{4})-(\\d{2})-(\\d{2})$" );
167 
// Matches a date (as above) optionally followed by whitespace and a time
// part H[H]:MM:SS with an optional fraction of one to six digits.
171  private static readonly Regex DATETIME_REGEX = new Regex( "\\A(?<year>\\d{4})-(?<month>\\d{2})-(?<day>\\d{2})(?<time>\\s+(?<hour>\\d{1,2}):(?<min>\\d{2}):(?<sec>\\d{2})(?:\\.(?<ms>\\d{1,6}))?)?$" );
172 
// Matches an optionally signed decimal: either digits with an optional
// fraction of up to four digits, or a leading '.' followed by one to four
// digits.  Surrounding whitespace is allowed.
176  private static readonly Regex DECIMAL_REGEX = new Regex( "\\A\\s*(?<sign>[+-]?)((?<int>\\d+)(\\.(?<intfrac>\\d{0,4}))?|\\.(?<onlyfrac>\\d{1,4}))\\s*\\z" );
177 
// Matches four '.'-separated groups of one to three digits.  The numeric
// range of each group is not enforced by the pattern itself.
181  private static readonly Regex IPV4_REGEX = new Regex( "\\A(?<a>\\d{1,3})\\.(?<b>\\d{1,3})\\.(?<c>\\d{1,3})\\.(?<d>\\d{1,3})$" );
182 
// Matches H[H]:MM:SS with an optional fraction of one to three digits.
186  private static readonly Regex TIME_REGEX = new Regex( "\\A(?<hour>\\d{1,2}):(?<minute>\\d{2}):(?<seconds>\\d{2})(\\.(?<milliseconds>\\d{1,3}))?$" );
187 
// 1970-01-01; addTimeStamp() adds the given milliseconds to this date.
191  private static readonly DateTime EPOCH_DATE = new DateTime( 1970, 1, 1 );
192 
// Lowest year accepted by addDate() and addDateTime().
196  private static readonly int MIN_SUPPORTED_YEAR = 1000;
197 
// Highest year accepted by addDate() and addDateTime().
201  private static readonly int MAX_SUPPORTED_YEAR = 2900;
202 
// Subtracted from the year before it is packed into a date/time integer.
206  private static readonly int YEAR_1900 = 1900;
207 
// UTC time zone; not referenced in the visible part of this class.
211  private static readonly TimeZoneInfo UTC = TimeZoneInfo.Utc;
212 
// Bytes of the key; filled front to back by the add*() methods.
213  private readonly byte[] buffer;
// Capacity of 'buffer' in bytes (the size given to the constructor).
214  private readonly int buffer_size;
// Number of bytes written to 'buffer' so far.
215  private int current_size;
// Set by computHashes(); returned by hashCode().
216  private int hash_code;
// Starts out true; cleared when an add*() method rejects a value.
217  private bool is_valid;
// Set by computHashes(); used by route() to pick a routing table entry.
218  private long routingHash;
219 
/// <summary>
/// Creates an empty, valid key backed by a fixed-size byte buffer.
/// </summary>
/// <param name="size">Number of bytes the key will hold; must be at least 1.</param>
/// <exception cref="KineticaException">If <paramref name="size"/> is less than 1.</exception>
public RecordKey( int size )
{
    bool size_is_usable = ( size >= 1 );
    if ( !size_is_usable )
    {
        throw new KineticaException( "Buffer size must be greater than or equal to 1. "
                                     + "Size given: " + size );
    }

    this.buffer       = new byte[ size ];
    this.buffer_size  = size;
    this.current_size = 0;
    this.is_valid     = true;
}
235 
/// <summary>
/// Tells whether every value added so far was accepted.
/// </summary>
/// <returns>The current validity flag of this key.</returns>
public bool isValid() => this.is_valid;
244 
/// <summary>
/// Returns the stored hash code of this key.
/// </summary>
/// <returns>The value held in the hash code field.</returns>
public int hashCode() => this.hash_code;
253 
254 
255 
/// <summary>
/// Checks whether the buffer has been filled to capacity.
/// </summary>
/// <param name="throw_if_full">When true, a full buffer raises an exception
/// instead of being reported through the return value.</param>
/// <returns>True if the buffer is full (and no exception was requested);
/// false otherwise.</returns>
/// <exception cref="KineticaException">If the buffer is full and
/// <paramref name="throw_if_full"/> is true.</exception>
private bool isBufferFull( bool throw_if_full = true )
{
    bool full = ( this.buffer_size == this.current_size );

    if ( full && throw_if_full )
        throw new KineticaException( "The buffer is already full!" );

    return full;
}
272 
/// <summary>
/// Checks whether writing <paramref name="n"/> more bytes would exceed the
/// buffer's capacity.  <paramref name="n"/> is not checked for being negative.
/// </summary>
/// <param name="n">Number of bytes about to be written.</param>
/// <param name="throw_if_overflow">When true, an overflow raises an exception
/// instead of being reported through the return value.</param>
/// <returns>True if the bytes would not fit (and no exception was
/// requested); false if they fit.</returns>
/// <exception cref="KineticaException">If the bytes would not fit and
/// <paramref name="throw_if_overflow"/> is true.</exception>
private bool willBufferOverflow( int n, bool throw_if_overflow = true )
{
    bool fits = ( (this.current_size + n) <= this.buffer_size );
    if ( fits )
        return false;

    if ( throw_if_overflow )
        throw new KineticaException( $"The buffer (of size {buffer_size}) does not have sufficient room in it to put {n} more byte(s) (current size is {this.current_size})." );

    return true;
}
295 
296 
/// <summary>
/// Appends one byte to the buffer and advances the write position.
/// No capacity check is made here; callers check before adding.
/// </summary>
/// <param name="b">The byte to append.</param>
private void add( byte b )
{
    // Direct indexing instead of Array.SetValue: same result and same
    // exception on overrun, but without boxing the byte on every call.
    buffer[ current_size++ ] = b;
}  // end add()
308 
309 
310 
/// <summary>
/// Appends a four-byte integer to the key.  A null value is written as
/// four zero bytes, i.e. exactly like the value 0.
/// </summary>
/// <param name="value">The integer to add; may be null.</param>
/// <exception cref="KineticaException">If fewer than four bytes remain.</exception>
public void addInt( int? value )
{
    // Throws when the four bytes would not fit
    this.willBufferOverflow( 4 );

    // Bytes come from BitConverter; null falls back to 0 (all-zero bytes)
    byte[] encoded = BitConverter.GetBytes( value.GetValueOrDefault() );
    for ( int i = 0; i < encoded.Length; ++i )
        this.add( encoded[ i ] );
}  // end addInt
338 
339 
/// <summary>
/// Appends a one-byte integer to the key.  A null value is written as a
/// single zero byte.
/// </summary>
/// <param name="value">The integer to add (converted to a byte); may be null.</param>
/// <exception cref="KineticaException">If no byte remains in the buffer.</exception>
public void addInt8( int? value )
{
    // Throws when the single byte would not fit
    this.willBufferOverflow( 1 );

    // Null falls back to 0, which yields the same zero byte
    this.add( (byte) value.GetValueOrDefault() );
}  // end addInt8
360 
361 
/// <summary>
/// Appends a two-byte integer to the key.  A null value is written as two
/// zero bytes.
/// </summary>
/// <param name="value">The integer to add (converted to a short); may be null.</param>
/// <exception cref="KineticaException">If fewer than two bytes remain.</exception>
public void addInt16( int? value )
{
    // Throws when the two bytes would not fit
    this.willBufferOverflow( 2 );

    // Narrow to 16 bits, then let BitConverter produce the two bytes;
    // null falls back to 0 (all-zero bytes)
    short narrowed = (short) value.GetValueOrDefault();
    byte[] encoded = BitConverter.GetBytes( narrowed );
    for ( int i = 0; i < encoded.Length; ++i )
        this.add( encoded[ i ] );
}  // end addInt16
387 
388 
389 
/// <summary>
/// Appends an eight-byte integer to the key.  A null value is written as
/// eight zero bytes, i.e. exactly like the value 0.
/// </summary>
/// <param name="value">The long to add; may be null.</param>
/// <exception cref="KineticaException">If fewer than eight bytes remain.</exception>
public void addLong( long? value )
{
    // Throws when the eight bytes would not fit
    this.willBufferOverflow( 8 );

    // Bytes come from BitConverter; null falls back to 0 (all-zero bytes)
    byte[] encoded = BitConverter.GetBytes( value.GetValueOrDefault() );
    for ( int i = 0; i < encoded.Length; ++i )
        this.add( encoded[ i ] );
}  // end addLong
421 
422 
/// <summary>
/// Appends a four-byte float to the key.  A null value is written as four
/// zero bytes, which is also the encoding of +0.0f.
/// </summary>
/// <param name="value">The float to add; may be null.</param>
/// <exception cref="KineticaException">If fewer than four bytes remain.</exception>
public void addFloat( float? value )
{
    // Throws when the four bytes would not fit
    this.willBufferOverflow( 4 );

    // Bytes come from BitConverter; null falls back to 0.0f (all-zero bytes)
    byte[] encoded = BitConverter.GetBytes( value.GetValueOrDefault() );
    for ( int i = 0; i < encoded.Length; ++i )
        this.add( encoded[ i ] );
}  // end addFloat
450 
451 
452 
/// <summary>
/// Appends an eight-byte double to the key.  A null value is written as
/// eight zero bytes, which is also the encoding of +0.0.
/// </summary>
/// <param name="value">The double to add; may be null.</param>
/// <exception cref="KineticaException">If fewer than eight bytes remain.</exception>
public void addDouble( double? value )
{
    // Throws when the eight bytes would not fit
    this.willBufferOverflow( 8 );

    // Bytes come from BitConverter; null falls back to 0.0 (all-zero bytes)
    byte[] encoded = BitConverter.GetBytes( value.GetValueOrDefault() );
    for ( int i = 0; i < encoded.Length; ++i )
        this.add( encoded[ i ] );
}  // end addDouble
484 
485 
486 
/// <summary>
/// Appends an unrestricted string to the key as eight bytes: the first
/// half of the 128-bit murmur hash (seed 10) of the string's UTF-8 bytes.
/// A null string is written as the long value 0.
/// </summary>
/// <param name="value">The string to add; may be null.</param>
public void addString( string value )
{
    if ( value != null )
    {
        // Hash the UTF-8 form of the string
        byte[] utf8 = System.Text.Encoding.UTF8.GetBytes( value );
        MurMurHash3.LongPair hash = new MurMurHash3.LongPair();
        MurMurHash3.murmurhash3_x64_128( utf8, 0, (uint) utf8.Length, 10, out hash );

        // Only the first of the two hash values goes into the key
        this.addLong( hash.val1 );
    }
    else
    {
        this.addLong( 0L );
    }
}  // end addString
510 
511 
512 
/// <summary>
/// Appends a fixed-width string (charN) to the key using exactly
/// <paramref name="N"/> bytes.  The UTF-8 bytes of the string are cut off
/// after N bytes if longer; shorter strings are padded with zero bytes.
/// The bytes are written in reverse order, padding first.  A null string
/// is written as N zero bytes.
/// </summary>
/// <param name="value">The string to add; may be null.</param>
/// <param name="N">Width of the column in bytes.</param>
/// <exception cref="KineticaException">If fewer than N bytes remain.</exception>
public void addCharN( string value, int N )
{
    // Throws when the N bytes would not fit
    this.willBufferOverflow( N );

    // A null string contributes no characters, so it ends up as all padding
    byte[] encoded = ( value == null )
                     ? new byte[ 0 ]
                     : System.Text.Encoding.UTF8.GetBytes( value );

    // Keep at most N bytes of the encoded string
    int kept = Math.Min( encoded.Length, N );

    // Zero padding goes in first...
    for ( int padding = N - kept; padding > 0; --padding )
        this.add( (byte) 0 );

    // ...followed by the kept bytes, last one first
    for ( int i = kept; i > 0; --i )
        this.add( encoded[ i - 1 ] );
}  // end addCharN()
562 
563 
/// <summary>
/// Appends a date given as "YYYY-MM-DD" to the key as a packed four-byte
/// integer.  A null string is written as 0.  A string that does not match
/// the pattern, is not a real calendar date, or has a year outside
/// [1000, 2900] is written as 0 and marks the key invalid.
/// </summary>
/// <param name="value">The date string; may be null.</param>
/// <exception cref="KineticaException">If the buffer is already full or
/// lacks room for four bytes.</exception>
public void addDate( string value )
{
    // Throws when nothing more can be added
    this.isBufferFull( true );

    if ( value == null )
    {
        this.addInt( 0 );
        return;
    }

    Match parsed = DATE_REGEX.Match( value );
    if ( !parsed.Success )
    {
        // Wrong format: the key cannot be used for routing
        this.is_valid = false;
        this.addInt( 0 );
        return;
    }

    System.Globalization.GregorianCalendar gregorian = new System.Globalization.GregorianCalendar();
    int yyyy, mm, dd;
    DateTime when;
    try
    {
        yyyy = int.Parse( parsed.Groups[ 1 ].Value );
        mm   = int.Parse( parsed.Groups[ 2 ].Value );
        dd   = int.Parse( parsed.Groups[ 3 ].Value );

        // Constructing the DateTime rejects impossible dates
        when = new DateTime( yyyy, mm, dd, gregorian );
    }
    catch ( Exception )
    {
        // Any parsing/construction problem invalidates the key
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    bool year_supported = ( yyyy >= MIN_SUPPORTED_YEAR ) && ( yyyy <= MAX_SUPPORTED_YEAR );
    if ( !year_supported )
    {
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    // Day of week is stored one-based (DayOfWeek enum value + 1)
    int day_of_week = 1 + (int) gregorian.GetDayOfWeek( when );
    int day_of_year = gregorian.GetDayOfYear( when );

    // Pack: years since 1900 | month | day | day of year | day of week
    int packed = ( yyyy - YEAR_1900 ) << 21;
    packed |= mm << 17;
    packed |= dd << 12;
    packed |= day_of_year << 3;
    packed |= day_of_week;

    this.addInt( packed );
}  // end addDate()
631 
632 
/// <summary>
/// Appends a date-time given as "YYYY-MM-DD[ HH:MM:SS[.ffffff]]" to the
/// key as a packed eight-byte integer.  The time part is optional and
/// defaults to midnight; a fraction of up to six digits is reduced to
/// whole milliseconds.  A null string is written as 0.  A string that does
/// not match the pattern, is not a real date/time, or has a year outside
/// [1000, 2900] is written as 0 and marks the key invalid.
/// </summary>
/// <param name="value">The date-time string; may be null.</param>
/// <exception cref="KineticaException">If the buffer is already full or
/// lacks room for eight bytes.</exception>
public void addDateTime( string value )
{
    // Throws when nothing more can be added
    this.isBufferFull( true );

    if ( value == null )
    {
        this.addLong( 0 );
        return;
    }

    Match parsed = DATETIME_REGEX.Match( value );
    if ( !parsed.Success )
    {
        // Wrong format: the key cannot be used for routing
        this.is_valid = false;
        this.addLong( 0 );
        return;
    }

    System.Globalization.GregorianCalendar gregorian = new System.Globalization.GregorianCalendar();
    int year, month, day;
    int hour = 0, minute = 0, second = 0, msecond = 0;
    DateTime when;
    try
    {
        year  = int.Parse( parsed.Groups[ "year"  ].Value );
        month = int.Parse( parsed.Groups[ "month" ].Value );
        day   = int.Parse( parsed.Groups[ "day"   ].Value );

        // The time of day is optional
        if ( parsed.Groups[ "time" ].Success )
        {
            hour   = int.Parse( parsed.Groups[ "hour" ].Value );
            minute = int.Parse( parsed.Groups[ "min"  ].Value );
            second = int.Parse( parsed.Groups[ "sec"  ].Value );

            // The fraction of a second is optional, too
            Group fraction = parsed.Groups[ "ms" ];
            if ( fraction.Success )
            {
                msecond = int.Parse( fraction.Value );

                // Bring the fraction to exactly three digits (milliseconds):
                // scale up when fewer were given, cut off when more were given
                int digits = fraction.Value.Length;
                for ( ; digits < 3; ++digits )
                    msecond *= 10;
                for ( ; digits > 3; --digits )
                    msecond /= 10;
            }
        }

        // Constructing the DateTime rejects impossible dates and times
        when = new DateTime( year, month, day, hour, minute, second, msecond, gregorian );
    }
    catch ( Exception )
    {
        // Any parsing/construction problem invalidates the key
        this.addLong( 0 );
        this.is_valid = false;
        return;
    }

    bool year_supported = ( year >= MIN_SUPPORTED_YEAR ) && ( year <= MAX_SUPPORTED_YEAR );
    if ( !year_supported )
    {
        this.addLong( 0 );
        this.is_valid = false;
        return;
    }

    // Day of week is stored one-based (DayOfWeek enum value + 1)
    long day_of_week = 1 + (int) gregorian.GetDayOfWeek( when );
    long day_of_year = gregorian.GetDayOfYear( when );

    // Pack: years since 1900 | month | day | hour | minute | second |
    //       millisecond | day of year | day of week
    long packed = ( (long) ( year - YEAR_1900 ) ) << 53;
    packed |= ( (long) month )   << 49;
    packed |= ( (long) day )     << 44;
    packed |= ( (long) hour )    << 39;
    packed |= ( (long) minute )  << 33;
    packed |= ( (long) second )  << 27;
    packed |= ( (long) msecond ) << 17;
    packed |= day_of_year << 8;
    packed |= day_of_week << 5;

    this.addLong( packed );
}  // end addDateTime()
742 
743 
/// <summary>
/// Appends a decimal given as a string (optional sign, optional integral
/// digits, up to four fractional digits) to the key as an eight-byte
/// integer holding the value multiplied by 10,000.  Equal decimals are
/// therefore always encoded identically, however they are written
/// ("1", "1." and "1.0000" all become 10000).  A null string is written as
/// 0.  A string that does not match the pattern, or whose scaled value does
/// not fit in a long, is written as 0 and marks the key invalid.
/// </summary>
/// <param name="value">The decimal string; may be null.</param>
/// <exception cref="KineticaException">If the buffer is already full or
/// lacks room for eight bytes.</exception>
public void addDecimal( string value )
{
    // Check and throw if the buffer is already full
    this.isBufferFull( true );

    // Handle nulls
    if ( value == null )
    {
        this.addLong( 0L );
        return;
    }

    // Check that the given value matches the decimal regular expression pattern
    Match match = DECIMAL_REGEX.Match( value );
    if ( !match.Success )
    {
        // No match, so the key is invalid
        this.is_valid = false;
        this.addLong( 0L );
        return;
    }

    // Parse the string value
    long decimal_value;
    try
    {
        // Extract the integral and fractional parts
        Group integral_group = match.Groups[ "int" ];
        Group fraction_group = integral_group.Success
                               ? match.Groups[ "intfrac" ]
                               : match.Groups[ "onlyfrac" ];

        if ( !integral_group.Success && !fraction_group.Success )
            throw new KineticaException( "No match for decimal!" );

        // The fraction may be absent ("12") or empty ("12.")
        string fraction_digits = fraction_group.Success ? fraction_group.Value : "";

        long integral = integral_group.Success ? long.Parse( integral_group.Value ) : 0L;
        long fraction = ( fraction_digits.Length > 0 ) ? long.Parse( fraction_digits ) : 0L;

        // Right-pad the fraction to four digits (".5" means 5000 ten-thousandths)
        for ( int digits = fraction_digits.Length; digits < 4; ++digits )
            fraction *= 10;

        // Scale the integral part by 10^4 in every case; overflow is
        // treated as an error rather than silently wrapping around
        checked
        {
            decimal_value = ( integral * 10000L ) + fraction;
        }

        // Now handle the sign (needs action only if negative)
        Group sign_group = match.Groups[ "sign" ];
        if ( sign_group.Success && ( sign_group.Value == "-" ) )
            decimal_value = -decimal_value;
    }
    catch ( Exception )
    {
        // Upon any error, set this key to be invalid
        this.addLong( 0L );
        this.is_valid = false;
        return;
    }

    // Add the scaled integer representing the decimal
    this.addLong( decimal_value );
}  // end addDecimal()
846 
847 
/// <summary>
/// Appends an IPv4 address given as "A.B.C.D" to the key as a four-byte
/// integer with A in the highest byte and D in the lowest.  A null string
/// is written as 0.  A string that does not match the pattern, or has a
/// part greater than 255, is written as 0 and marks the key invalid.
/// </summary>
/// <param name="value">The address string; may be null.</param>
/// <exception cref="KineticaException">If the buffer is already full or
/// lacks room for four bytes.</exception>
public void addIPv4( string value )
{
    // Throws when nothing more can be added
    this.isBufferFull( true );

    if ( value == null )
    {
        this.addInt( 0 );
        return;
    }

    Match parsed = IPV4_REGEX.Match( value );
    if ( !parsed.Success )
    {
        // Wrong format: the key cannot be used for routing
        this.is_valid = false;
        this.addInt( 0 );
        return;
    }

    // Parse the four parts, highest-order first
    string[] part_names = { "a", "b", "c", "d" };
    int[] parts = new int[ part_names.Length ];
    try
    {
        for ( int i = 0; i < part_names.Length; ++i )
            parts[ i ] = int.Parse( parsed.Groups[ part_names[ i ] ].Value );
    }
    catch ( Exception )
    {
        // Any parsing problem invalidates the key
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    // The pattern admits up to three digits, so values above 255 must be
    // rejected here (negative values cannot occur)
    bool in_range = true;
    foreach ( int part in parts )
    {
        if ( part > 255 )
            in_range = false;
    }
    if ( !in_range )
    {
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    // Combine the four parts into one 32-bit value, one byte each
    int packed = 0;
    foreach ( int part in parts )
        packed = ( packed << 8 ) | part;

    this.addInt( packed );
}  // end addIPv4()
908 
909 
/// <summary>
/// Appends a time of day given as "HH:MM:SS[.mmm]" to the key as a packed
/// four-byte integer.  A one- or two-digit fraction is scaled up to
/// milliseconds.  A null string is written as 0.  A string that does not
/// match the pattern, or has an hour above 23 or a minute/second above 59,
/// is written as 0 and marks the key invalid.
/// </summary>
/// <param name="value">The time string; may be null.</param>
/// <exception cref="KineticaException">If the buffer is already full or
/// lacks room for four bytes.</exception>
public void addTime( string value )
{
    // Throws when nothing more can be added
    this.isBufferFull( true );

    if ( value == null )
    {
        this.addInt( 0 );
        return;
    }

    Match parsed = TIME_REGEX.Match( value );
    if ( !parsed.Success )
    {
        // Wrong format: the key cannot be used for routing
        this.is_valid = false;
        this.addInt( 0 );
        return;
    }

    uint hour, minute, second;
    uint milliseconds = 0;
    try
    {
        hour   = uint.Parse( parsed.Groups[ "hour"    ].Value );
        minute = uint.Parse( parsed.Groups[ "minute"  ].Value );
        second = uint.Parse( parsed.Groups[ "seconds" ].Value );

        // The fraction of a second is optional
        Group fraction = parsed.Groups[ "milliseconds" ];
        if ( fraction.Success )
        {
            milliseconds = uint.Parse( fraction.Value );

            // Scale a one- or two-digit fraction up to three digits
            for ( int digits = fraction.Value.Length; digits < 3; ++digits )
                milliseconds *= 10;
        }
    }
    catch ( Exception )
    {
        // Any parsing problem invalidates the key
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    bool in_range = ( hour <= 23 ) && ( minute <= 59 ) && ( second <= 59 );
    if ( !in_range )
    {
        this.addInt( 0 );
        this.is_valid = false;
        return;
    }

    // Pack: hour | minute | second | millisecond
    uint packed = hour << 26;
    packed |= minute << 20;
    packed |= second << 14;
    packed |= milliseconds << 4;

    this.addInt( (int) packed );
}  // end addTime()
986 
987 
/// <summary>
/// Appends a timestamp (milliseconds added to 1970-01-01) to the key as a
/// packed eight-byte integer.  A null value is written as 0.
/// </summary>
/// <param name="value">The timestamp in milliseconds; may be null.</param>
public void addTimeStamp( long? value )
{
    if ( !value.HasValue )
    {
        this.addLong( 0 );
        return;
    }

    // Turn the millisecond count into calendar fields
    DateTime moment = EPOCH_DATE.AddMilliseconds( (double) value.Value );

    // Day of week is stored one-based (DayOfWeek enum value + 1)
    long day_of_week = 1 + (long) moment.DayOfWeek;

    // Pack: years since 1900 | month | day | hour | minute | second |
    //       millisecond | day of year | day of week
    long packed = ( (long) ( moment.Year - YEAR_1900 ) ) << 53;
    packed |= ( (long) moment.Month )       << 49;
    packed |= ( (long) moment.Day )         << 44;
    packed |= ( (long) moment.Hour )        << 39;
    packed |= ( (long) moment.Minute )      << 33;
    packed |= ( (long) moment.Second )      << 27;
    packed |= ( (long) moment.Millisecond ) << 17;
    packed |= ( (long) moment.DayOfYear )   << 8;
    packed |= day_of_week << 5;

    this.addLong( packed );
}  // end addTimeStamp()
1016 
1017 
1018 
/// <summary>
/// Computes the routing hash and the hash code of the key from the buffer
/// contents.  Must be called only once the buffer has been filled
/// completely.
/// </summary>
/// <exception cref="KineticaException">If the buffer is not yet full.</exception>
public void computHashes()
{
    // Every key value must have been added before hashing
    bool buffer_complete = ( this.current_size == this.buffer_size );
    if ( !buffer_complete )
        throw new KineticaException( "The RecordKey buffer is not full; check that all the relevant values have been added." );

    // 128-bit murmur hash (seed 10) over the whole buffer
    MurMurHash3.LongPair murmur = new MurMurHash3.LongPair();
    MurMurHash3.murmurhash3_x64_128( this.buffer, 0, ( uint ) this.buffer_size, 10, out murmur );

    // The first half is the routing hash; the hash code folds part of its
    // upper 32 bits into the lower bits
    this.routingHash = murmur.val1;
    long folded_high_bits = ( this.routingHash >> 32 ) & 0x0000ffffL;
    this.hash_code = ( int ) ( this.routingHash ^ folded_high_bits );
}  // end computHashes
1039 
1040 
1041 
/// <summary>
/// Picks the worker for this key from a routing table.
/// </summary>
/// <param name="routingTable">List of one-based worker ranks.</param>
/// <returns>One less than the table entry selected by the routing hash
/// (the first worker rank is element 0 of the worker list).</returns>
public int route( IList<int> routingTable )
{
    // Slot = |routing hash modulo table size|
    int remainder = ( int ) ( this.routingHash % routingTable.Count );
    int slot = Math.Abs( remainder );

    // Table entries are one-based; worker list positions are zero-based
    return ( routingTable[ slot ] - 1 );
}  // end route
1055 
1056  } // end class RecordKey
1057 
1058 
1063  private sealed class RecordKeyBuilder<T>
1064  {
// The kinds of routing-key column this builder distinguishes.  The
// constructor records one of these per routing column, derived from the
// column's base type and its properties.
1068  private enum ColumnType
1069  {
// Fixed-width strings; the number is the width in bytes reserved in the key
1070  CHAR1,
1071  CHAR2,
1072  CHAR4,
1073  CHAR8,
1074  CHAR16,
1075  CHAR32,
1076  CHAR64,
1077  CHAR128,
1078  CHAR256,
// String columns carrying a date/time/decimal style property
1079  DATE,
1080  DATETIME,
1081  DECIMAL,
// Plain numeric columns
1082  DOUBLE,
1083  FLOAT,
1084  INT,
1085  INT8,
1086  INT16,
// String column carrying the IPv4 property
1087  IPV4,
1088  LONG,
// String column with none of the special properties (8 bytes in the key)
1089  STRING,
// String column carrying the time property
1090  TIME,
// Long column carrying the timestamp property
1091  TIMESTAMP
1092  } // end enum ColumnType
1093 
1094 
1095  // Class members
// The record type whose columns are inspected by the constructor
1096  private KineticaType ktype;
// Positions (within the type's column list) of the routing columns
1097  private IList<int> routing_column_indices;
// Key encoding of each routing column, in the same order as the indices
1098  private IList<ColumnType> column_types;
// Total number of key bytes needed for all routing columns together
1099  private int buffer_size;
1100 
/// <summary>
/// Works out which columns of the given type form the routing key, how each
/// of them is to be encoded, and how many bytes the complete key needs.
/// </summary>
/// <param name="is_primary_key">True to collect the primary-key columns,
/// false to collect the shard-key columns.</param>
/// <param name="ktype">The record type to inspect.</param>
/// <exception cref="KineticaException">If a track-type table has a shard key
/// other than its 'TRACKID' column, or a routing column has a base type
/// that cannot be used as a key.</exception>
public RecordKeyBuilder( bool is_primary_key, KineticaType ktype )
{
    this.ktype                  = ktype;
    this.buffer_size            = 0;
    this.routing_column_indices = new List<int>();
    this.column_types           = new List<ColumnType>();

    // A type having all of x, y, TIMESTAMP and TRACKID columns is a
    // track-type table, whose TRACKID column is a routing column
    bool saw_timestamp = false;
    bool saw_x         = false;
    bool saw_y         = false;
    int  track_id_idx  = -1;  // not found yet

    // Only one kind of key is collected, depending on is_primary_key
    var wanted_key_property = is_primary_key ? ColumnProperty.PRIMARY_KEY
                                             : ColumnProperty.SHARD_KEY;

    IList<KineticaType.Column> columns = ktype.getColumns();
    for ( int idx = 0; idx < columns.Count; ++idx )
    {
        KineticaType.Column column = columns[ idx ];

        // Note the columns that make up a track-type table
        switch ( column.getName() )
        {
            case "TRACKID":   track_id_idx  = idx;  break;
            case "TIMESTAMP": saw_timestamp = true; break;
            case "x":         saw_x         = true; break;
            case "y":         saw_y         = true; break;
        }

        // Remember the column if it is declared as the wanted kind of key
        if ( column.getProperties().Contains( wanted_key_property ) )
            routing_column_indices.Add( idx );
    }

    // Track-type tables route on the track ID column (shard keys only)
    bool is_track_table = saw_timestamp && saw_x && saw_y && ( track_id_idx != -1 );
    if ( !is_primary_key && is_track_table )
    {
        if ( routing_column_indices.Count == 0 )
        {
            routing_column_indices.Add( track_id_idx );
        }
        else
        {
            bool only_track_id = ( routing_column_indices.Count == 1 )
                                 && ( routing_column_indices[ 0 ] == track_id_idx );
            if ( !only_track_id )
            {
                // Track type tables can't have any other routing key
                throw new KineticaException( "Cannot have a shard key other than 'TRACKID' for track tables." );
            }
        }
    }

    // String columns: the first property found (in this order) decides the
    // encoding and the number of key bytes; the three tables run in parallel
    var string_properties = new[]
    {
        ColumnProperty.CHAR1,  ColumnProperty.CHAR2,  ColumnProperty.CHAR4,
        ColumnProperty.CHAR8,  ColumnProperty.CHAR16, ColumnProperty.CHAR32,
        ColumnProperty.CHAR64, ColumnProperty.CHAR128, ColumnProperty.CHAR256,
        ColumnProperty.DATE,   ColumnProperty.DATETIME, ColumnProperty.DECIMAL,
        ColumnProperty.IPV4,   ColumnProperty.TIME
    };
    ColumnType[] string_encodings =
    {
        ColumnType.CHAR1,  ColumnType.CHAR2,  ColumnType.CHAR4,
        ColumnType.CHAR8,  ColumnType.CHAR16, ColumnType.CHAR32,
        ColumnType.CHAR64, ColumnType.CHAR128, ColumnType.CHAR256,
        ColumnType.DATE,   ColumnType.DATETIME, ColumnType.DECIMAL,
        ColumnType.IPV4,   ColumnType.TIME
    };
    int[] string_sizes =
    {
        1,  2,   4,
        8,  16,  32,
        64, 128, 256,
        4,  8,   8,
        4,  4
    };

    // Record the encoding of every routing column and add up the key size
    foreach ( int routing_idx in routing_column_indices )
    {
        KineticaType.Column column = columns[ routing_idx ];

        ColumnType encoding;
        int        byte_count;

        switch ( column.getType() )
        {
            case KineticaType.Column.ColumnType.FLOAT:
                encoding   = ColumnType.FLOAT;
                byte_count = 4;
                break;

            case KineticaType.Column.ColumnType.DOUBLE:
                encoding   = ColumnType.DOUBLE;
                byte_count = 8;
                break;

            case KineticaType.Column.ColumnType.INT:
                // An integer column may be narrowed to one or two bytes
                if ( column.getProperties().Contains( ColumnProperty.INT8 ) )
                {
                    encoding   = ColumnType.INT8;
                    byte_count = 1;
                }
                else if ( column.getProperties().Contains( ColumnProperty.INT16 ) )
                {
                    encoding   = ColumnType.INT16;
                    byte_count = 2;
                }
                else
                {
                    encoding   = ColumnType.INT;
                    byte_count = 4;
                }
                break;

            case KineticaType.Column.ColumnType.LONG:
                // A long column is either a timestamp or a plain long
                encoding = column.getProperties().Contains( ColumnProperty.TIMESTAMP )
                           ? ColumnType.TIMESTAMP
                           : ColumnType.LONG;
                byte_count = 8;
                break;

            case KineticaType.Column.ColumnType.STRING:
                // Unless a special property is found, it is a regular string
                encoding   = ColumnType.STRING;
                byte_count = 8;
                for ( int k = 0; k < string_properties.Length; ++k )
                {
                    if ( column.getProperties().Contains( string_properties[ k ] ) )
                    {
                        encoding   = string_encodings[ k ];
                        byte_count = string_sizes[ k ];
                        break;
                    }
                }
                break;

            // Other types are not allowed for routing columns
            case KineticaType.Column.ColumnType.BYTES:
            case KineticaType.Column.ColumnType.DEFAULT:
                throw new KineticaException( $"Cannot use column '{column.getName()}' as a key." );

            default:
                // Any other base type contributes nothing to the key
                continue;
        }

        column_types.Add( encoding );
        this.buffer_size += byte_count;
    }
}  // end constructor RecordKeyBuilder
1320 
1321 
/// <summary>
/// Builds the routing key for a record by adding the value of every
/// routing column to a new <see cref="RecordKey"/> and then hashing it.
/// </summary>
/// <param name="record">Record whose routing-column properties are read
/// through reflection (property name == column name).</param>
/// <returns>The populated key, or null when this builder's key buffer
/// size is zero.</returns>
public RecordKey build( T record )
{
    // Without any buffer space there is no key to build
    if ( this.buffer_size == 0 )
        return null;

    RecordKey record_key = new RecordKey( this.buffer_size );

    for ( int col = 0; col < this.routing_column_indices.Count; ++col )
    {
        // Resolve the routing column, then read the property of the same
        // name from the record
        KineticaType.Column column = this.ktype.getColumns()[ this.routing_column_indices[ col ] ];
        var property = record.GetType().GetProperty( column.getName() );
        object field_value = property.GetValue( record, null );

        // Dispatch on the column type recorded by the constructor
        switch ( this.column_types[ col ] )
        {
            // charN columns: the second argument is N
            case ColumnType.CHAR1:     record_key.addCharN( (string) field_value, 1 );    break;
            case ColumnType.CHAR2:     record_key.addCharN( (string) field_value, 2 );    break;
            case ColumnType.CHAR4:     record_key.addCharN( (string) field_value, 4 );    break;
            case ColumnType.CHAR8:     record_key.addCharN( (string) field_value, 8 );    break;
            case ColumnType.CHAR16:    record_key.addCharN( (string) field_value, 16 );   break;
            case ColumnType.CHAR32:    record_key.addCharN( (string) field_value, 32 );   break;
            case ColumnType.CHAR64:    record_key.addCharN( (string) field_value, 64 );   break;
            case ColumnType.CHAR128:   record_key.addCharN( (string) field_value, 128 );  break;
            case ColumnType.CHAR256:   record_key.addCharN( (string) field_value, 256 );  break;

            // Other string-backed column types
            case ColumnType.DATE:      record_key.addDate( (string) field_value );        break;
            case ColumnType.DATETIME:  record_key.addDateTime( (string) field_value );    break;
            case ColumnType.DECIMAL:   record_key.addDecimal( (string) field_value );     break;
            case ColumnType.IPV4:      record_key.addIPv4( (string) field_value );        break;
            case ColumnType.STRING:    record_key.addString( (string) field_value );      break;
            case ColumnType.TIME:      record_key.addTime( (string) field_value );        break;

            // Numeric column types (values are passed as nullables)
            case ColumnType.DOUBLE:    record_key.addDouble( (double?) field_value );     break;
            case ColumnType.FLOAT:     record_key.addFloat( (float?) field_value );       break;
            case ColumnType.INT:       record_key.addInt( (int?) field_value );           break;
            case ColumnType.INT8:      record_key.addInt8( (int?) field_value );          break;
            case ColumnType.INT16:     record_key.addInt16( (int?) field_value );         break;
            case ColumnType.LONG:      record_key.addLong( (long?) field_value );         break;
            case ColumnType.TIMESTAMP: record_key.addTimeStamp( (long?) field_value );    break;
        }
    }

    // Finalize the key by computing its hashes before handing it back
    record_key.computHashes();
    return record_key;
} // end build()
1443 
1444 
1445 
/// <summary>
/// Tells whether this builder has at least one routing column.
/// </summary>
/// <returns>True when the routing column list is non-empty.</returns>
public bool hasKey()
{
    return this.routing_column_indices.Count != 0;
}
1456 
1457 
/// <summary>
/// Tells whether another builder produces keys from the same routing
/// columns as this one.
/// </summary>
/// <param name="other">The builder to compare against; may be null.</param>
/// <returns>True when both builders list the same routing column indices
/// and the same column types, in the same order; false otherwise
/// (including when <paramref name="other"/> is null).</returns>
public bool hasSameKey( RecordKeyBuilder<T> other)
{
    if ( other == null )
        return false;

    // Compare element by element: Equals() on a list only tests whether
    // both references point at the very same list object, which is never
    // the case for two separately constructed builders.
    return System.Linq.Enumerable.SequenceEqual( this.routing_column_indices, other.routing_column_indices )
        && System.Linq.Enumerable.SequenceEqual( this.column_types, other.column_types );
}
1467 
1468  } // end class RecordKeyBuilder
1469 
1470 
1471 
/// <summary>
/// Accumulates the records destined for one insertion URL until a full
/// batch (<c>capacity</c> records) is available.
/// </summary>
/// <remarks>
/// NOTE(review): the type parameter T shadows the T of the enclosing
/// KineticaIngestor&lt;T&gt;; it is kept because the ingestor refers to this
/// type as WorkerQueue&lt;T&gt;.
/// </remarks>
/// <typeparam name="T">The type of the queued records.</typeparam>
private sealed class WorkerQueue<T>
{
    /// <summary>URL that batches from this queue are sent to.</summary>
    public System.Uri url { get; private set; }

    private readonly int capacity;                // number of records that makes a full batch
    private readonly bool has_primary_key;        // whether queued records are tracked by primary key
    private readonly bool update_on_existing_pk;  // replace (true) or drop (false) a record with a repeated key
    private IList<T> queue;                       // records waiting to be flushed

    // Primary key -> index of that key's record in `queue`;
    // stays null when the record type has no primary key
    private readonly Dictionary<RecordKey, int> primary_key_map;


    /// <summary>
    /// Creates an empty queue for the given URL.
    /// </summary>
    /// <param name="url">URL that flushed batches are sent to.</param>
    /// <param name="capacity">Number of queued records that triggers a flush.</param>
    /// <param name="has_primary_key">Whether records carry a primary key.</param>
    /// <param name="update_on_existing_pk">Whether a record whose primary key
    /// is already queued replaces the queued record (true) or is dropped (false).</param>
    public WorkerQueue( System.Uri url, int capacity, bool has_primary_key, bool update_on_existing_pk )
    {
        this.url = url;
        this.capacity = capacity;
        this.has_primary_key = has_primary_key;
        this.update_on_existing_pk = update_on_existing_pk;

        // Pre-size the queue the same way flush() does
        this.queue = new List<T>( this.capacity );

        // Only types with a primary key need the key map; give it room for
        // capacity / 0.75 entries (about 133% of the queue capacity)
        if ( this.has_primary_key )
            this.primary_key_map = new Dictionary<RecordKey, int>( (int)Math.Round( this.capacity/0.75 ) );
    } // end constructor WorkerQueue<T>


    /// <summary>
    /// Hands back everything queued so far and starts a fresh, empty queue.
    /// </summary>
    /// <returns>The records that were queued (possibly none).</returns>
    public IList<T> flush()
    {
        IList<T> old_queue = this.queue;
        this.queue = new List<T>( this.capacity );

        // The stored indices refer to the old queue, so forget them
        if ( this.primary_key_map != null )
            this.primary_key_map.Clear();

        return old_queue;
    } // end flush


    /// <summary>
    /// Queues a record, honoring the primary key policy, and flushes the
    /// queue once it holds <c>capacity</c> records.
    /// </summary>
    /// <param name="record">The record to queue.</param>
    /// <param name="key">The record's primary key; a null or invalid key
    /// means the record is queued without any uniqueness tracking.</param>
    /// <returns>The flushed batch when this insertion filled the queue;
    /// otherwise null (also when the record was dropped as a duplicate).</returns>
    public IList<T> insert( T record, RecordKey key )
    {
        // Guard against a missing key before asking whether it is valid
        bool track_by_key = this.has_primary_key && ( key != null ) && key.isValid();

        if ( track_by_key )
        {
            int existing_idx;
            if ( this.primary_key_map.TryGetValue( key, out existing_idx ) )
            {
                // The key is already queued: drop the newcomer unless
                // updates are allowed
                if ( !this.update_on_existing_pk )
                    return null;

                // Overwrite the queued record in place; the queue does not grow
                this.queue[ existing_idx ] = record;
            }
            else
            {
                // New key: append the record and remember where it sits
                this.queue.Add( record );
                this.primary_key_map.Add( key, this.queue.Count - 1 );
            }
        }
        else
        {
            // No usable key, so simply append
            this.queue.Add( record );
        }

        // A full queue is flushed and returned; otherwise nothing to return
        if ( this.queue.Count == this.capacity )
            return flush();

        return null;
    } // end insert
} // end class WorkerQueue
1574 
1575 
1576 
1577 
// ------------------------------------------------------------------
// KineticaIngestor state
// ------------------------------------------------------------------

/// <summary>Database handle supplied to the constructor.</summary>
public Kinetica kineticaDB { get; }

/// <summary>Table name sent with every insert request.</summary>
public string table_name { get; }

/// <summary>Capacity given to each worker queue.</summary>
public int batch_size { get; }

/// <summary>Options sent with every insert request; null when none were supplied.</summary>
public IDictionary<string, string> options { get; }

/// <summary>Running total of inserted records; prefer getCountInserted() for reading.</summary>
public long count_inserted;

/// <summary>Running total of updated records; prefer getCountUpdated() for reading.</summary>
public long count_updated;

private KineticaType ktype;                        // record type the key builders are derived from
private RecordKeyBuilder<T> primary_key_builder;   // null when the type has no primary key
private RecordKeyBuilder<T> shard_key_builder;     // null when the type has no shard key
private IList<int> routing_table;                  // null when a single queue is used
private IList<WorkerQueue<T>> worker_queues;       // one queue per insertion URL
private Random random;                             // picks a queue for records without a shard key
1593 
1594 
/// <summary>
/// Creates an ingestor that queues records per worker and inserts them
/// into the given table in batches.
/// </summary>
/// <param name="kdb">Database handle used to discover workers and shard
/// ranks, and later to send the batches.</param>
/// <param name="table_name">Name of the table to insert into.</param>
/// <param name="batch_size">Number of records queued per worker before a
/// batch is sent; must be at least one.</param>
/// <param name="ktype">Type description used to find the primary and
/// shard key columns.</param>
/// <param name="options">Optional insert options; stored as given.</param>
/// <param name="workers">Optional worker URLs; when null or empty, the
/// list is obtained from <paramref name="kdb"/>.</param>
/// <exception cref="KineticaException">Thrown when
/// <paramref name="batch_size"/> is less than one, or when setting up the
/// worker queues fails for any reason.</exception>
public KineticaIngestor( Kinetica kdb, string table_name,
                         int batch_size, KineticaType ktype,
                         Dictionary<string, string> options = null,
                         WorkerList workers = null )
{
    this.kineticaDB = kdb;
    this.table_name = table_name;
    this.ktype = ktype;

    // Validate and save the batch size
    if ( batch_size < 1 )
        throw new KineticaException( $"Batch size must be greater than zero; given {batch_size}." );
    this.batch_size = batch_size;

    // Save the options exactly as given (null when none were supplied)
    this.options = options;

    // Set up the primary and shard key builders
    // -----------------------------------------
    this.primary_key_builder = new RecordKeyBuilder<T>( true, this.ktype );
    this.shard_key_builder = new RecordKeyBuilder<T>( false, this.ktype );

    if ( this.primary_key_builder.hasKey() )
    {
        // There is a primary key; share its builder unless the type has
        // a distinct shard key
        if ( !this.shard_key_builder.hasKey()
             || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
            this.shard_key_builder = this.primary_key_builder;
    }
    else
    {
        // No primary key for the given T
        this.primary_key_builder = null;

        // No shard key either?
        if ( !this.shard_key_builder.hasKey() )
            this.shard_key_builder = null;
    }

    // Set up the worker queues
    // -------------------------
    // Do we update records if there are matching primary keys already?
    // NOTE(review): the constants naming the option and its 'true' value
    // (InsertRecordsRequest<T>.Options.*) are assumed here -- confirm them
    // against the insert-records request class.
    bool update_on_existing_pk = ( (options != null)
                                   && options.ContainsKey( InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK )
                                   && options[ InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK ].Equals( InsertRecordsRequest<T>.Options.TRUE ) );

    // Do T type records have a primary key?
    bool has_primary_key = (this.primary_key_builder != null);

    this.worker_queues = new List<WorkerQueue<T>>();
    try
    {
        // If no workers are given, try to get them from Kinetica
        if ( ( workers == null ) || ( workers.Count == 0 ) )
        {
            workers = new WorkerList( kdb );
        }

        if ( ( workers != null ) && ( workers.Count > 0 ) )
        {
            // One queue per worker URL, whether user-given or discovered
            foreach ( System.Uri worker_url in workers )
            {
                this.worker_queues.Add( new WorkerQueue<T>( worker_url, batch_size, has_primary_key, update_on_existing_pk ) );
            }

            // Get the worker rank information from Kinetica
            this.routing_table = kdb.adminShowShards().rank;

            // Every rank in the routing table needs a worker queue
            for ( int i = 0; i < this.routing_table.Count; ++i )
            {
                if ( this.routing_table[i] > this.worker_queues.Count )
                    throw new KineticaException( "Not enough worker URLs specified." );
            }
        }
        else
        {
            // No worker URLs available; send everything to the regular
            // Kinetica URL through a single queue
            System.Uri url = new System.Uri( kdb.Url + "/insert/records" );
            this.worker_queues.Add( new WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk ) );
            this.routing_table = null;
        }
    }
    catch ( Exception ex )
    {
        // Report any setup failure as a KineticaException
        throw new KineticaException( ex.ToString() );
    }

    // Create the random number generator (seeded from the clock)
    this.random = new Random( (int) DateTime.Now.Ticks );
} // end constructor KineticaIngestor
1706 
1707 
/// <summary>
/// Reads the running count of inserted records as a single atomic
/// 64-bit read.
/// </summary>
/// <returns>The current value of <c>count_inserted</c>.</returns>
public Int64 getCountInserted() => System.Threading.Interlocked.Read( ref this.count_inserted );
1717 
1718 
/// <summary>
/// Reads the running count of updated records as a single atomic
/// 64-bit read.
/// </summary>
/// <returns>The current value of <c>count_updated</c>.</returns>
public Int64 getCountUpdated() => System.Threading.Interlocked.Read( ref this.count_updated );
1728 
1729 
/// <summary>
/// Empties every worker queue and inserts whatever each one held.
/// </summary>
public void flush()
{
    for ( int i = 0; i < this.worker_queues.Count; ++i )
    {
        WorkerQueue<T> current = this.worker_queues[ i ];

        // Take the pending records off the queue, then actually insert them
        IList<T> pending = current.flush();
        flush( pending, current.url );
    }
} // end public flush
1749 
1750 
/// <summary>
/// Encodes a batch of records and inserts it through the given URL,
/// then adds the reported insert/update counts to the running totals.
/// </summary>
/// <param name="queue">The records to insert; a null or empty list is a no-op.</param>
/// <param name="url">The URL to submit the request to; when null, the
/// request goes through the database handle's regular insert call.</param>
/// <exception cref="InsertException{T}">Thrown when encoding or inserting
/// fails; it carries the URL, the records and the original message.</exception>
private void flush( IList<T> queue, System.Uri url )
{
    if ( ( queue == null ) || ( queue.Count == 0 ) )
        return;  // nothing to insert

    try
    {
        // Encode the records into binary
        IList<byte[]> encoded_queue = new List<byte[]>( queue.Count );
        foreach ( var record in queue )
            encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );

        RawInsertRecordsRequest request = new RawInsertRecordsRequest( this.table_name, encoded_queue, this.options );

        // Make the /insert/records call
        InsertRecordsResponse response;
        if ( url == null )
        {
            response = this.kineticaDB.insertRecordsRaw( request );
        }
        else
        {
            response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
        }

        // Add the counts of inserted and updated records atomically
        System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
        System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
    }
    catch ( Exception ex )
    {
        // Hand the failed batch back to the caller along with the cause
        throw new InsertException<T>( url, queue, ex.Message );
    }
} // end private flush()
1792 
1793 
1794 
/// <summary>
/// Queues a single record on the appropriate worker queue and, when that
/// fills the queue, inserts the resulting batch.
/// </summary>
/// <param name="record">The record to queue.</param>
public void insert( T record )
{
    // Primary key: used by the queue to check for uniqueness
    RecordKey primary_key = ( this.primary_key_builder == null )
                            ? null
                            : this.primary_key_builder.build( record );

    // Shard key: used to decide which worker receives the record
    RecordKey shard_key = ( this.shard_key_builder == null )
                          ? null
                          : this.shard_key_builder.build( record );

    // Pick the index of the appropriate worker queue
    int target_index;
    if ( this.routing_table == null )
        target_index = 0;                                          // only one queue exists
    else if ( shard_key == null )
        target_index = random.Next( this.worker_queues.Count );    // no shard key: any worker will do
    else
        target_index = shard_key.route( this.routing_table );      // route by the shard key

    WorkerQueue<T> target_queue = this.worker_queues[ target_index ];

    // Queueing returns a batch only when it caused the queue to be flushed
    IList<T> full_batch = target_queue.insert( record, primary_key );
    if ( full_batch == null )
        return;

    this.flush( full_batch, target_queue.url );
} // end insert( record )
1846 
1847 
1848 
/// <summary>
/// Queues a list of records, one at a time, in list order.
/// </summary>
/// <param name="records">The records to queue.</param>
/// <exception cref="InsertException{T}">Rethrown when inserting a batch
/// fails; the records of the input list that come after the failing one
/// are appended to the exception's record list before it propagates.</exception>
public void insert( IList<T> records)
{
    for ( int i = 0; i < records.Count; ++i )
    {
        try
        {
            this.insert( records[ i ] );
        }
        catch ( InsertException<T> ex )
        {
            // Append the not-yet-processed records to the exception's
            // record list so the caller can see everything left over
            IList<T> uninserted = ex.records;
            if ( uninserted != null )
            {
                for ( int j = i + 1; j < records.Count; ++j )
                {
                    uninserted.Add( records[ j ] );
                }
            }

            // Rethrow the same exception without resetting its stack trace
            throw;
        } // end try-catch
    } // end outer for loop
} // end insert( records )
1887 
1888 
1889 
1890  } // end class KineticaIngestor<T>
1891 
1892 
1893 
1894 
1895 } // end namespace kinetica
const string CONF_ENABLE_WORKER_HTTP_SERVERS
Boolean value indicating whether the system is configured for multi-head ingestion.
int count_inserted
The number of records inserted.
const string CHAR1
This property provides optimized memory, disk and query performance for string columns.
A set of results returned by Kinetica.showSystemProperties(IDictionary<string, string>).
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
const string DATETIME
Valid only for 'string' columns.
const string INT16
This property provides optimized memory and query performance for int columns.
void insert(T record)
Queues a record for insertion into Kinetica.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
const string CHAR128
This property provides optimized memory, disk and query performance for string columns.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
string Url
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:82
A map of server configuration parameters and version information.
const string CONF_WORKER_HTTP_SERVER_IPS
Semicolon (';') separated string of IP addresses of all the ingestion-enabled worker heads of the sys...
int count_updated
The number of records updated.
Column properties used for Kinetica types.
const string TIMESTAMP
Valid only for 'long' columns.
const string CHAR16
This property provides optimized memory, disk and query performance for string columns.
A list of worker URLs to use for multi-head ingest.
static void murmurhash3_x64_128(byte[] key, uint offset, uint len, int seed, out LongPair output)
Returns the MurmurHash3_x64_128 hash, placing the result in output
Definition: MurMurHash3.cs:59
const string CHAR64
This property provides optimized memory, disk and query performance for string columns.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
const string CHAR2
This property provides optimized memory, disk and query performance for string columns.
const string DATE
Valid only for 'string' columns.
const string CHAR8
This property provides optimized memory, disk and query performance for string columns.
const string DECIMAL
Valid only for 'string' columns.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
const string CHAR32
This property provides optimized memory, disk and query performance for string columns.
IList< Column > getColumns()
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
const string CHAR256
This property provides optimized memory, disk and query performance for string columns.
void insert(IList< T > records)
Queues a list of records for insertion into Kinetica.
ShowSystemPropertiesResponse showSystemProperties(ShowSystemPropertiesRequest request_)
Returns server configuration and version related information to the caller.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
const string INT8
This property provides optimized memory and query performance for int columns.
const string CONF_WORKER_HTTP_SERVER_PORTS
Semicolon (';') separated string of the port numbers of all the ingestion-enabled worker ranks of the...
IDictionary< string, string > property_map
A map of server configuration parameters and version information.
IList< int > rank
Array of ranks indexed by the shard number.
const string TRUE
Indicates that the system is configured for multi-head ingestion.
A set of results returned by Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
const string TIME
Valid only for 'string' columns.
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, WorkerList workers=null)
API to talk to Kinetica Database
Definition: Kinetica.cs:40
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.