2 using System.Collections.Generic;
3 using System.Text.RegularExpressions;
21 public Uri
url {
get;
private set; }
22 public IList<T>
records {
get;
private set; }
23 private string message;
27 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
34 public override string ToString() {
return "InsertException: " + message; }
71 string multi_head_ingestion_param;
73 if ( multi_head_ingestion_param == null )
78 if ( !is_multi_head_ingest_enabled )
82 string worker_ips_str, worker_ports_str;
87 if ( worker_ips_str.Length == 0 )
89 if ( worker_ports_str.Length == 0 )
95 string[] worker_ip_lists = worker_ips_str.Split(
';' );
96 string[] worker_ports = worker_ports_str.Split(
';' );
99 if ( worker_ip_lists.Length != worker_ports.Length )
106 for (
int i = 1; i < worker_ip_lists.Length; ++i )
108 string ip_list = worker_ip_lists[ i ];
111 string[] ips = ip_list.Split(
',' );
113 bool matching_ip_found =
false;
116 foreach (
string ip
in ips )
122 if ( ip_regex != null )
123 matching_ip_found = ip_regex.IsMatch( ip );
125 matching_ip_found =
true;
127 if ( matching_ip_found )
129 UriBuilder uri_builder =
new UriBuilder(
"http", ip, Int32.Parse( worker_ports[ i ] ),
"insert/records" );
130 Uri
url = uri_builder.Uri;
137 catch ( Exception ex )
143 if ( !matching_ip_found )
148 if ( this.Count == 0 )
161 private sealed
class RecordKey
166 private static readonly Regex DATE_REGEX =
new Regex(
"\\A(\\d{4})-(\\d{2})-(\\d{2})$" );
171 private static readonly Regex DATETIME_REGEX =
new Regex(
"\\A(?<year>\\d{4})-(?<month>\\d{2})-(?<day>\\d{2})(?<time>\\s+(?<hour>\\d{1,2}):(?<min>\\d{2}):(?<sec>\\d{2})(?:\\.(?<ms>\\d{1,6}))?)?$" );
176 private static readonly Regex DECIMAL_REGEX =
new Regex(
"\\A\\s*(?<sign>[+-]?)((?<int>\\d+)(\\.(?<intfrac>\\d{0,4}))?|\\.(?<onlyfrac>\\d{1,4}))\\s*\\z" );
181 private static readonly Regex IPV4_REGEX =
new Regex(
"\\A(?<a>\\d{1,3})\\.(?<b>\\d{1,3})\\.(?<c>\\d{1,3})\\.(?<d>\\d{1,3})$" );
186 private static readonly Regex TIME_REGEX =
new Regex(
"\\A(?<hour>\\d{1,2}):(?<minute>\\d{2}):(?<seconds>\\d{2})(\\.(?<milliseconds>\\d{1,3}))?$" );
191 private static readonly DateTime EPOCH_DATE =
new DateTime( 1970, 1, 1 );
196 private static readonly
int MIN_SUPPORTED_YEAR = 1000;
201 private static readonly
int MAX_SUPPORTED_YEAR = 2900;
206 private static readonly
int YEAR_1900 = 1900;
211 private static readonly TimeZoneInfo UTC = TimeZoneInfo.Utc;
213 private readonly byte[] buffer;
214 private readonly
int buffer_size;
215 private int current_size;
216 private int hash_code;
217 private bool is_valid;
218 private long routingHash;
225 public RecordKey(
int size )
229 +
"Size given: " + size );
232 buffer =
new byte[size];
233 this.is_valid =
true;
240 public bool isValid()
242 return this.is_valid;
249 public int hashCode()
251 return this.hash_code;
262 private bool isBufferFull(
bool throw_if_full =
true )
264 if ( this.current_size == this.buffer_size )
284 private bool willBufferOverflow(
int n,
bool throw_if_overflow =
true )
287 if ( (this.current_size + n) > this.buffer_size )
289 if ( throw_if_overflow )
290 throw new KineticaException( $
"The buffer (of size {buffer_size}) does not have sufficient room in it to put {n} more byte(s) (current size is {this.current_size})." );
303 private void add( byte b )
306 buffer.SetValue( b, current_size++ );
315 public void addInt(
int? value )
318 this.willBufferOverflow( 4 );
324 this.add( ( byte ) 0 );
325 this.add( ( byte ) 0 );
326 this.add( ( byte ) 0 );
327 this.add( ( byte ) 0 );
332 byte[] int_bytes = BitConverter.GetBytes( (
int)value );
335 foreach ( byte b
in int_bytes )
344 public void addInt8(
int? value )
347 this.willBufferOverflow( 1 );
353 this.add( ( byte ) 0 );
358 this.add( (byte)value );
366 public void addInt16(
int? value )
369 this.willBufferOverflow( 2 );
375 this.add( ( byte ) 0 );
376 this.add( ( byte ) 0 );
381 byte[] short_bytes = BitConverter.GetBytes( (
short)value );
384 foreach ( byte b
in short_bytes )
394 public void addLong(
long? value )
397 this.willBufferOverflow( 8 );
403 this.add( ( byte ) 0 );
404 this.add( ( byte ) 0 );
405 this.add( ( byte ) 0 );
406 this.add( ( byte ) 0 );
407 this.add( ( byte ) 0 );
408 this.add( ( byte ) 0 );
409 this.add( ( byte ) 0 );
410 this.add( ( byte ) 0 );
415 byte[] long_bytes = BitConverter.GetBytes( (
long)value );
418 foreach ( byte b
in long_bytes )
427 public void addFloat(
float? value )
430 this.willBufferOverflow( 4 );
436 this.add( ( byte ) 0.0f );
437 this.add( ( byte ) 0.0f );
438 this.add( ( byte ) 0.0f );
439 this.add( ( byte ) 0.0f );
444 byte[] float_bytes = BitConverter.GetBytes( (
float)value );
447 foreach ( byte b
in float_bytes )
457 public void addDouble(
double? value )
460 this.willBufferOverflow( 8 );
466 this.add( ( byte ) 0.0 );
467 this.add( ( byte ) 0.0 );
468 this.add( ( byte ) 0.0 );
469 this.add( ( byte ) 0.0 );
470 this.add( ( byte ) 0.0 );
471 this.add( ( byte ) 0.0 );
472 this.add( ( byte ) 0.0 );
473 this.add( ( byte ) 0.0 );
478 byte[] double_bytes = BitConverter.GetBytes( (
double)value );
481 foreach ( byte b
in double_bytes )
492 public void addString(
string value )
503 System.Text.Encoding encoding =
new System.Text.UTF8Encoding();
504 byte[] input = encoding.GetBytes( value );
508 this.addLong( murmur.val1 );
521 public void addCharN(
string value,
int N )
524 this.willBufferOverflow( N );
532 for (
int i = 0; i < N; ++i )
534 this.add( (byte) 0 );
540 byte[] bytes =
System.Text.Encoding.UTF8.GetBytes( value );
541 int byte_count = bytes.GetLength( 0 );
544 if ( byte_count > N )
551 for (
int i = N; i > byte_count; --i )
553 this.add( (byte) 0 );
557 for (
int i = ( byte_count - 1 ); i >= 0; --i )
559 this.add( bytes[i] );
570 public void addDate(
string value )
573 this.isBufferFull(
true );
583 Match match = DATE_REGEX.Match( value );
584 if ( !match.Success )
587 this.is_valid =
false;
593 int year, month, day;
595 System.Globalization.GregorianCalendar calendar =
new System.Globalization.GregorianCalendar();
600 year =
int.Parse( match.Groups[ 1 ].ToString() );
601 month =
int.Parse( match.Groups[ 2 ].ToString() );
602 day =
int.Parse( match.Groups[ 3 ].ToString() );
603 date =
new DateTime( year, month, day, calendar );
605 catch ( Exception ex )
609 this.is_valid =
false;
614 if ( ( year < MIN_SUPPORTED_YEAR ) || ( year > MAX_SUPPORTED_YEAR ) )
617 this.is_valid =
false;
621 int fixed_day_of_week = ( ( int ) calendar.GetDayOfWeek( date ) + 1 );
624 int date_integer = ( ((year - YEAR_1900) << 21)
627 | (calendar.GetDayOfYear( date ) << 3)
628 | fixed_day_of_week );
629 this.addInt( date_integer );
639 public void addDateTime(
string value )
642 this.isBufferFull(
true );
652 Match match = DATETIME_REGEX.Match( value );
653 if ( !match.Success )
656 this.is_valid =
false;
663 int year, month, day;
669 System.Globalization.GregorianCalendar calendar =
new System.Globalization.GregorianCalendar();
674 year =
int.Parse( match.Groups[
"year" ].Value );
675 month =
int.Parse( match.Groups[
"month" ].Value );
676 day =
int.Parse( match.Groups[
"day" ].Value );
679 Group time_group = match.Groups[
"time" ];
680 if ( time_group.Success )
682 hour =
int.Parse( match.Groups[
"hour"].Value );
683 minute =
int.Parse( match.Groups[
"min"].Value );
684 second =
int.Parse( match.Groups[
"sec"].Value );
687 Group ms_group = match.Groups[
"ms"];
688 if (ms_group.Success)
690 msecond =
int.Parse(match.Groups[
"ms"].Value);
692 switch (ms_group.Value.Length)
695 msecond *= 100;
break;
697 msecond *= 10;
break;
700 msecond /= 10;
break;
702 msecond /= 100;
break;
704 msecond /= 1000;
break;
710 date =
new DateTime( year, month, day, hour, minute, second, msecond, calendar );
712 catch ( Exception ex )
716 this.is_valid =
false;
721 if ( ( year < MIN_SUPPORTED_YEAR ) || ( year > MAX_SUPPORTED_YEAR ) )
724 this.is_valid =
false;
728 int fixed_day_of_week = ( ( int ) calendar.GetDayOfWeek( date ) + 1 );
731 long datetime_long = (long) ( (((
long)(year - YEAR_1900)) << 53)
732 | (((
long)month) << 49)
733 | (((
long)day) << 44)
734 | (((
long)hour) << 39)
735 | (((
long)minute) << 33)
736 | (((
long)second) << 27)
737 | (((
long)msecond) << 17)
738 | (((
long)calendar.GetDayOfYear( date )) << 8)
739 | (((long)fixed_day_of_week) << 5) );
740 this.addLong( datetime_long );
750 public void addDecimal(
string value )
753 this.isBufferFull(
true );
763 Match match = DECIMAL_REGEX.Match( value );
764 if ( !match.Success )
767 this.is_valid =
false;
777 Group integral_group = match.Groups[
"int" ];
778 Group fraction_with_integral_group = match.Groups[
"intfrac" ];
779 Group frac_only_group = match.Groups[
"onlyfrac" ];
781 if ( integral_group.Success )
783 decimal_value =
long.Parse( integral_group.Value );
785 if ( fraction_with_integral_group.Success )
789 if (fraction_with_integral_group.Value.Length > 0 )
790 fraction =
long.Parse( fraction_with_integral_group.Value );
794 long integral_part = decimal_value * (long)Math.Pow(10, fraction_with_integral_group.Value.Length );
795 decimal_value = integral_part + fraction;
798 switch ( fraction_with_integral_group.Value.Length )
801 decimal_value *= 1000;
break;
803 decimal_value *= 100;
break;
805 decimal_value *= 10;
break;
809 else if ( frac_only_group.Success )
811 decimal_value =
long.Parse( frac_only_group.Value );
814 switch ( frac_only_group.Value.Length )
817 decimal_value *= 1000;
break;
819 decimal_value *= 100;
break;
821 decimal_value *= 10;
break;
828 Group sign_group = match.Groups[
"sign" ];
829 if ( sign_group.Success )
831 if ( sign_group.Value ==
"-" )
832 decimal_value = ( -1 ) * decimal_value;
835 catch ( Exception ex )
839 this.is_valid =
false;
844 this.addLong( decimal_value );
854 public void addIPv4(
string value )
857 this.isBufferFull(
true );
867 Match match = IPV4_REGEX.Match( value );
868 if ( !match.Success )
871 this.is_valid =
false;
882 a =
int.Parse( match.Groups[
"a" ].Value );
883 b =
int.Parse( match.Groups[
"b" ].Value );
884 c =
int.Parse( match.Groups[
"c" ].Value );
885 d =
int.Parse( match.Groups[
"d" ].Value );
887 catch ( Exception ex )
891 this.is_valid =
false;
897 if ( ( a > 255 ) || ( b > 255 ) || ( c > 255 ) || ( d > 255 ) )
900 this.is_valid =
false;
905 int ipv4_integer = ( (a << 24) | (b << 16) | (c << 8) | d );
906 this.addInt( ipv4_integer );
917 public void addTime(
string value )
920 this.isBufferFull(
true );
930 Match match = TIME_REGEX.Match( value );
931 if ( !match.Success )
934 this.is_valid =
false;
940 uint hour, minute, second, milliseconds;
945 hour = uint.Parse( match.Groups[
"hour"].Value );
946 minute = uint.Parse( match.Groups[
"minute"].Value );
947 second = uint.Parse( match.Groups[
"seconds"].Value );
948 Group msec_group = match.Groups[
"milliseconds"];
952 if (msec_group.Success)
954 milliseconds = uint.Parse(msec_group.Value);
957 switch ( msec_group.Value.Length )
960 milliseconds *= 100;
break;
962 milliseconds *= 10;
break;
966 catch ( Exception ex )
970 this.is_valid =
false;
975 if ( ( hour > 23 ) || ( minute > 59 ) || ( second > 59 ) )
978 this.is_valid =
false;
983 int time_integer = (int)( (hour << 26) | (minute << 20) | (second << 14) | (milliseconds << 4) );
984 this.addInt( time_integer );
992 public void addTimeStamp(
long? value )
1002 DateTime time = EPOCH_DATE.AddMilliseconds( (
double) value );
1003 long fixed_day_of_week = ( ( long ) time.DayOfWeek + 1 );
1005 long timestamp = (long) ( (((
long)(time.Year - YEAR_1900)) << 53)
1006 | (((
long)(time.Month)) << 49)
1007 | (((
long)time.Day) << 44)
1008 | (((long)time.Hour) << 39)
1009 | (((
long)time.Minute) << 33)
1010 | (((long)time.Second) << 27)
1011 | (((
long)time.Millisecond) << 17)
1012 | (((long)time.DayOfYear) << 8)
1013 | ( fixed_day_of_week << 5) );
1014 this.addLong( timestamp );
1025 public void computHashes()
1028 if ( this.current_size != this.buffer_size )
1029 throw new KineticaException(
"The RecordKey buffer is not full; check that all the relevant values have been added." );
1036 this.routingHash = murmur.val1;
1037 this.hash_code = ( int ) ( this.routingHash ^ ((this.routingHash >> 32) & 0x0000ffffL));
1048 public int route( IList<int> routingTable )
1053 return (routingTable[ Math.Abs( (
int ) (
this.routingHash % routingTable.Count ) ) ] - 1);
1063 private sealed
class RecordKeyBuilder<T>
1068 private enum ColumnType
1097 private IList<int> routing_column_indices;
1098 private IList<ColumnType> column_types;
1099 private int buffer_size;
1101 public RecordKeyBuilder(
bool is_primary_key,
KineticaType ktype )
1105 this.buffer_size = 0;
1106 routing_column_indices =
new List<int>();
1107 column_types =
new List<ColumnType>();
1112 bool has_timestamp =
false;
1115 int track_id_column_idx = -1;
1120 for (
int i = 0; i < columns.Count; ++i )
1126 switch ( column.getName() )
1129 track_id_column_idx = i;
1133 has_timestamp =
true;
1149 routing_column_indices.Add( i );
1153 routing_column_indices.Add( i );
1158 if ( !is_primary_key
1159 && has_timestamp && has_x && has_y && ( track_id_column_idx != -1 ) )
1161 if ( routing_column_indices.Count == 0 )
1163 routing_column_indices.Add( track_id_column_idx );
1165 else if ( ( routing_column_indices.Count != 1 )
1166 || ( routing_column_indices[0] != track_id_column_idx ) )
1169 throw new KineticaException(
"Cannot have a shard key other than 'TRACKID' for track tables." );
1176 foreach (
int i
in routing_column_indices )
1181 switch ( column.getType() )
1186 column_types.Add( ColumnType.FLOAT );
1187 this.buffer_size += 4;
1192 column_types.Add( ColumnType.DOUBLE );
1193 this.buffer_size += 8;
1202 column_types.Add( ColumnType.INT8 );
1203 this.buffer_size += 1;
1207 column_types.Add( ColumnType.INT16 );
1208 this.buffer_size += 2;
1212 column_types.Add( ColumnType.INT );
1213 this.buffer_size += 4;
1223 column_types.Add( ColumnType.TIMESTAMP );
1227 column_types.Add( ColumnType.LONG );
1229 this.buffer_size += 8;
1237 column_types.Add( ColumnType.CHAR1 );
1238 this.buffer_size += 1;
1242 column_types.Add( ColumnType.CHAR2 );
1243 this.buffer_size += 2;
1247 column_types.Add( ColumnType.CHAR4 );
1248 this.buffer_size += 4;
1252 column_types.Add( ColumnType.CHAR8 );
1253 this.buffer_size += 8;
1257 column_types.Add( ColumnType.CHAR16 );
1258 this.buffer_size += 16;
1262 column_types.Add( ColumnType.CHAR32 );
1263 this.buffer_size += 32;
1267 column_types.Add( ColumnType.CHAR64 );
1268 this.buffer_size += 64;
1272 column_types.Add( ColumnType.CHAR128 );
1273 this.buffer_size += 128;
1277 column_types.Add( ColumnType.CHAR256 );
1278 this.buffer_size += 256;
1282 column_types.Add( ColumnType.DATE );
1283 this.buffer_size += 4;
1287 column_types.Add( ColumnType.DATETIME );
1288 this.buffer_size += 8;
1292 column_types.Add( ColumnType.DECIMAL );
1293 this.buffer_size += 8;
1297 column_types.Add( ColumnType.IPV4 );
1298 this.buffer_size += 4;
1302 column_types.Add( ColumnType.TIME );
1303 this.buffer_size += 4;
1307 column_types.Add( ColumnType.STRING );
1308 this.buffer_size += 8;
1316 throw new KineticaException( $
"Cannot use column '{column.getName()}' as a key." );
1329 public RecordKey build( T record )
1332 if ( this.buffer_size == 0 )
1336 RecordKey key =
new RecordKey( this.buffer_size );
1339 for (
int i = 0; i < this.routing_column_indices.Count; ++i )
1345 var value = record.GetType().GetProperty( column.getName() ).GetValue( record, null );
1347 switch ( this.column_types[i] )
1349 case ColumnType.CHAR1:
1350 key.addCharN( (
string) value, 1 );
1353 case ColumnType.CHAR2:
1354 key.addCharN( (
string ) value, 2 );
1357 case ColumnType.CHAR4:
1358 key.addCharN( (
string ) value, 4 );
1361 case ColumnType.CHAR8:
1362 key.addCharN( (
string ) value, 8 );
1365 case ColumnType.CHAR16:
1366 key.addCharN( (
string ) value, 16 );
1369 case ColumnType.CHAR32:
1370 key.addCharN( (
string ) value, 32 );
1373 case ColumnType.CHAR64:
1374 key.addCharN( (
string ) value, 64 );
1377 case ColumnType.CHAR128:
1378 key.addCharN( (
string ) value, 128 );
1381 case ColumnType.CHAR256:
1382 key.addCharN( (
string ) value, 256 );
1385 case ColumnType.DATE:
1386 key.addDate( (
string) value );
1389 case ColumnType.DATETIME:
1390 key.addDateTime( (
string ) value );
1393 case ColumnType.DECIMAL:
1394 key.addDecimal( (
string) value );
1397 case ColumnType.DOUBLE:
1398 key.addDouble( (
double? ) value );
1401 case ColumnType.FLOAT:
1402 key.addFloat( (
float? ) value );
1405 case ColumnType.INT:
1406 key.addInt( (
int? ) value );
1409 case ColumnType.INT8:
1410 key.addInt8( (
int? ) value );
1413 case ColumnType.INT16:
1414 key.addInt16( (
int? ) value );
1417 case ColumnType.IPV4:
1418 key.addIPv4( (
string ) value );
1421 case ColumnType.LONG:
1422 key.addLong( (
long? ) value );
1425 case ColumnType.STRING:
1426 key.addString( (
string ) value );
1429 case ColumnType.TIME:
1430 key.addTime( (
string ) value );
1433 case ColumnType.TIMESTAMP:
1434 key.addTimeStamp( (
long? ) value );
1451 public bool hasKey()
1454 return !(this.routing_column_indices.Count == 0);
1463 public bool hasSameKey( RecordKeyBuilder<T> other)
1465 return this.column_types.Equals( other.column_types );
1472 private sealed
class WorkerQueue<T>
1474 public System.Uri
url {
get;
private set; }
1475 private int capacity;
1476 private bool has_primary_key;
1477 private bool update_on_existing_pk;
1478 private IList<T> queue;
1479 private Dictionary<RecordKey, int> primary_key_map;
1489 public WorkerQueue(
System.Uri
url,
int capacity,
bool has_primary_key,
bool update_on_existing_pk )
1492 this.capacity = capacity;
1493 this.has_primary_key = has_primary_key;
1494 this.update_on_existing_pk = update_on_existing_pk;
1496 queue =
new List<T>();
1500 if ( this.has_primary_key )
1501 primary_key_map =
new Dictionary<RecordKey, int>( (int)Math.Round(
this.capacity/0.75 ) );
1510 public IList<T>
flush()
1512 IList<T> old_queue = this.queue;
1513 queue =
new List<T>( this.capacity );
1516 if ( this.primary_key_map != null )
1517 this.primary_key_map.Clear();
1531 public IList<T>
insert( T record, RecordKey key )
1533 if ( this.has_primary_key && key.isValid() )
1536 if ( this.update_on_existing_pk )
1540 if ( this.primary_key_map.TryGetValue( key, out key_idx ) )
1543 this.queue[key_idx] = record;
1547 this.queue.Add( record );
1548 this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1553 if ( this.primary_key_map.ContainsKey( key ) )
1558 this.queue.Add( record );
1559 this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1564 queue.Add( record );
1568 if ( queue.Count == capacity )
1583 public IDictionary<string, string>
options {
get; }
1588 private RecordKeyBuilder<T> primary_key_builder;
1589 private RecordKeyBuilder<T> shard_key_builder;
1590 private IList<int> routing_table;
1591 private IList<WorkerQueue<T>> worker_queues;
1592 private Random random;
1606 Dictionary<string, string>
options = null,
1614 if ( batch_size < 1 )
1615 throw new KineticaException( $
"Batch size must be greater than one; given {batch_size}." );
1631 this.primary_key_builder =
new RecordKeyBuilder<T>(
true, this.ktype );
1632 this.shard_key_builder =
new RecordKeyBuilder<T>(
false, this.ktype );
1635 if ( this.primary_key_builder.hasKey() )
1638 if ( !this.shard_key_builder.hasKey()
1639 || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
1640 this.shard_key_builder = this.primary_key_builder;
1644 this.primary_key_builder = null;
1647 if ( !this.shard_key_builder.hasKey() )
1648 this.shard_key_builder = null;
1656 bool update_on_existing_pk = ( (
options != null)
1660 bool has_primary_key = (this.primary_key_builder != null);
1661 this.worker_queues =
new List<WorkerQueue<T>>();
1665 if ( ( workers == null ) || ( workers.Count == 0 ) )
1672 if ( ( workers != null ) && ( workers.Count > 0 ) )
1675 foreach (
System.Uri worker_url in workers )
1677 WorkerQueue<T> worker_queue =
new WorkerQueue<T>( worker_url,
batch_size, has_primary_key, update_on_existing_pk );
1678 this.worker_queues.Add( worker_queue );
1684 for (
int i = 0; i < routing_table.Count; ++i )
1686 if ( this.routing_table[i] > this.worker_queues.Count )
1693 WorkerQueue<T> worker_queue =
new WorkerQueue<T>(
url,
batch_size, has_primary_key, update_on_existing_pk );
1694 this.worker_queues.Add( worker_queue );
1695 this.routing_table = null;
1698 catch ( Exception ex )
1704 this.random =
new Random( (
int) DateTime.Now.Ticks );
1715 return System.Threading.Interlocked.Read( ref this.count_inserted );
1726 return System.Threading.Interlocked.Read( ref this.count_updated );
1741 foreach ( WorkerQueue<T> worker_queue
in this.worker_queues )
1744 IList<T> queue = worker_queue.flush();
1746 flush( queue, worker_queue.url );
1759 if ( queue.Count == 0 )
1767 IList<byte[]> encoded_queue =
new List<byte[]>();
1768 foreach ( var record
in queue ) encoded_queue.Add( this.
kineticaDB.AvroEncode( record ) );
1787 catch ( Exception ex )
1808 RecordKey primary_key = null;
1809 RecordKey shard_key = null;
1812 if ( this.primary_key_builder != null )
1813 primary_key = this.primary_key_builder.build( record );
1816 if ( this.shard_key_builder != null )
1817 shard_key = this.shard_key_builder.build( record );
1821 WorkerQueue<T> worker_queue;
1822 if ( this.routing_table == null )
1824 worker_queue = this.worker_queues[0];
1826 else if ( shard_key == null )
1828 worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
1832 int worker_index = shard_key.route( this.routing_table );
1833 worker_queue = this.worker_queues[worker_index];
1837 IList<T> queue = worker_queue.insert( record, primary_key );
1841 if ( queue != null )
1843 this.
flush( queue, worker_queue.url );
1865 for (
int i = 0; i < records.Count; ++i )
1869 this.
insert( records[ i ] );
1877 for (
int j = i + 1; j < records.Count; ++j )
1879 queue.Add( records[ j ] );
const string CONF_ENABLE_WORKER_HTTP_SERVERS
Boolean value indicating whether the system is configured for multi-head ingestion.
int count_inserted
The number of records inserted.
const string CHAR1
This property provides optimized memory, disk and query performance for string columns.
A set of results returned by Kinetica.showSystemProperties(IDictionary<string, string>).
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
Int64 getCountInserted()
Returns the count of records inserted so far.
const string DATETIME
Valid only for 'string' columns.
const string INT16
This property provides optimized memory and query performance for int columns.
void insert(T record)
Queues a record for insertion into Kinetica.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
const string CHAR128
This property provides optimized memory, disk and query performance for string columns.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
string Url
URL for Kinetica Server (including "http:" and port)
A map of server configuration parameters and version information.
InsertException(string msg)
const string CONF_WORKER_HTTP_SERVER_IPS
Semicolon (';') separated string of IP addresses of all the ingestion-enabled worker heads of the sys...
int count_updated
The number of records updated.
Column properties used for Kinetica types.
const string TIMESTAMP
Valid only for 'long' columns.
const string CHAR16
This property provides optimized memory, disk and query performance for string columns.
A list of worker URLs to use for multi-head ingest.
static void murmurhash3_x64_128(byte[] key, uint offset, uint len, int seed, out LongPair output)
Returns the MurmurHash3_x64_128 hash, placing the result in output
override string ToString()
const string CHAR64
This property provides optimized memory, disk and query performance for string columns.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
const string CHAR2
This property provides optimized memory, disk and query performance for string columns.
const string DATE
Valid only for 'string' columns.
const string CHAR8
This property provides optimized memory, disk and query performance for string columns.
const string DECIMAL
Valid only for 'string' columns.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
const string CHAR32
This property provides optimized memory, disk and query performance for string columns.
IList< Column > getColumns()
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
const string CHAR256
This property provides optimized memory, disk and query performance for string columns.
void insert(IList< T > records)
Queues a list of records for insertion into Kientica.
ShowSystemPropertiesResponse showSystemProperties(ShowSystemPropertiesRequest request_)
Returns server configuration and version related information to the caller.
A set of parameters for Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
const string INT8
This property provides optimized memory and query performance for int columns.
const string CONF_WORKER_HTTP_SERVER_PORTS
Semicolon (';') separated string of the port numbers of all the ingestion-enabled worker ranks of the...
IDictionary< string, string > property_map
A map of server configuration parameters and version information.
IList< int > rank
Array of ranks indexed by the shard number.
const string TRUE
Indicates that the system is configured for multi-head ingestion.
A set of results returned by Kinetica.insertRecords<T>(string,IList<T>,IDictionary<string, string>).
IDictionary< string, string > options
InsertRecordsResponse insertRecordsRaw(RawInsertRecordsRequest request_)
Adds multiple records to the specified table.
const string TIME
Valid only for 'string' columns.
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, WorkerList workers=null)
API to talk to Kinetica Database
Int64 getCountUpdated()
Returns the count of records updated so far.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
void flush()
Ensures that all queued records are inserted into Kinetica.