2 using System.Collections.Generic;
 
    3 using System.Collections.Immutable;
 
    5 using System.Text.RegularExpressions;
 
   18     public class KineticaIngestor<T>
 
   23             public Uri url { 
get; 
private set; }
 
   24             public IList<T> records { 
get; 
private set; }
 
   25             private string message;
 
   29             internal InsertException( Uri url_, IList<T> records_, 
string msg ) : base ( msg )
 
   33                 this.records = records_;
 
   36             public override string ToString() { 
return "InsertException: " + message; }
 
   43         public sealed 
class WorkerList : List<System.Uri>
 
   70                 IDictionary<string, string> system_properties = db.showSystemProperties().property_map;
 
   73                 string multi_head_ingestion_param;
 
   74                 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out multi_head_ingestion_param );
 
   75                 if ( multi_head_ingestion_param == null )
 
   77                 bool is_multi_head_ingest_enabled = multi_head_ingestion_param.Equals( ShowSystemPropertiesResponse.PropertyMap.TRUE );
 
   80                 if ( !is_multi_head_ingest_enabled )
 
   84                 string worker_ips_str, worker_ports_str;
 
   85                 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out worker_ips_str );
 
   86                 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out worker_ports_str );
 
   89                 if ( worker_ips_str.Length == 0 )
 
   91                 if ( worker_ports_str.Length == 0 )
 
   97                 string[] worker_ip_lists = worker_ips_str.Split( 
';' );
 
   98                 string[] worker_ports    = worker_ports_str.Split( 
';' );
 
  101                 if ( worker_ip_lists.Length != worker_ports.Length )
 
  108                 for ( 
int i = 1; i < worker_ip_lists.Length; ++i )
 
  110                     string ip_list = worker_ip_lists[ i ];
 
  113                     string[] ips = ip_list.Split( 
',' );
 
  115                     bool matching_ip_found = 
false;
 
  118                     foreach ( 
string ip 
in ips )
 
  124                             if ( ip_regex != null )
 
  125                                 matching_ip_found = ip_regex.IsMatch( ip );
 
  127                                 matching_ip_found = 
true;
 
  129                             if ( matching_ip_found )
 
  131                                 UriBuilder uri_builder = 
new UriBuilder( 
"http", ip, Int32.Parse( worker_ports[ i ] ), 
"insert/records" );
 
  132                                 Uri url = uri_builder.Uri;
 
  139                         catch ( Exception ex )
 
  145                     if ( !matching_ip_found )
 
  150                 if ( this.Count == 0 )
 
  163         private sealed 
class RecordKey
 
  168             private static readonly Regex DATE_REGEX = 
new Regex( 
"\\A(\\d{4})-(\\d{2})-(\\d{2})$" );
 
  172             private static readonly Regex DECIMAL_REGEX = 
new Regex( 
"\\A\\s*[+-]?((?<int>\\d+)(\\.(?<frac>\\d{0,4}))?|\\.(?<frac>\\d{1,4}))\\s*\\z" );
 
  176             private static readonly Regex IPV4_REGEX = 
new Regex( 
"\\A(?<a>\\d{1,3})\\.(?<b>\\d{1,3})\\.(?<c>\\d{1,3})\\.(?<d>\\d{1,3})$" );
 
  180             private static readonly Regex TIME_REGEX = 
new Regex( 
"\\A(?<hour>\\d{1,2}):(?<minute>\\d{2}):(?<seconds>\\d{2})(\\.(?<milliseconds>\\d{3}))?$" );
 
  184             private static readonly DateTime MIN_DATE = (
new System.Globalization.GregorianCalendar()).MinSupportedDateTime;
 
  189             private static readonly 
int MIN_SUPPORTED_YEAR = 1000;
 
  193             private static readonly 
int MAX_SUPPORTED_YEAR = 2900;
 
  197             private static readonly TimeZoneInfo UTC = TimeZoneInfo.Utc;
 
  199             private readonly byte[] buffer;
 
  200             private readonly 
