Kinetica C# API  Version 6.0.1.0
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Properties Pages
KineticaIngestor.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Collections.Immutable;
4 using System.IO;
5 using System.Text.RegularExpressions;
6 
7 
8 namespace kinetica
9 {
18  public class KineticaIngestor<T>
19  {
20  [Serializable]
21  public class InsertException<T> : KineticaException
22  {
23  public Uri url { get; private set; }
24  public IList<T> records { get; private set; }
25  private string message;
26 
27  public InsertException( string msg ) : base( msg ) { }
28 
29  internal InsertException( Uri url_, IList<T> records_, string msg ) : base ( msg )
30  {
31  this.message = msg;
32  this.url = url_;
33  this.records = records_;
34  }
35 
36  public override string ToString() { return "InsertException: " + message; }
37  } // end class InsertException
38 
39 
43  public sealed class WorkerList : List<System.Uri>
44  {
52  public WorkerList() { }
53 
67  public WorkerList( Kinetica db, Regex ip_regex = null )
68  {
69  // Get the system properties from the database server
70  IDictionary<string, string> system_properties = db.showSystemProperties().property_map;
71 
72  // Find out if multi-head ingest is turned on or not
73  string multi_head_ingestion_param;
74  system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out multi_head_ingestion_param );
75  if ( multi_head_ingestion_param == null )
76  throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS );
77  bool is_multi_head_ingest_enabled = multi_head_ingestion_param.Equals( ShowSystemPropertiesResponse.PropertyMap.TRUE );
78 
79  // Nothing to do if multi-head ingestion is disabled
80  if ( !is_multi_head_ingest_enabled )
81  return;
82 
83  // Get the worker IPs and ports
84  string worker_ips_str, worker_ports_str;
85  system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out worker_ips_str );
86  system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out worker_ports_str );
87 
88  // Check that we got them
89  if ( worker_ips_str.Length == 0 )
90  throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS );
91  if ( worker_ports_str.Length == 0 )
92  throw new KineticaException( "Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS );
93 
94  // Parse the IPs and the ports
95  // ---------------------------
96  // Split the strings
97  string[] worker_ip_lists = worker_ips_str.Split( ';' );
98  string[] worker_ports = worker_ports_str.Split( ';' );
99 
100  // Check that there are the same number of IPs and ports supplied
101  if ( worker_ip_lists.Length != worker_ports.Length )
102  throw new KineticaException( "Inconsistent number of values for "
103  + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS
104  + " and "
105  + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS );
106  // Create the URLs using the IPs and the ports, but
107  // ignore the very first rank (rank-0)
108  for ( int i = 1; i < worker_ip_lists.Length; ++i )
109  {
110  string ip_list = worker_ip_lists[ i ];
111 
112  // Need to split each of the IP lists on a comma
113  string[] ips = ip_list.Split( ',' );
114 
115  bool matching_ip_found = false;
116 
117  // Find at least one IP to work with
118  foreach ( string ip in ips )
119  {
120  // Try to create the URL
121  try
122  {
123  // If a regular expression is given, then see if this one is a match
124  if ( ip_regex != null )
125  matching_ip_found = ip_regex.IsMatch( ip );
126  else // no regex given, so take the first IP encountered for this worker
127  matching_ip_found = true;
128 
129  if ( matching_ip_found )
130  {
131  UriBuilder uri_builder = new UriBuilder( "http", ip, Int32.Parse( worker_ports[ i ] ), "insert/records" );
132  Uri url = uri_builder.Uri;
133 
134  // Add the URL to this WorkerList
135  this.Add( url );
136  break; // don't keep trying to match IPs in this group
137  } // end inner if
138  } // end try
139  catch ( Exception ex )
140  {
141  throw new KineticaException( ex.Message );
142  }
143  } // end inner foreach
144 
145  if ( !matching_ip_found )
146  throw new KineticaException( $"No matching IP found for worker #{i}." );
147  } // end outer for
148 
149  // Check that this list is not empty
150  if ( this.Count == 0 )
151  throw new KineticaException( "No worker HTTP servers found." );
152  } // end constructor
153 
154  } // end class WorkerList
155 
156 
157 
163  private sealed class RecordKey
164  {
168  private static readonly Regex DATE_REGEX = new Regex( "\\A(\\d{4})-(\\d{2})-(\\d{2})$" );
172  private static readonly Regex DECIMAL_REGEX = new Regex( "\\A\\s*[+-]?((?<int>\\d+)(\\.(?<frac>\\d{0,4}))?|\\.(?<frac>\\d{1,4}))\\s*\\z" );
176  private static readonly Regex IPV4_REGEX = new Regex( "\\A(?<a>\\d{1,3})\\.(?<b>\\d{1,3})\\.(?<c>\\d{1,3})\\.(?<d>\\d{1,3})$" );
180  private static readonly Regex TIME_REGEX = new Regex( "\\A(?<hour>\\d{1,2}):(?<minute>\\d{2}):(?<seconds>\\d{2})(\\.(?<milliseconds>\\d{3}))?$" );
184  private static readonly DateTime MIN_DATE = (new System.Globalization.GregorianCalendar()).MinSupportedDateTime;
185  //private readonly DateTime MIN_DATE = DateTime.MinValue;
189  private static readonly int MIN_SUPPORTED_YEAR = 1000;
193  private static readonly int MAX_SUPPORTED_YEAR = 2900;
197  private static readonly TimeZoneInfo UTC = TimeZoneInfo.Utc;
198 
199  private readonly byte[] buffer;
200  private readonly int buffer_size;
201  private int current_size;
202  private int hash_code;
203  private bool is_valid;
204  private long routingHash;
205 
211  public RecordKey( int size )
212  {
213  if ( size < 1 )
214  throw new KineticaException( "Buffer size must be greater than or equal to 1. "
215  + "Size given: " + size );
216  buffer_size = size;
217  current_size = 0;
218  buffer = new byte[size];
219  this.is_valid = true;
220  }
221 
226  public bool isValid()
227  {
228  return this.is_valid;
229  }
230 
235  public int hashCode()
236  {
237  return this.hash_code;
238  }
239 
240 
241 
248  private bool isBufferFull( bool throw_if_full = true )
249  {
250  if ( this.current_size == this.buffer_size )
251  {
252  if ( throw_if_full )
253  throw new KineticaException( "The buffer is already full!" );
254  return true; // yes, the buffer is full, and we haven't thrown
255  }
256  return false; // buffer is NOT full
257  } // end isBufferFull
258 
270  private bool willBufferOverflow( int n, bool throw_if_overflow = true )
271  {
272  // Note: We're not checking for a negative value for n here
273  if ( (this.current_size + n) > this.buffer_size )
274  {
275  if ( throw_if_overflow )
276  throw new KineticaException( $"The buffer (of size {buffer_size}) does not have sufficient room in it to put {n} more byte(s) (current size is {this.current_size})." );
277  return true; // yes, the buffer WILL overflow, but we haven't thrown
278  }
279  return false; // buffer will NOT overflow
280  } // end willBufferOverflow
281 
282 
289  private void add( byte b )
290  {
291  // Add the byte to the buffer and increment the size
292  buffer.SetValue( b, current_size++ );
293  } // end add()
294 
295 
296 
301  public void addInt( int? value )
302  {
303  // Check if the given number of characters will fit in the buffer
304  this.willBufferOverflow( 4 ); // int is four bytes long
305 
306  // Handle nulls
307  if ( value == null )
308  {
309  // Add four zero bytes for the null value
310  this.add( ( byte ) 0 ); // 1st 0
311  this.add( ( byte ) 0 ); // 2nd 0
312  this.add( ( byte ) 0 ); // 3rd 0
313  this.add( ( byte ) 0 ); // 4th 0
314  return;
315  }
316 
317  // Put the integer into the array, but first convert to bytes
318  byte[] int_bytes = BitConverter.GetBytes( (int)value );
319 
320  // Add the four bytes
321  foreach ( byte b in int_bytes )
322  this.add( b );
323  } // end addInt
324 
325 
330  public void addInt8( int? value )
331  {
332  // Check if the given number of characters will fit in the buffer
333  this.willBufferOverflow( 1 ); // int8 is one byte long
334 
335  // Handle nulls
336  if ( value == null )
337  {
338  // Add one zero byte for the null value
339  this.add( ( byte ) 0 );
340  return;
341  }
342 
343  // Put the integer into the array, but first convert to byte
344  this.add( (byte)value );
345  } // end addInt8
346 
347 
352  public void addInt16( int? value )
353  {
354  // Check if the given number of characters will fit in the buffer
355  this.willBufferOverflow( 2 ); // int16 is two bytes long
356 
357  // Handle nulls
358  if ( value == null )
359  {
360  // Add two zero bytes for the null value
361  this.add( ( byte ) 0 ); // 1st 0
362  this.add( ( byte ) 0 ); // 2nd 0
363  return;
364  }
365 
366  // Put the short into the array, but first convert to bytes
367  byte[] short_bytes = BitConverter.GetBytes( (short)value );
368 
369  // Add the two bytes
370  foreach ( byte b in short_bytes )
371  this.add( b );
372  } // end addInt16
373 
374 
375 
380  public void addLong( long? value )
381  {
382  // Check if the given number of characters will fit in the buffer
383  this.willBufferOverflow( 8 ); // int is eight bytes long
384 
385  // Handle nulls
386  if ( value == null )
387  {
388  // Add four zero bytes for the null value
389  this.add( ( byte ) 0 ); // 1st 0
390  this.add( ( byte ) 0 ); // 2nd 0
391  this.add( ( byte ) 0 ); // 3rd 0
392  this.add( ( byte ) 0 ); // 4th 0
393  this.add( ( byte ) 0 ); // 5th 0
394  this.add( ( byte ) 0 ); // 6th 0
395  this.add( ( byte ) 0 ); // 7th 0
396  this.add( ( byte ) 0 ); // 8th 0
397  return;
398  }
399 
400  // Put the long into the array, but first convert to bytes
401  byte[] long_bytes = BitConverter.GetBytes( (long)value );
402 
403  // Add the eight bytes
404  foreach ( byte b in long_bytes )
405  this.add( b );
406  } // end addLong
407 
408 
413  public void addFloat( float? value )
414  {
415  // Check if the given number of characters will fit in the buffer
416  this.willBufferOverflow( 4 ); // int is four bytes long
417 
418  // Handle nulls
419  if ( value == null )
420  {
421  // Add four zero bytes for the null value
422  this.add( ( byte ) 0.0f ); // 1st 0
423  this.add( ( byte ) 0.0f ); // 2nd 0
424  this.add( ( byte ) 0.0f ); // 3rd 0
425  this.add( ( byte ) 0.0f ); // 4th 0
426  return;
427  }
428 
429  // Put the integer into the array, but first convert to bytes
430  byte[] float_bytes = BitConverter.GetBytes( (float)value );
431 
432  // Add the four bytes
433  foreach ( byte b in float_bytes )
434  this.add( b );
435  } // end addFloat
436 
437 
438 
443  public void addDouble( double? value )
444  {
445  // Check if the given number of characters will fit in the buffer
446  this.willBufferOverflow( 8 ); // int is eight bytes long
447 
448  // Handle nulls
449  if ( value == null )
450  {
451  // Add four zero bytes for the null value
452  this.add( ( byte ) 0.0 ); // 1st 0
453  this.add( ( byte ) 0.0 ); // 2nd 0
454  this.add( ( byte ) 0.0 ); // 3rd 0
455  this.add( ( byte ) 0.0 ); // 4th 0
456  this.add( ( byte ) 0.0 ); // 5th 0
457  this.add( ( byte ) 0.0 ); // 6th 0
458  this.add( ( byte ) 0.0 ); // 7th 0
459  this.add( ( byte ) 0.0 ); // 8th 0
460  return;
461  }
462 
463  // Put the integer into the array, but first convert to bytes
464  byte[] double_bytes = BitConverter.GetBytes( (double)value );
465 
466  // Add the eight bytes
467  foreach ( byte b in double_bytes )
468  this.add( b );
469  } // end addDouble
470 
471 
472 
478  public void addString( string value )
479  {
480  // Handle nulls
481  if ( value == null )
482  {
483  this.addLong( 0L );
484  return;
485  }
486 
487  // Hash the value
488  MurMurHash3.LongPair murmur = new MurMurHash3.LongPair();
489  System.Text.Encoding encoding = new System.Text.UTF8Encoding();
490  byte[] input = encoding.GetBytes( value );
491  MurMurHash3.murmurhash3_x64_128( input, 0, (uint)input.Length, 10, out murmur );
492 
493  // Add the hashed value to the buffer
494  this.addLong( murmur.val1 );
495  } // end addString
496 
497 
498 
507  public void addCharN( string value, int N )
508  {
509  // Check if the given number of characters will fit in the buffer
510  this.willBufferOverflow( N );
512  //if ( ( this.current_size + N ) > buffer_size )
513  // throw new KineticaException( $"The given {N} character(s) will not fit in the buffer (of size {buffer_size}) which has {this.current_size} bytes in it already." );
514 
515  // Handle nulls
516  if ( value == null )
517  {
518  for ( int i = 0; i < N; ++i )
519  {
520  this.add( (byte) 0 );
521  }
522  return;
523  }
524 
525  // Encode the string into bytes (using the UTF-8 encoding)
526  byte[] bytes = System.Text.Encoding.UTF8.GetBytes( value );
527  int byte_count = bytes.GetLength( 0 );
528 
529  // Truncate longer strings to the given length
530  if ( byte_count > N )
531  byte_count = N;
532 
533  // Put the characters in the byte buffer in the little endian
534  // order (which means it will be right to left)
535  // ----------------------------------------------------------
536  // First, pad with any zeroes "at the end"
537  for ( int i = N; i > byte_count; --i )
538  {
539  this.add( (byte) 0 );
540  }
541 
542  // Then, put all the characters (in reverse order)
543  for ( int i = ( byte_count - 1 ); i >= 0; --i )
544  {
545  this.add( bytes[i] );
546  }
547  } // end addCharN()
548 
549 
556  public void addDate( string value )
557  {
558  // Check and throw if the buffer is already full
559  this.isBufferFull( true );
560 
561  // Handle nulls
562  if ( value == null )
563  {
564  this.addInt( 0 );
565  return;
566  }
567 
568  // Check that the given value matches the YYYY-MM-DD pattern
569  Match match = DATE_REGEX.Match( value );
570  if ( !match.Success )
571  {
572  // No match, so the key is invalid
573  this.is_valid = false;
574  this.addInt( 0 );
575  return;
576  }
577 
578  // We'll need to parse the string into year, month, and day
579  int year, month, day;
580  DateTime date;
581  System.Globalization.GregorianCalendar calendar = new System.Globalization.GregorianCalendar();
582 
583  // Parse the string value
584  try
585  {
586  year = int.Parse( match.Groups[ 1 ].ToString() );
587  month = int.Parse( match.Groups[ 2 ].ToString() );
588  day = int.Parse( match.Groups[ 3 ].ToString() );
589  date = new DateTime( year, month, day, calendar );
590  }
591  catch ( Exception ex )
592  {
593  // Upon any error, set this key to be invalid
594  this.addInt( 0 );
595  this.is_valid = false;
596  return;
597  }
598 
599  // Kinetica does not support years outside the range [1000, 2900]
600  if ( ( year < MIN_SUPPORTED_YEAR ) || ( year > MAX_SUPPORTED_YEAR ) )
601  {
602  this.addInt( 0 );
603  this.is_valid = false;
604  return;
605  }
606 
607  // Deduce the integer representing the date
608  int date_integer = ( ((year - MIN_SUPPORTED_YEAR) << 21)
609  | (month << 17)
610  | (day << 12)
611  | (calendar.GetDayOfYear( date ) << 3)
612  | (int)calendar.GetDayOfWeek( date ) );
613  this.addInt( date_integer );
614  } // end addDate()
615 
616 
623  public void addDecimal( string value )
624  {
625  // Check and throw if the buffer is already full
626  this.isBufferFull( true );
627 
628  // Handle nulls
629  if ( value == null )
630  {
631  this.addLong( 0L );
632  return;
633  }
634 
635  // Check that the given value matches the decimal regular expression pattern
636  Match match = DECIMAL_REGEX.Match( value );
637  if ( !match.Success )
638  {
639  // No match, so the key is invalid
640  this.is_valid = false;
641  this.addLong( 0L );
642  return;
643  }
644 
645  // Parse the string value
646  long decimal_value;
647  try
648  {
649  // Extract the integral and fractional parts
650  string integral_part_str = match.Groups[ "int" ].Value;
651  string fractional_part_str = match.Groups[ "frac" ].Value;
652  long integral_part;
653  long fractional_part;
654  bool has_integral_part = long.TryParse( integral_part_str, out integral_part );
655  bool has_fractional_part = long.TryParse( fractional_part_str, out fractional_part );
656  // Shift the integral part to the left, if there is any fractional part
657  int fractional_part_len = fractional_part_str.Length;
658  integral_part = integral_part * (long)Math.Pow(10, fractional_part_len );
659  // Put the two parts together to create a long form of the decimal value
660  decimal_value = integral_part + fractional_part;
661  }
662  catch ( Exception ex )
663  {
664  // Upon any error, set this key to be invalid
665  this.addLong( 0L );
666  this.is_valid = false;
667  return;
668  }
669 
670  // Deduce the integer representing the date
671  this.addLong( decimal_value );
672  } // end addDecimal()
673 
674 
681  public void addIPv4( string value )
682  {
683  // Check and throw if the buffer is already full
684  this.isBufferFull( true );
685 
686  // Handle nulls
687  if ( value == null )
688  {
689  this.addInt( 0 );
690  return;
691  }
692 
693  // Check that the given value matches the XXX.XXX.XXX.XXX pattern
694  Match match = IPV4_REGEX.Match( value );
695  if ( !match.Success )
696  {
697  // No match, so the key is invalid
698  this.is_valid = false;
699  this.addInt( 0 );
700  return;
701  }
702 
703  // We'll need to parse the string into four integers
704  int a, b, c, d;
705 
706  // Parse the string value
707  try
708  {
709  a = int.Parse( match.Groups[ "a" ].Value );
710  b = int.Parse( match.Groups[ "b" ].Value );
711  c = int.Parse( match.Groups[ "c" ].Value );
712  d = int.Parse( match.Groups[ "d" ].Value );
713  }
714  catch ( Exception ex )
715  {
716  // Upon any error, set this key to be invalid
717  this.addInt( 0 );
718  this.is_valid = false;
719  return;
720  }
721 
722  // Each byte has to be within the range [0, 255] (the regex does
723  // not support negative numbers, so no worries about those)
724  if ( ( a > 255 ) || ( b > 255 ) || ( c > 255 ) || ( d > 255 ) )
725  {
726  this.addInt( 0 );
727  this.is_valid = false;
728  return;
729  }
730 
731  // Deduce the integer representing the date
732  int ipv4_integer = ( (a << 24) | (b << 16) | (c << 8) | d );
733  this.addInt( ipv4_integer );
734  } // end addIPv4()
735 
736 
743  public void addTime( string value )
744  {
745  // Check and throw if the buffer is already full
746  this.isBufferFull( true );
747 
748  // Handle nulls
749  if ( value == null )
750  {
751  this.addInt( 0 );
752  return;
753  }
754 
755  // Check that the given value matches the HH:MM:SS[.mmm] pattern
756  Match match = TIME_REGEX.Match( value );
757  if ( !match.Success )
758  {
759  // No match, so the key is invalid
760  this.is_valid = false;
761  this.addInt( 0 );
762  return;
763  }
764 
765  // We'll need to parse the string into four integers
766  uint hour, minute, second, milliseconds;
767 
768  // Parse the string value
769  try
770  {
771  hour = uint.Parse( match.Groups["hour"].Value );
772  minute = uint.Parse( match.Groups["minute"].Value );
773  second = uint.Parse( match.Groups["seconds"].Value );
774  Group msec_group = match.Groups["milliseconds"];
775  if ( msec_group.Success )
776  milliseconds = uint.Parse( msec_group.Value );
777  else
778  milliseconds = 0;
779  }
780  catch ( Exception ex )
781  {
782  // Upon any error, set this key to be invalid
783  this.addInt( 0 );
784  this.is_valid = false;
785  return;
786  }
787 
788  // Validate the hour, minute, second values
789  if ( ( hour > 23 ) || ( minute > 59 ) || ( second > 59 ) )
790  {
791  this.addInt( 0 );
792  this.is_valid = false;
793  return;
794  }
795 
796  // Deduce the integer representing the time
797  int time_integer = (int)( (hour << 26) | (minute << 20) | (second << 14) | (milliseconds << 4) );
798  this.addInt( time_integer );
799  } // end addTime()
800 
801 
806  public void addTimeStamp( long? value )
807  {
808  // Handle nulls
809  if ( value == null )
810  {
811  this.addInt( 0 );
812  return;
813  }
814 
815  //
816  System.Globalization.GregorianCalendar calendar = new System.Globalization.GregorianCalendar();
817  DateTime time = MIN_DATE.AddMilliseconds( (double) value );
818  long timestamp = (long) ( ((time.Year - 1900) << 53)
819  | ((time.Month + 1) << 49)
820  | (time.Day << 44)
821  | (time.Hour << 39)
822  | (time.Minute << 33)
823  | (time.Second << 27)
824  | (time.Millisecond << 17)
825  | (time.DayOfYear << 8)
826  | ((int)time.DayOfWeek << 5) );
827  this.addLong( timestamp );
828  } // end addTimeStamp()
829 
830 
831 
838  public void computHashes()
839  {
840  // Check all the values for the key have been added
841  if ( this.current_size != this.buffer_size )
842  throw new KineticaException( "The RecordKey buffer is not full; check that all the relevant values have been added." );
843 
844  // Hash the value
845  MurMurHash3.LongPair murmur = new MurMurHash3.LongPair();
846  MurMurHash3.murmurhash3_x64_128( this.buffer, 0, ( uint ) this.buffer_size, 10, out murmur );
847 
848  // Save the hash value
849  this.routingHash = murmur.val1;
850  this.hash_code = ( int ) ( this.routingHash ^ ((this.routingHash >> 32) & 0x0000ffffL));
851  } // end computHashes
852 
853 
854 
861  public int route( IList<int> routingTable )
862  {
863  // Return 1 less than the value of the nth element of routingTable where
864  // n == (record key hash) % (number of elements in routingTable)
865  // (because the 1st worker rank is the 0th element in the worker list)
866  return (routingTable[ Math.Abs( ( int ) ( this.routingHash % routingTable.Count ) ) ] - 1);
867  } // end route
868 
869  } // end class RecordKey
870 
871 
876  private sealed class RecordKeyBuilder<T>
877  {
881  private enum ColumnType
882  {
883  CHAR1,
884  CHAR2,
885  CHAR4,
886  CHAR8,
887  CHAR16,
888  CHAR32,
889  CHAR64,
890  CHAR128,
891  CHAR256,
892  DATE,
893  DECIMAL,
894  DOUBLE,
895  FLOAT,
896  INT,
897  INT8,
898  INT16,
899  IPV4,
900  LONG,
901  STRING,
902  TIME,
903  TIMESTAMP
904  } // end enum ColumnType
905 
906 
907  // Class members
908  private KineticaType ktype;
909  private IList<int> routing_column_indices;
910  private IList<ColumnType> column_types;
911  private int buffer_size;
912 
913  public RecordKeyBuilder( bool is_primary_key, KineticaType ktype )
914  {
915  this.ktype = ktype;
916 
917  this.buffer_size = 0;
918  routing_column_indices = new List<int>();
919  column_types = new List<ColumnType>();
920 
921  // We need to check if the type has all of the following: x, y, timestamp, track ID
922  // (this will tell us if it's a track type table, and if so, the track ID
923  // column would be a routing column)
924  bool has_timestamp = false;
925  bool has_x = false;
926  bool has_y = false;
927  int track_id_column_idx = -1; // not found yet
928 
929  // Add indices of any primary or shard key (based on is_primary_key)
930  // to the list of routing columns
931  IList<KineticaType.Column> columns = ktype.getColumns();
932  for ( int i = 0; i < columns.Count; ++i )
933  {
934  // Get the column
935  KineticaType.Column column = columns[ i ];
936 
937  // Check if it is one of: x, y, timestamp, track ID
938  switch ( column.getName() )
939  {
940  case "TRACKID":
941  track_id_column_idx = i;
942  break;
943 
944  case "TIMESTAMP":
945  has_timestamp = true;
946  break;
947 
948  case "x":
949  has_x = true;
950  break;
951 
952  case "y":
953  has_y = true;
954  break;
955  } // end switch on column name
956 
957  // Check if this column has been declared as a primary/shard key
958  // And if so, and if appropriate, add it to the routing key column list
959  if ( is_primary_key && column.getProperties().Contains( ColumnProperty.PRIMARY_KEY ) )
960  {
961  routing_column_indices.Add( i );
962  }
963  else if ( !is_primary_key && column.getProperties().Contains( ColumnProperty.SHARD_KEY ) )
964  {
965  routing_column_indices.Add( i );
966  }
967  } // end for loop
968 
969  // Check if this is a track-type table; if so, add the track ID column's index to the list
970  if ( !is_primary_key
971  && has_timestamp && has_x && has_y && ( track_id_column_idx != -1 ) )
972  {
973  if ( routing_column_indices.Count == 0 )
974  {
975  routing_column_indices.Add( track_id_column_idx );
976  }
977  else if ( ( routing_column_indices.Count != 1 )
978  || ( routing_column_indices[0] != track_id_column_idx ) )
979  {
980  // Track type tables can't have any other routing key
981  throw new KineticaException( "Cannot have a shard key other than 'TRACKID' for track tables." );
982  }
983  } // end if a track type table
984 
985 
986  // For each index of routing columns, save the column type, and increase
987  // the buffer size appropriately
988  foreach ( int i in routing_column_indices )
989  {
990  // Get the column information
991  KineticaType.Column column = columns[ i ];
992 
993  switch ( column.getType() )
994  {
995  // Float and double are the simplest
996  case KineticaType.Column.ColumnType.FLOAT:
997  {
998  column_types.Add( ColumnType.FLOAT );
999  this.buffer_size += 4;
1000  break;
1001  }
1002  case KineticaType.Column.ColumnType.DOUBLE:
1003  {
1004  column_types.Add( ColumnType.DOUBLE );
1005  this.buffer_size += 8;
1006  break;
1007  }
1008 
1009  case KineticaType.Column.ColumnType.INT:
1010  {
1011  // Integer has byte, short and int
1012  if ( column.getProperties().Contains( ColumnProperty.INT8 ) )
1013  { // byte
1014  column_types.Add( ColumnType.INT8 );
1015  this.buffer_size += 1;
1016  }
1017  else if ( column.getProperties().Contains( ColumnProperty.INT16 ) )
1018  { // short
1019  column_types.Add( ColumnType.INT16 );
1020  this.buffer_size += 2;
1021  }
1022  else // regular 4-byte integer
1023  {
1024  column_types.Add( ColumnType.INT );
1025  this.buffer_size += 4;
1026  }
1027  break;
1028  } // end case integer
1029 
1030  case KineticaType.Column.ColumnType.LONG:
1031  {
1032  // Long has the regular long and timestamp
1033  if ( column.getProperties().Contains( ColumnProperty.TIMESTAMP ) )
1034  { // it's a timestamp
1035  column_types.Add( ColumnType.TIMESTAMP );
1036  }
1037  else // regular long
1038  {
1039  column_types.Add( ColumnType.LONG );
1040  }
1041  this.buffer_size += 8;
1042  break;
1043  } // end case long
1044 
1045  case KineticaType.Column.ColumnType.STRING:
1046  {
1047  if ( column.getProperties().Contains( ColumnProperty.CHAR1 ) )
1048  {
1049  column_types.Add( ColumnType.CHAR1 );
1050  this.buffer_size += 1;
1051  }
1052  else if ( column.getProperties().Contains( ColumnProperty.CHAR2 ) )
1053  {
1054  column_types.Add( ColumnType.CHAR2 );
1055  this.buffer_size += 2;
1056  }
1057  else if ( column.getProperties().Contains( ColumnProperty.CHAR4 ) )
1058  {
1059  column_types.Add( ColumnType.CHAR4 );
1060  this.buffer_size += 4;
1061  }
1062  else if ( column.getProperties().Contains( ColumnProperty.CHAR8 ) )
1063  {
1064  column_types.Add( ColumnType.CHAR8 );
1065  this.buffer_size += 8;
1066  }
1067  else if ( column.getProperties().Contains( ColumnProperty.CHAR16 ) )
1068  {
1069  column_types.Add( ColumnType.CHAR16 );
1070  this.buffer_size += 16;
1071  }
1072  else if ( column.getProperties().Contains( ColumnProperty.CHAR32 ) )
1073  {
1074  column_types.Add( ColumnType.CHAR32 );
1075  this.buffer_size += 32;
1076  }
1077  else if ( column.getProperties().Contains( ColumnProperty.CHAR64 ) )
1078  {
1079  column_types.Add( ColumnType.CHAR64 );
1080  this.buffer_size += 64;
1081  }
1082  else if ( column.getProperties().Contains( ColumnProperty.CHAR128 ) )
1083  {
1084  column_types.Add( ColumnType.CHAR128 );
1085  this.buffer_size += 128;
1086  }
1087  else if ( column.getProperties().Contains( ColumnProperty.CHAR256 ) )
1088  {
1089  column_types.Add( ColumnType.CHAR256 );
1090  this.buffer_size += 256;
1091  }
1092  else if ( column.getProperties().Contains( ColumnProperty.DATE ) )
1093  {
1094  column_types.Add( ColumnType.DATE );
1095  this.buffer_size += 4;
1096  }
1097  else if ( column.getProperties().Contains( ColumnProperty.DECIMAL ) )
1098  {
1099  column_types.Add( ColumnType.DECIMAL );
1100  this.buffer_size += 8;
1101  }
1102  else if ( column.getProperties().Contains( ColumnProperty.IPV4 ) )
1103  {
1104  column_types.Add( ColumnType.IPV4 );
1105  this.buffer_size += 4;
1106  }
1107  else if ( column.getProperties().Contains( ColumnProperty.TIME ) )
1108  {
1109  column_types.Add( ColumnType.TIME );
1110  this.buffer_size += 4;
1111  }
1112  else // regular string
1113  {
1114  column_types.Add( ColumnType.STRING );
1115  this.buffer_size += 8;
1116  }
1117  break;
1118  } // end case string
1119 
1120  // Other types are not allowed for routing columns
1121  case KineticaType.Column.ColumnType.BYTES:
1122  case KineticaType.Column.ColumnType.DEFAULT:
1123  throw new KineticaException( $"Cannot use column '{column.getName()}' as a key." );
1124  } // end switch on the column's primitive data type
1125  } // end foreach
1126  } // end constructor RecordKeyBuilder
1127 
1128 
1136  public RecordKey build( T record )
1137  {
1138  // Can't build a key if the buffer size is zero!
1139  if ( this.buffer_size == 0 )
1140  return null;
1141 
1142  // Create the empty key
1143  RecordKey key = new RecordKey( this.buffer_size );
1144 
1145  // Add each routing column's value to the key
1146  for ( int i = 0; i < this.routing_column_indices.Count; ++i )
1147  {
1148  // Get the column (with type and name)
1149  KineticaType.Column column = this.ktype.getColumns()[ this.routing_column_indices[ i ] ];
1150 
1151  // Get the value out of the record using the column's name and reflection
1152  var value = record.GetType().GetProperty( column.getName() ).GetValue( record, null );
1153 
1154  switch ( this.column_types[i] )
1155  {
1156  case ColumnType.CHAR1:
1157  key.addCharN( (string) value, 1 );
1158  break;
1159 
1160  case ColumnType.CHAR2:
1161  key.addCharN( ( string ) value, 2 );
1162  break;
1163 
1164  case ColumnType.CHAR4:
1165  key.addCharN( ( string ) value, 4 );
1166  break;
1167 
1168  case ColumnType.CHAR8:
1169  key.addCharN( ( string ) value, 8 );
1170  break;
1171 
1172  case ColumnType.CHAR16:
1173  key.addCharN( ( string ) value, 16 );
1174  break;
1175 
1176  case ColumnType.CHAR32:
1177  key.addCharN( ( string ) value, 32 );
1178  break;
1179 
1180  case ColumnType.CHAR64:
1181  key.addCharN( ( string ) value, 64 );
1182  break;
1183 
1184  case ColumnType.CHAR128:
1185  key.addCharN( ( string ) value, 128 );
1186  break;
1187 
1188  case ColumnType.CHAR256:
1189  key.addCharN( ( string ) value, 256 );
1190  break;
1191 
1192  case ColumnType.DATE:
1193  key.addDate( (string) value );
1194  break;
1195 
1196  case ColumnType.DECIMAL:
1197  key.addDecimal( (string) value );
1198  break;
1199 
1200  case ColumnType.DOUBLE:
1201  key.addDouble( ( double? ) value );
1202  break;
1203 
1204  case ColumnType.FLOAT:
1205  key.addFloat( ( float? ) value );
1206  break;
1207 
1208  case ColumnType.INT:
1209  key.addInt( ( int? ) value );
1210  break;
1211 
1212  case ColumnType.INT8:
1213  key.addInt8( ( int? ) value );
1214  break;
1215 
1216  case ColumnType.INT16:
1217  key.addInt16( ( int? ) value );
1218  break;
1219 
1220  case ColumnType.IPV4:
1221  key.addIPv4( ( string ) value );
1222  break;
1223 
1224  case ColumnType.LONG:
1225  key.addLong( ( long? ) value );
1226  break;
1227 
1228  case ColumnType.STRING:
1229  key.addString( ( string ) value );
1230  break;
1231 
1232  case ColumnType.TIME:
1233  key.addTime( ( string ) value );
1234  break;
1235 
1236  case ColumnType.TIMESTAMP:
1237  key.addTimeStamp( ( long? ) value );
1238  break;
1239  } // end switch
1240  } // end for loop
1241 
1242  // Compute the hash for the key and return it
1243  key.computHashes();
1244  return key;
1245  } // end build()
1246 
1247 
1248 
1254  public bool hasKey()
1255  {
1256  // Does it have any routing columns?
1257  return !(this.routing_column_indices.Count == 0);
1258  }
1259 
1260 
1266  public bool hasSameKey( RecordKeyBuilder<T> other)
1267  {
1268  return this.column_types.Equals( other.column_types );
1269  }
1270 
1271  } // end class RecordKeyBuilder
1272 
1273 
1274 
1275  private sealed class WorkerQueue<T>
1276  {
1277  public System.Uri url { get; private set; }
1278  private int capacity;
1279  private bool has_primary_key;
1280  private bool update_on_existing_pk;
1281  private IList<T> queue;
1282  private Dictionary<RecordKey, int> primary_key_map;
1283 
1284 
1292  public WorkerQueue( System.Uri url, int capacity, bool has_primary_key, bool update_on_existing_pk )
1293  {
1294  this.url = url;
1295  this.capacity = capacity;
1296  this.has_primary_key = has_primary_key;
1297  this.update_on_existing_pk = update_on_existing_pk;
1298 
1299  queue = new List<T>();
1300 
1301  // If the type has primary keys, then initialize with a
1302  // capacity of 75% of the final capacity
1303  if ( this.has_primary_key )
1304  primary_key_map = new Dictionary<RecordKey, int>( (int)Math.Round( this.capacity/0.75 ) );
1305  } // end constructor WorkerQueue<T>
1306 
1307 
1308 
1313  public IList<T> flush()
1314  {
1315  IList<T> old_queue = this.queue;
1316  queue = new List<T>( this.capacity );
1317 
1318  // Clear the primary key map if one exists
1319  if ( this.primary_key_map != null )
1320  this.primary_key_map.Clear();
1321 
1322  return old_queue;
1323  } // end flush
1324 
1325 
1326 
1334  public IList<T> insert( T record, RecordKey key )
1335  {
1336  if ( this.has_primary_key && key.isValid() )
1337  {
1338  // We are to update the record even if the primary key already exists
1339  if ( this.update_on_existing_pk )
1340  {
1341  int key_idx;
1342 
1343  if ( this.primary_key_map.TryGetValue( key, out key_idx ) )
1344  {
1345  // Key exists, so we need to replace the associated record
1346  this.queue[key_idx] = record;
1347  }
1348  else // key does not exist; add the record and
1349  { // update the key->record mapping
1350  this.queue.Add( record );
1351  this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1352  }
1353  }
1354  else // do NOT update/add the record if the key already exists
1355  {
1356  if ( this.primary_key_map.ContainsKey( key ) )
1357  return null; // yup, the key already exists
1358 
1359  // The key does not exist, so add the record and
1360  // update the key->record map
1361  this.queue.Add( record );
1362  this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1363  }
1364  }
1365  else // simply add the record
1366  {
1367  queue.Add( record );
1368  }
1369 
1370  // If the queue is full, then flush and return the 'old' queue
1371  if ( queue.Count == capacity )
1372  return flush();
1373  else // no records to return
1374  return null;
1375  } // end insert
1376  } // end class WorkerQueue
1377 
1378 
1379 
1380 
1381  // KineticaIngestor Members:
1382  // =========================
1383  public Kinetica kineticaDB { get; }
1384  public string table_name { get; }
1385  public int batch_size { get; }
1386  public IDictionary<string, string> options { get; }
1387  //public IReadOnlyDictionary<string, string> options { get; }
1388  public Int64 count_inserted;
1389  public Int64 count_updated;
1390  private KineticaType ktype;
1391  private RecordKeyBuilder<T> primary_key_builder;
1392  private RecordKeyBuilder<T> shard_key_builder;
1393  private IList<int> routing_table;
1394  private IList<WorkerQueue<T>> worker_queues;
1395  private Random random;
1396 
1397 
1407  public KineticaIngestor( Kinetica kdb, string table_name,
1408  int batch_size, KineticaType ktype,
1409  Dictionary<string, string> options = null,
1410  WorkerList workers = null )
1411  {
1412  this.kineticaDB = kdb;
1413  this.table_name = table_name;
1414  this.ktype = ktype;
1415 
1416  // Validate and save the batch size
1417  if ( batch_size < 1 )
1418  throw new KineticaException( $"Batch size must be greater than one; given {batch_size}." );
1419  this.batch_size = batch_size;
1420 
1421  // Save the options (make it read-only if it exists)
1422  if ( options != null )
1423  {
1424  this.options = options;
1425  //this.options = options.ToImmutableDictionary<string, string>();
1426  }
1427  else
1428  {
1429  this.options = null;
1430  }
1431 
1432  // Set up the primary and shard key builders
1433  // -----------------------------------------
1434  this.primary_key_builder = new RecordKeyBuilder<T>( true, this.ktype );
1435  this.shard_key_builder = new RecordKeyBuilder<T>( false, this.ktype );
1436 
1437  // Based on the Java implementation
1438  if ( this.primary_key_builder.hasKey() )
1439  { // There is a primary key for the given T
1440  // Now check if there is a distinct shard key
1441  if ( !this.shard_key_builder.hasKey()
1442  || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
1443  this.shard_key_builder = this.primary_key_builder; // no distinct shard key
1444  }
1445  else // there is no primary key for the given T
1446  {
1447  this.primary_key_builder = null;
1448 
1449  // Check if there is shard key for T
1450  if ( !this.shard_key_builder.hasKey() )
1451  this.shard_key_builder = null;
1452  } // done setting up the key builders
1453 
1454 
1455  // Set up the worker queues
1456  // -------------------------
1457  // Do we update records if there are matching primary keys in the
1458  // database already?
1459  bool update_on_existing_pk = ( (options != null)
1460  && options.ContainsKey( InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK )
1461  && options[ InsertRecordsRequest<T>.Options.UPDATE_ON_EXISTING_PK ].Equals( InsertRecordsRequest<T>.Options.TRUE ) );
1462  // Do T type records have a primary key?
1463  bool has_primary_key = (this.primary_key_builder != null);
1464  this.worker_queues = new List<WorkerQueue<T>>();
1465  try
1466  {
1467  // If no workers are given, try to get them from Kinetica
1468  if ( ( workers == null ) || ( workers.Count == 0 ) )
1469  {
1470  workers = new WorkerList( kdb );
1471  }
1472 
1473  // If we end up with multiple workers, either given by the
1474  // user or obtained from Kinetica, then use those
1475  if ( ( workers != null ) && ( workers.Count > 0 ) )
1476  {
1477  // Add worker queues per worker
1478  foreach ( System.Uri worker_url in workers )
1479  {
1480  WorkerQueue<T> worker_queue = new WorkerQueue<T>( worker_url, batch_size, has_primary_key, update_on_existing_pk );
1481  this.worker_queues.Add( worker_queue );
1482  }
1483 
1484  // Get the worker rank information from Kinetica
1485  this.routing_table = kdb.adminGetShardAssignments().shard_assignments_rank;
1486  // Check that enough worker URLs are specified
1487  for ( int i = 0; i < routing_table.Count; ++i )
1488  {
1489  if ( this.routing_table[i] > this.worker_queues.Count )
1490  throw new KineticaException( "Not enough worker URLs specified." );
1491  }
1492  }
1493  else // multihead-ingest is NOT turned on; use the regular Kinetica IP address
1494  {
1495  System.Uri url = new System.Uri( kdb.Url + "/insert/records" );
1496  WorkerQueue<T> worker_queue = new WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk );
1497  this.worker_queues.Add( worker_queue );
1498  this.routing_table = null;
1499  }
1500  }
1501  catch ( Exception ex )
1502  {
1503  throw new KineticaException( ex.ToString() );
1504  }
1505 
1506  // Create the random number generator
1507  this.random = new Random( (int) DateTime.Now.Ticks );
1508  } // end constructor KineticaIngestor
1509 
1510 
1516  public Int64 getCountInserted()
1517  {
1518  return System.Threading.Interlocked.Read( ref this.count_inserted );
1519  }
1520 
1521 
1527  public Int64 getCountUpdated()
1528  {
1529  return System.Threading.Interlocked.Read( ref this.count_updated );
1530  }
1531 
1532 
1542  public void flush()
1543  {
1544  foreach ( WorkerQueue<T> worker_queue in this.worker_queues )
1545  {
1546  // Flush the the queue
1547  IList<T> queue = worker_queue.flush();
1548  // Actually insert the records
1549  flush( queue, worker_queue.url );
1550  }
1551  } // end public flush
1552 
1553 
1560  private void flush( IList<T> queue, System.Uri url )
1561  {
1562  if ( queue.Count == 0 )
1563  return; // nothing to do since the queue is empty
1564 
1565  try
1566  {
1567  // Create the /insert/records request and response objects
1568  // -------------------------------------------------------
1569  // Encode the records into binary
1570  IList<byte[]> encoded_queue = new List<byte[]>();
1571  foreach ( var record in queue ) encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
1572  RawInsertRecordsRequest request = new RawInsertRecordsRequest( this.table_name, encoded_queue, this.options);
1573 
1575 
1576  // Make the /insert/records call
1577  if ( url == null )
1578  {
1579  response = this.kineticaDB.insertRecordsRaw( request );
1580  }
1581  else
1582  {
1583  response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
1584  }
1585 
1586  // Save the counts of inserted and updated records
1587  System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
1588  System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
1589  }
1590  catch ( Exception ex )
1591  {
1592  throw new InsertException<T>( url, queue, ex.ToString() );
1593  }
1594  } // end private flush()
1595 
1596 
1597 
1608  public void insert( T record )
1609  {
1610  // Create the record keys
1611  RecordKey primary_key = null; // used to check for uniqueness
1612  RecordKey shard_key = null; // used to find which worker to send this record to
1613 
1614  // Build the primary key, if any
1615  if ( this.primary_key_builder != null )
1616  primary_key = this.primary_key_builder.build( record );
1617 
1618  // Build the shard/routing key, if any
1619  if ( this.shard_key_builder != null )
1620  shard_key = this.shard_key_builder.build( record );
1621 
1622  // Find out which worker to send the record to; then add the record
1623  // to the approrpriate worker's record queue
1624  WorkerQueue<T> worker_queue;
1625  if ( this.routing_table == null )
1626  { // no information regarding multiple workers, so get the first/only one
1627  worker_queue = this.worker_queues[0];
1628  }
1629  else if ( shard_key == null )
1630  { // there is no shard/routing key, so get a random worker
1631  worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
1632  }
1633  else
1634  { // Get the worker based on the sharding/routing key
1635  int worker_index = shard_key.route( this.routing_table );
1636  worker_queue = this.worker_queues[worker_index];
1637  }
1638 
1639  // Insert the record into the queue
1640  IList<T> queue = worker_queue.insert( record, primary_key );
1641 
1642  // If inserting the queue resulted in flushing the queue, then flush it
1643  // properly
1644  if ( queue != null )
1645  {
1646  this.flush( queue, worker_queue.url );
1647  }
1648  } // end insert( record )
1649 
1650 
1651 
1665  public void insert( IList<T> records)
1666  {
1667  // Insert one record at a time
1668  for ( int i = 0; i < records.Count; ++i )
1669  {
1670  try
1671  {
1672  this.insert( records[ i ] );
1673  }
1674  catch ( InsertException<T> ex )
1675  {
1676  // Add the remaining records to the insertion exception
1677  // record queue
1678  IList<T> queue = ex.records;
1679 
1680  for ( int j = i + 1; j < records.Count; ++j )
1681  {
1682  queue.Add( records[ j ] );
1683  }
1684 
1685  // Rethrow
1686  throw ex;
1687  } // end try-catch
1688  } // end outer for loop
1689  } // end insert( records )
1690 
1691 
1692 
1693  } // end class KineticaIngestor<T>
1694 
1695 
1696 
1697 
1698 } // end namespace kinetica
const string CHAR1
This property provides optimized memory, disk and query performance for string columns.
A set of results returned by /show/system/properties.
const string INT16
This property provides optimized memory and query performance for int columns.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
const string CHAR128
This property provides optimized memory, disk and query performance for string columns.
A set of parameters for /insert/records.
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
void insert(T record)
Queues a record for insertion into Kinetica.
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, WorkerList workers=null)
const string TIMESTAMP
Valid only for &#39;long&#39; columns.
const string CHAR16
This property provides optimized memory, disk and query performance for string columns.
Int64 getCountInserted()
Returns the count of records inserted so far.
const string CHAR64
This property provides optimized memory, disk and query performance for string columns.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
const string CHAR2
This property provides optimized memory, disk and query performance for string columns.
const string DATE
Valid only for &#39;string&#39; columns.
void insert(IList< T > records)
Queues a list of records for insertion into Kientica.
const string CHAR8
This property provides optimized memory, disk and query performance for string columns.
const string DECIMAL
Valid only for &#39;string&#39; columns.
void flush()
Ensures that all queued records are inserted into Kinetica.
const string CHAR32
This property provides optimized memory, disk and query performance for string columns.
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
const string CHAR256
This property provides optimized memory, disk and query performance for string columns.
A set of parameters for /insert/records.
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
const string INT8
This property provides optimized memory and query performance for int columns.
Int64 getCountUpdated()
Returns the count of records updated so far.
A set of results returned by /insert/records.
const string TIME
Valid only for &#39;string&#39; columns.
API to talk to Kinetica Database
Definition: Kinetica.cs:40