int buffer_size;
 
  201             private int current_size;
 
  202             private int hash_code;
 
  203             private bool is_valid;
 
  204             private long routingHash;
 
  211             public RecordKey( 
int size )
 
  215                                                   + 
"Size given: " + size );
 
  218                 buffer       = 
new byte[size];
 
  219                 this.is_valid = 
true;
 
  226             public bool isValid()
 
  228                 return this.is_valid;
 
  235             public int hashCode()
 
  237                 return this.hash_code;
 
  248             private bool isBufferFull( 
bool throw_if_full = 
true )
 
  250                 if ( this.current_size == this.buffer_size )
 
  253                         throw new KineticaException( 
"The buffer is already full!" );
 
  270             private bool willBufferOverflow( 
int n, 
bool throw_if_overflow = 
true )
 
  273                 if ( (this.current_size + n) > this.buffer_size )
 
  275                     if ( throw_if_overflow )
 
  276                         throw new KineticaException( $
"The buffer (of size {buffer_size}) does not have sufficient room in it to put {n} more byte(s) (current size is {this.current_size})." );
 
  289             private void add( byte b )
 
  292                 buffer.SetValue( b, current_size++ );
 
  301             public void addInt( 
int? value )
 
  304                 this.willBufferOverflow( 4 );  
 
  310                     this.add( ( byte ) 0 );  
 
  311                     this.add( ( byte ) 0 );  
 
  312                     this.add( ( byte ) 0 );  
 
  313                     this.add( ( byte ) 0 );  
 
  318                 byte[] int_bytes = BitConverter.GetBytes( (int)value );
 
  321                 foreach ( byte b 
in int_bytes )
 
  330             public void addInt8( 
int? value )
 
  333                 this.willBufferOverflow( 1 );  
 
  339                     this.add( ( byte ) 0 ); 
 
  344                 this.add( (byte)value );
 
  352             public void addInt16( 
int? value )
 
  355                 this.willBufferOverflow( 2 );  
 
  361                     this.add( ( byte ) 0 );  
 
  362                     this.add( ( byte ) 0 );  
 
  367                 byte[] short_bytes = BitConverter.GetBytes( (short)value );
 
  370                 foreach ( byte b 
in short_bytes )
 
  380             public void addLong( 
long? value )
 
  383                 this.willBufferOverflow( 8 );  
 
  389                     this.add( ( byte ) 0 );  
 
  390                     this.add( ( byte ) 0 );  
 
  391                     this.add( ( byte ) 0 );  
 
  392                     this.add( ( byte ) 0 );  
 
  393                     this.add( ( byte ) 0 );  
 
  394                     this.add( ( byte ) 0 );  
 
  395                     this.add( ( byte ) 0 );  
 
  396                     this.add( ( byte ) 0 );  
 
  401                 byte[] long_bytes = BitConverter.GetBytes( (long)value );
 
  404                 foreach ( byte b 
in long_bytes )
 
  413             public void addFloat( 
float? value )
 
  416                 this.willBufferOverflow( 4 );  
 
  422                     this.add( ( byte ) 0.0f );  
 
  423                     this.add( ( byte ) 0.0f );  
 
  424                     this.add( ( byte ) 0.0f );  
 
  425                     this.add( ( byte ) 0.0f );  
 
  430                 byte[] float_bytes = BitConverter.GetBytes( (float)value );
 
  433                 foreach ( byte b 
in float_bytes )
 
  443             public void addDouble( 
double? value )
 
  446                 this.willBufferOverflow( 8 );  
 
  452                     this.add( ( byte ) 0.0 );  
 
  453                     this.add( ( byte ) 0.0 );  
 
  454                     this.add( ( byte ) 0.0 );  
 
  455                     this.add( ( byte ) 0.0 );  
 
  456                     this.add( ( byte ) 0.0 );  
 
  457                     this.add( ( byte ) 0.0 );  
 
  458                     this.add( ( byte ) 0.0 );  
 
  459                     this.add( ( byte ) 0.0 );  
 
  464                 byte[] double_bytes = BitConverter.GetBytes( (double)value );
 
  467                 foreach ( byte b 
in double_bytes )
 
  478             public void addString( 
string value )
 
  488                 MurMurHash3.LongPair murmur = 
new MurMurHash3.LongPair();
 
  489                 System.Text.Encoding encoding = 
new System.Text.UTF8Encoding();
 
  490                 byte[] input = encoding.GetBytes( value );
 
  491                 MurMurHash3.murmurhash3_x64_128( input, 0, (uint)input.Length, 10, out murmur );
 
  494                 this.addLong( murmur.val1 );
 
  507             public void addCharN( 
string value, 
int N )
 
  510                 this.willBufferOverflow( N );
 
  518                     for ( 
int i = 0; i < N; ++i )
 
  520                         this.add( (byte) 0 );
 
  526                 byte[] bytes = System.Text.Encoding.UTF8.GetBytes( value );
 
  527                 int byte_count = bytes.GetLength( 0 );
 
  530                 if ( byte_count > N )
 
  537                 for ( 
int i = N; i > byte_count; --i )
 
  539                     this.add( (byte) 0 );
 
  543                 for ( 
int i = ( byte_count - 1 ); i >= 0; --i )
 
  545                     this.add( bytes[i] );
 
  556             public void addDate( 
string value )
 
  559                 this.isBufferFull( true );
 
  569                 Match match = DATE_REGEX.Match( value );
 
  570                 if ( !match.Success )
 
  573                     this.is_valid = 
false;
 
  579                 int year, month, day;
 
  581                 System.Globalization.GregorianCalendar calendar = 
new System.Globalization.GregorianCalendar();
 
  586                     year  = int.Parse( match.Groups[ 1 ].ToString() );
 
  587                     month = int.Parse( match.Groups[ 2 ].ToString() );
 
  588                     day   = int.Parse( match.Groups[ 3 ].ToString() );
 
  589                     date  = 
new DateTime( year, month, day, calendar );
 
  591                 catch ( Exception ex )
 
  595                     this.is_valid = 
false;
 
  600                 if ( ( year < MIN_SUPPORTED_YEAR ) || ( year > MAX_SUPPORTED_YEAR ) )
 
  603                     this.is_valid = 
false;
 
  608                 int date_integer = ( ((year - MIN_SUPPORTED_YEAR) << 21)
 
  611                                      | (calendar.GetDayOfYear( date ) << 3)
 
  612                                      | (int)calendar.GetDayOfWeek( date ) );
 
  613                 this.addInt( date_integer );
 
  623             public void addDecimal( 
string value )
 
  626                 this.isBufferFull( true );
 
  636                 Match match = DECIMAL_REGEX.Match( value );
 
  637                 if ( !match.Success )
 
  640                     this.is_valid = 
false;
 
  650                     string integral_part_str   = match.Groups[ 
"int"  ].Value;
 
  651                     string fractional_part_str = match.Groups[ 
"frac" ].Value;
 
  653                     long fractional_part;
 
  654                     bool has_integral_part   = long.TryParse( integral_part_str,   out integral_part   );
 
  655                     bool has_fractional_part = long.TryParse( fractional_part_str, out fractional_part );
 
  657                     int fractional_part_len = fractional_part_str.Length;
 
  658                     integral_part = integral_part * (long)Math.Pow(10, fractional_part_len );
 
  660                     decimal_value = integral_part + fractional_part;
 
  662                 catch ( Exception ex )
 
  666                     this.is_valid = 
false;
 
  671                 this.addLong( decimal_value );
 
  681             public void addIPv4( 
string value )
 
  684                 this.isBufferFull( true );
 
  694                 Match match = IPV4_REGEX.Match( value );
 
  695                 if ( !match.Success )
 
  698                     this.is_valid = 
false;
 
  709                     a = int.Parse( match.Groups[ 
"a" ].Value );
 
  710                     b = int.Parse( match.Groups[ 
"b" ].Value );
 
  711                     c = int.Parse( match.Groups[ 
"c" ].Value );
 
  712                     d = int.Parse( match.Groups[ 
"d" ].Value );
 
  714                 catch ( Exception ex )
 
  718                     this.is_valid = 
false;
 
  724                 if ( ( a > 255 ) || ( b > 255 ) || ( c > 255 ) || ( d > 255 ) )
 
  727                     this.is_valid = 
false;
 
  732                 int ipv4_integer = ( (a << 24) | (b << 16) | (c <<  8) |  d );
 
  733                 this.addInt( ipv4_integer );
 
  743             public void addTime( 
string value )
 
  746                 this.isBufferFull( true );
 
  756                 Match match = TIME_REGEX.Match( value );
 
  757                 if ( !match.Success )
 
  760                     this.is_valid = 
false;
 
  766                 uint hour, minute, second, milliseconds;
 
  771                     hour   = uint.Parse( match.Groups[
"hour"].Value );
 
  772                     minute = uint.Parse( match.Groups[
"minute"].Value );
 
  773                     second = uint.Parse( match.Groups[
"seconds"].Value );
 
  774                     Group msec_group = match.Groups[
"milliseconds"];
 
  775                     if ( msec_group.Success )
 
  776                         milliseconds = uint.Parse( msec_group.Value );
 
  780                 catch ( Exception ex )
 
  784                     this.is_valid = 
false;
 
  789                 if ( ( hour > 23 ) || ( minute > 59 ) || ( second > 59 ) )
 
  792                     this.is_valid = 
false;
 
  797                 int time_integer = (int)( (hour << 26) | (minute << 20) | (second << 14) | (milliseconds << 4) );
 
  798                 this.addInt( time_integer );
 
  806             public void addTimeStamp( 
long? value )
 
  816                 System.Globalization.GregorianCalendar calendar = 
new System.Globalization.GregorianCalendar();
 
  817                 DateTime time = MIN_DATE.AddMilliseconds( (double) value );
 
  818                 long timestamp = (long) ( ((time.Year - 1900) << 53)
 
  819                                           | ((time.Month + 1) << 49)
 
  822                                           | (time.Minute << 33)
 
  823                                           | (time.Second << 27)
 
  824                                           | (time.Millisecond << 17)
 
  825                                           | (time.DayOfYear << 8)
 
  826                                           | ((int)time.DayOfWeek << 5) );
 
  827                 this.addLong( timestamp );
 
  838             public void computHashes()
 
  841                 if ( this.current_size != this.buffer_size )
 
  842                     throw new KineticaException( 
"The RecordKey buffer is not full; check that all the relevant values have been added." );
 
  845                 MurMurHash3.LongPair murmur = 
new MurMurHash3.LongPair();
 
  846                 MurMurHash3.murmurhash3_x64_128( this.buffer, 0, ( uint ) this.buffer_size, 10, out murmur );
 
  849                 this.routingHash = murmur.val1;
 
  850                 this.hash_code = ( int ) ( this.routingHash ^ ((this.routingHash >> 32) & 0x0000ffffL));
 
  861             public int route( IList<int> routingTable )
 
  866                 return (routingTable[ Math.Abs( ( 
int ) ( 
this.routingHash % routingTable.Count ) ) ] - 1);
 
  876         private sealed 
class RecordKeyBuilder<T>
 
  881             private enum ColumnType
 
  908             private KineticaType ktype;
 
  909             private IList<int> routing_column_indices;
 
  910             private IList<ColumnType> column_types;
 
  911             private int buffer_size;
 
  913             public RecordKeyBuilder( 
bool is_primary_key, KineticaType ktype )
 
  917                 this.buffer_size = 0;
 
  918                 routing_column_indices = 
new List<int>();
 
  919                 column_types = 
new List<ColumnType>();
 
  924                 bool has_timestamp = 
false;
 
  927                 int track_id_column_idx = -1;  
 
  931                 IList<KineticaType.Column> columns = ktype.getColumns();
 
  932                 for ( 
int i = 0; i < columns.Count; ++i )
 
  935                     KineticaType.Column column = columns[ i ];
 
  938                     switch ( column.getName() )
 
  941                             track_id_column_idx = i;
 
  945                             has_timestamp = 
true;
 
  961                         routing_column_indices.Add( i );
 
  965                         routing_column_indices.Add( i );
 
  971                     && has_timestamp && has_x && has_y && ( track_id_column_idx != -1 ) )
 
  973                     if ( routing_column_indices.Count == 0 )
 
  975                         routing_column_indices.Add( track_id_column_idx );
 
  977                     else if ( ( routing_column_indices.Count != 1 )
 
  978                               || ( routing_column_indices[0] != track_id_column_idx ) )
 
  981                         throw new KineticaException( 
"Cannot have a shard key other than 'TRACKID' for track tables." );
 
  988                 foreach ( 
int i 
in routing_column_indices )
 
  991                     KineticaType.Column column = columns[ i ];
 
  993                     switch ( column.getType() )
 
  996                         case KineticaType.Column.ColumnType.FLOAT:
 
  998                             column_types.Add( ColumnType.FLOAT );
 
  999                             this.buffer_size += 4;
 
 1002                         case KineticaType.Column.ColumnType.DOUBLE:
 
 1004                             column_types.Add( ColumnType.DOUBLE );
 
 1005                             this.buffer_size += 8;
 
 1009                         case KineticaType.Column.ColumnType.INT:
 
 1014                                 column_types.Add( ColumnType.INT8 );
 
 1015                                 this.buffer_size += 1;
 
 1019                                 column_types.Add( ColumnType.INT16 );
 
 1020                                 this.buffer_size += 2;
 
 1024                                 column_types.Add( ColumnType.INT );
 
 1025                                 this.buffer_size += 4;
 
 1030                         case KineticaType.Column.ColumnType.LONG:
 
 1035                                 column_types.Add( ColumnType.TIMESTAMP );
 
 1039                                 column_types.Add( ColumnType.LONG );
 
 1041                             this.buffer_size += 8;
 
 1045                         case KineticaType.Column.ColumnType.STRING:
 
 1049                                 column_types.Add( ColumnType.CHAR1 );
 
 1050                                 this.buffer_size += 1;
 
 1054                                 column_types.Add( ColumnType.CHAR2 );
 
 1055                                 this.buffer_size += 2;
 
 1059                                 column_types.Add( ColumnType.CHAR4 );
 
 1060                                 this.buffer_size += 4;
 
 1064                                 column_types.Add( ColumnType.CHAR8 );
 
 1065                                 this.buffer_size += 8;
 
 1069                                 column_types.Add( ColumnType.CHAR16 );
 
 1070                                 this.buffer_size += 16;
 
 1074                                 column_types.Add( ColumnType.CHAR32 );
 
 1075                                 this.buffer_size += 32;
 
 1079                                 column_types.Add( ColumnType.CHAR64 );
 
 1080                                 this.buffer_size += 64;
 
 1084                                 column_types.Add( ColumnType.CHAR128 );
 
 1085                                 this.buffer_size += 128;
 
 1089                                 column_types.Add( ColumnType.CHAR256 );
 
 1090                                 this.buffer_size += 256;
 
 1094                                 column_types.Add( ColumnType.DATE );
 
 1095                                 this.buffer_size += 4;
 
 1099                                 column_types.Add( ColumnType.DECIMAL );
 
 1100                                 this.buffer_size += 8;
 
 1104                                 column_types.Add( ColumnType.IPV4 );
 
 1105                                 this.buffer_size += 4;
 
 1109                                 column_types.Add( ColumnType.TIME );
 
 1110                                 this.buffer_size += 4;
 
 1114                                 column_types.Add( ColumnType.STRING );
 
 1115                                 this.buffer_size += 8;
 
 1121                         case KineticaType.Column.ColumnType.BYTES:
 
 1122                         case KineticaType.Column.ColumnType.DEFAULT:
 
 1123                             throw new KineticaException( $
"Cannot use column '{column.getName()}' as a key."  );
 
 1136             public RecordKey build( T record )
 
 1139                 if ( this.buffer_size == 0 )
 
 1143                 RecordKey key = 
new RecordKey( this.buffer_size );
 
 1146                 for ( 
int i = 0; i < this.routing_column_indices.Count; ++i )
 
 1149                     KineticaType.Column column = this.ktype.getColumns()[ this.routing_column_indices[ i ] ];
 
 1152                     var value = record.GetType().GetProperty( column.getName() ).GetValue( record, null );
 
 1154                     switch ( this.column_types[i] )
 
 1156                         case ColumnType.CHAR1:
 
 1157                             key.addCharN( (string) value, 1 );
 
 1160                         case ColumnType.CHAR2:
 
 1161                             key.addCharN( ( string ) value, 2 );
 
 1164                         case ColumnType.CHAR4:
 
 1165                             key.addCharN( ( string ) value, 4 );
 
 1168                         case ColumnType.CHAR8:
 
 1169                             key.addCharN( ( string ) value, 8 );
 
 1172                         case ColumnType.CHAR16:
 
 1173                             key.addCharN( ( string ) value, 16 );
 
 1176                         case ColumnType.CHAR32:
 
 1177                             key.addCharN( ( string ) value, 32 );
 
 1180                         case ColumnType.CHAR64:
 
 1181                             key.addCharN( ( string ) value, 64 );
 
 1184                         case ColumnType.CHAR128:
 
 1185                             key.addCharN( ( string ) value, 128 );
 
 1188                         case ColumnType.CHAR256:
 
 1189                             key.addCharN( ( string ) value, 256 );
 
 1192                         case ColumnType.DATE:
 
 1193                             key.addDate( (string) value );
 
 1196                         case ColumnType.DECIMAL:
 
 1197                             key.addDecimal( (string) value );
 
 1200                         case ColumnType.DOUBLE:
 
 1201                             key.addDouble( ( 
double? ) value );
 
 1204                         case ColumnType.FLOAT:
 
 1205                             key.addFloat( ( 
float? ) value );
 
 1208                         case ColumnType.INT:
 
 1209                             key.addInt( ( 
int? ) value );
 
 1212                         case ColumnType.INT8:
 
 1213                             key.addInt8( ( 
int? ) value );
 
 1216                         case ColumnType.INT16:
 
 1217                             key.addInt16( ( 
int? ) value );
 
 1220                         case ColumnType.IPV4:
 
 1221                             key.addIPv4( ( string ) value );
 
 1224                         case ColumnType.LONG:
 
 1225                             key.addLong( ( 
long? ) value );
 
 1228                         case ColumnType.STRING:
 
 1229                             key.addString( ( string ) value );
 
 1232                         case ColumnType.TIME:
 
 1233                             key.addTime( ( string ) value );
 
 1236                         case ColumnType.TIMESTAMP:
 
 1237                             key.addTimeStamp( ( 
long? ) value );
 
 1254             public bool hasKey()
 
 1257                 return !(this.routing_column_indices.Count == 0);
 
 1266             public bool hasSameKey( RecordKeyBuilder<T> other)
 
 1268                 return this.column_types.Equals( other.column_types );
 
 1275         private sealed 
class WorkerQueue<T>
 
 1277             public System.Uri url { 
get; 
private set; }
 
 1278             private int capacity;
 
 1279             private bool has_primary_key;
 
 1280             private bool update_on_existing_pk;
 
 1281             private IList<T> queue;
 
 1282             private Dictionary<RecordKey, int> primary_key_map;
 
 1292             public WorkerQueue( System.Uri url, 
int capacity, 
bool has_primary_key, 
bool update_on_existing_pk )
 
 1295                 this.capacity = capacity;
 
 1296                 this.has_primary_key       = has_primary_key;
 
 1297                 this.update_on_existing_pk = update_on_existing_pk;
 
 1299                 queue = 
new List<T>();
 
 1303                 if ( this.has_primary_key )
 
 1304                     primary_key_map = 
new Dictionary<RecordKey, int>( (int)Math.Round( 
this.capacity/0.75 ) );
 
 1313             public IList<T> flush()
 
 1315                 IList<T> old_queue = this.queue;
 
 1316                 queue = 
new List<T>( this.capacity );
 
 1319                 if ( this.primary_key_map != null )
 
 1320                     this.primary_key_map.Clear();
 
 1334             public IList<T> insert( T record, RecordKey key )
 
 1336                 if ( this.has_primary_key && key.isValid() )
 
 1339                     if ( this.update_on_existing_pk )
 
 1343                         if ( this.primary_key_map.TryGetValue( key, out key_idx ) )
 
 1346                             this.queue[key_idx] = record;
 
 1350                             this.queue.Add( record );
 
 1351                             this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
 
 1356                         if ( this.primary_key_map.ContainsKey( key ) )
 
 1361                         this.queue.Add( record );
 
 1362                         this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
 
 1367                     queue.Add( record );
 
 1371                 if ( queue.Count == capacity )
 
 1384         public string table_name { 
get; }
 
 1385         public int batch_size { 
get; }
 
 1386         public IDictionary<string, string> options { 
get; }
 
 1391         private RecordKeyBuilder<T> primary_key_builder;
 
 1392         private RecordKeyBuilder<T> shard_key_builder;
 
 1393         private IList<int> routing_table;
 
 1394         private IList<WorkerQueue<T>> worker_queues;
 
 1395         private Random random;
 
 1409                                  Dictionary<string, string> options = null,
 
 1410                                  WorkerList workers = null )
 
 1412             this.kineticaDB = kdb;
 
 1413             this.table_name = table_name;
 
 1417             if ( batch_size < 1 )
 
 1418                 throw new KineticaException( $
"Batch size must be greater than one; given {batch_size}." );
 
 1419             this.batch_size = batch_size;
 
 1422             if ( options != null )
 
 1424                 this.options = options;
 
 1429                 this.options = null;
 
 1434             this.primary_key_builder = 
new RecordKeyBuilder<T>( 
true,  this.ktype );
 
 1435             this.shard_key_builder   = 
new RecordKeyBuilder<T>( 
false, this.ktype );
 
 1438             if ( this.primary_key_builder.hasKey() )
 
 1441                 if ( !this.shard_key_builder.hasKey()
 
 1442                      || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
 
 1443                     this.shard_key_builder = this.primary_key_builder; 
 
 1447                 this.primary_key_builder = null;
 
 1450                 if ( !this.shard_key_builder.hasKey() )
 
 1451                     this.shard_key_builder = null;
 
 1459             bool update_on_existing_pk = ( (options != null)
 
 1463             bool has_primary_key = (this.primary_key_builder != null);
 
 1464             this.worker_queues = 
new List<WorkerQueue<T>>();
 
 1468                 if ( ( workers == null ) || ( workers.Count == 0 ) )
 
 1470                     workers = 
new WorkerList( kdb );
 
 1475                 if ( ( workers != null ) && ( workers.Count > 0 ) )
 
 1478                     foreach ( System.Uri worker_url in workers )
 
 1480                         WorkerQueue<T> worker_queue = 
new WorkerQueue<T>( worker_url, batch_size, has_primary_key, update_on_existing_pk );
 
 1481                         this.worker_queues.Add( worker_queue );
 
 1485                     this.routing_table = kdb.adminGetShardAssignments().shard_assignments_rank;
 
 1487                     for ( 
int i = 0; i < routing_table.Count; ++i )
 
 1489                         if ( this.routing_table[i] > this.worker_queues.Count )
 
 1495                     System.Uri url = 
new System.Uri( kdb.Url + 
"/insert/records" );
 
 1496                     WorkerQueue<T> worker_queue = 
new WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk );
 
 1497                     this.worker_queues.Add( worker_queue );
 
 1498                     this.routing_table = null;
 
 1501             catch ( Exception ex )
 
 1507             this.random = 
new Random( (
int) DateTime.Now.Ticks );
 
 1518             return System.Threading.Interlocked.Read( ref this.count_inserted );
 
 1529             return System.Threading.Interlocked.Read( ref this.count_updated );
 
 1544             foreach ( WorkerQueue<T> worker_queue 
in this.worker_queues )
 
 1547                 IList<T> queue = worker_queue.flush();
 
 1549                 flush( queue, worker_queue.url );
 
 1560         private void flush( IList<T> queue, System.Uri url )
 
 1562             if ( queue.Count == 0 )
 
 1570                 IList<byte[]> encoded_queue = 
new List<byte[]>();
 
 1571                 foreach ( var record 
in queue ) encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
 
 1579                     response = this.kineticaDB.insertRecordsRaw( request );
 
 1583                     response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
 
 1587                 System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
 
 1588                 System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
 
 1590             catch ( Exception ex )
 
 1592                 throw new InsertException<T>( url, queue, ex.ToString() );
 
 1611             RecordKey primary_key = null;  
 
 1612             RecordKey shard_key = null;    
 
 1615             if ( this.primary_key_builder != null )
 
 1616                 primary_key = this.primary_key_builder.build( record );
 
 1619             if ( this.shard_key_builder != null )
 
 1620                 shard_key = this.shard_key_builder.build( record );
 
 1624             WorkerQueue<T> worker_queue;
 
 1625             if ( this.routing_table == null )
 
 1627                 worker_queue = this.worker_queues[0];
 
 1629             else if ( shard_key == null )
 
 1631                 worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
 
 1635                 int worker_index = shard_key.route( this.routing_table );
 
 1636                 worker_queue = this.worker_queues[worker_index];
 
 1640             IList<T> queue = worker_queue.insert( record, primary_key );
 
 1644             if ( queue != null )
 
 1646                 this.flush( queue, worker_queue.url );
 
 1668             for ( 
int i = 0; i < records.Count; ++i )
 
 1672                     this.insert( records[ i ] );
 
 1674                 catch ( InsertException<T> ex )
 
 1678                     IList<T> queue = ex.records;
 
 1680                     for ( 
int j = i + 1; j < records.Count; ++j )
 
 1682                         queue.Add( records[ j ] );
 
InsertException(string msg)
const string CHAR1
This property provides optimized memory, disk and query performance for string columns. 
A set of results returned by /show/system/properties. 
const string INT16
This property provides optimized memory and query performance for int columns. 
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key. 
const string CHAR128
This property provides optimized memory, disk and query performance for string columns. 
A set of parameters for /insert/records. 
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
void insert(T record)
Queues a record for insertion into Kinetica. 
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, WorkerList workers=null)
const string TIMESTAMP
Valid only for 'long' columns. 
const string CHAR16
This property provides optimized memory, disk and query performance for string columns. 
Int64 getCountInserted()
Returns the count of records inserted so far. 
const string CHAR64
This property provides optimized memory, disk and query performance for string columns. 
const string CHAR4
This property provides optimized memory, disk and query performance for string columns. 
const string CHAR2
This property provides optimized memory, disk and query performance for string columns. 
const string DATE
Valid only for 'string' columns. 
void insert(IList< T > records)
Queues a list of records for insertion into Kientica. 
const string CHAR8
This property provides optimized memory, disk and query performance for string columns. 
const string DECIMAL
Valid only for 'string' columns. 
void flush()
Ensures that all queued records are inserted into Kinetica. 
const string CHAR32
This property provides optimized memory, disk and query performance for string columns. 
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
const string CHAR256
This property provides optimized memory, disk and query performance for string columns. 
A set of parameters for /insert/records. 
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key. 
const string INT8
This property provides optimized memory and query performance for int columns. 
Int64 getCountUpdated()
Returns the count of records updated so far. 
A set of results returned by /insert/records.
const string TIME
Valid only for 'string' columns. 
API to talk to Kinetica Database 
override string ToString()