2 using System.Collections.Generic;
3 using System.Collections.Immutable;
5 using System.Text.RegularExpressions;
18 public class KineticaIngestor<T>
23 public Uri url {
get;
private set; }
24 public IList<T> records {
get;
private set; }
25 private string message;
29 internal InsertException( Uri url_, IList<T> records_,
string msg ) : base ( msg )
33 this.records = records_;
36 public override string ToString() {
return "InsertException: " + message; }
43 public sealed
class WorkerList : List<System.Uri>
70 IDictionary<string, string> system_properties = db.showSystemProperties().property_map;
73 string multi_head_ingestion_param;
74 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out multi_head_ingestion_param );
75 if ( multi_head_ingestion_param == null )
77 bool is_multi_head_ingest_enabled = multi_head_ingestion_param.Equals( ShowSystemPropertiesResponse.PropertyMap.TRUE );
80 if ( !is_multi_head_ingest_enabled )
84 string worker_ips_str, worker_ports_str;
85 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out worker_ips_str );
86 system_properties.TryGetValue( ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out worker_ports_str );
89 if ( worker_ips_str.Length == 0 )
91 if ( worker_ports_str.Length == 0 )
97 string[] worker_ip_lists = worker_ips_str.Split(
';' );
98 string[] worker_ports = worker_ports_str.Split(
';' );
101 if ( worker_ip_lists.Length != worker_ports.Length )
108 for (
int i = 1; i < worker_ip_lists.Length; ++i )
110 string ip_list = worker_ip_lists[ i ];
113 string[] ips = ip_list.Split(
',' );
115 bool matching_ip_found =
false;
118 foreach (
string ip
in ips )
124 if ( ip_regex != null )
125 matching_ip_found = ip_regex.IsMatch( ip );
127 matching_ip_found =
true;
129 if ( matching_ip_found )
131 UriBuilder uri_builder =
new UriBuilder(
"http", ip, Int32.Parse( worker_ports[ i ] ),
"insert/records" );
132 Uri url = uri_builder.Uri;
139 catch ( Exception ex )
145 if ( !matching_ip_found )
150 if ( this.Count == 0 )
163 private sealed
class RecordKey
168 private static readonly Regex DATE_REGEX =
new Regex(
"\\A(\\d{4})-(\\d{2})-(\\d{2})$" );
172 private static readonly Regex DECIMAL_REGEX =
new Regex(
"\\A\\s*[+-]?((?<int>\\d+)(\\.(?<frac>\\d{0,4}))?|\\.(?<frac>\\d{1,4}))\\s*\\z" );
176 private static readonly Regex IPV4_REGEX =
new Regex(
"\\A(?<a>\\d{1,3})\\.(?<b>\\d{1,3})\\.(?<c>\\d{1,3})\\.(?<d>\\d{1,3})$" );
180 private static readonly Regex TIME_REGEX =
new Regex(
"\\A(?<hour>\\d{1,2}):(?<minute>\\d{2}):(?<seconds>\\d{2})(\\.(?<milliseconds>\\d{3}))?$" );
184 private static readonly DateTime MIN_DATE = (
new System.Globalization.GregorianCalendar()).MinSupportedDateTime;
189 private static readonly
int MIN_SUPPORTED_YEAR = 1000;
193 private static readonly
int MAX_SUPPORTED_YEAR = 2900;
197 private static readonly TimeZoneInfo UTC = TimeZoneInfo.Utc;
199 private readonly byte[] buffer;
200 private readonly
int buffer_size;
201 private int current_size;
202 private int hash_code;
203 private bool is_valid;
204 private long routingHash;
211 public RecordKey(
int size )
215 +
"Size given: " + size );
218 buffer =
new byte[size];
219 this.is_valid =
true;
226 public bool isValid()
228 return this.is_valid;
235 public int hashCode()
237 return this.hash_code;
248 private bool isBufferFull(
bool throw_if_full =
true )
250 if ( this.current_size == this.buffer_size )
253 throw new KineticaException(
"The buffer is already full!" );
270 private bool willBufferOverflow(
int n,
bool throw_if_overflow =
true )
273 if ( (this.current_size + n) > this.buffer_size )
275 if ( throw_if_overflow )
276 throw new KineticaException( $
"The buffer (of size {buffer_size}) does not have sufficient room in it to put {n} more byte(s) (current size is {this.current_size})." );
289 private void add( byte b )
292 buffer.SetValue( b, current_size++ );
301 public void addInt(
int? value )
304 this.willBufferOverflow( 4 );
310 this.add( ( byte ) 0 );
311 this.add( ( byte ) 0 );
312 this.add( ( byte ) 0 );
313 this.add( ( byte ) 0 );
318 byte[] int_bytes = BitConverter.GetBytes( (int)value );
321 foreach ( byte b
in int_bytes )
330 public void addInt8(
int? value )
333 this.willBufferOverflow( 1 );
339 this.add( ( byte ) 0 );
344 this.add( (byte)value );
352 public void addInt16(
int? value )
355 this.willBufferOverflow( 2 );
361 this.add( ( byte ) 0 );
362 this.add( ( byte ) 0 );
367 byte[] short_bytes = BitConverter.GetBytes( (short)value );
370 foreach ( byte b
in short_bytes )
380 public void addLong(
long? value )
383 this.willBufferOverflow( 8 );
389 this.add( ( byte ) 0 );
390 this.add( ( byte ) 0 );
391 this.add( ( byte ) 0 );
392 this.add( ( byte ) 0 );
393 this.add( ( byte ) 0 );
394 this.add( ( byte ) 0 );
395 this.add( ( byte ) 0 );
396 this.add( ( byte ) 0 );
401 byte[] long_bytes = BitConverter.GetBytes( (long)value );
404 foreach ( byte b
in long_bytes )
413 public void addFloat(
float? value )
416 this.willBufferOverflow( 4 );
422 this.add( ( byte ) 0.0f );
423 this.add( ( byte ) 0.0f );
424 this.add( ( byte ) 0.0f );
425 this.add( ( byte ) 0.0f );
430 byte[] float_bytes = BitConverter.GetBytes( (float)value );
433 foreach ( byte b
in float_bytes )
443 public void addDouble(
double? value )
446 this.willBufferOverflow( 8 );
452 this.add( ( byte ) 0.0 );
453 this.add( ( byte ) 0.0 );
454 this.add( ( byte ) 0.0 );
455 this.add( ( byte ) 0.0 );
456 this.add( ( byte ) 0.0 );
457 this.add( ( byte ) 0.0 );
458 this.add( ( byte ) 0.0 );
459 this.add( ( byte ) 0.0 );
464 byte[] double_bytes = BitConverter.GetBytes( (double)value );
467 foreach ( byte b
in double_bytes )
478 public void addString(
string value )
488 MurMurHash3.LongPair murmur =
new MurMurHash3.LongPair();
489 System.Text.Encoding encoding =
new System.Text.UTF8Encoding();
490 byte[] input = encoding.GetBytes( value );
491 MurMurHash3.murmurhash3_x64_128( input, 0, (uint)input.Length, 10, out murmur );
494 this.addLong( murmur.val1 );
507 public void addCharN(
string value,
int N )
510 this.willBufferOverflow( N );
518 for (
int i = 0; i < N; ++i )
520 this.add( (byte) 0 );
526 byte[] bytes = System.Text.Encoding.UTF8.GetBytes( value );
527 int byte_count = bytes.GetLength( 0 );
530 if ( byte_count > N )
537 for (
int i = N; i > byte_count; --i )
539 this.add( (byte) 0 );
543 for (
int i = ( byte_count - 1 ); i >= 0; --i )
545 this.add( bytes[i] );
556 public void addDate(
string value )
559 this.isBufferFull( true );
569 Match match = DATE_REGEX.Match( value );
570 if ( !match.Success )
573 this.is_valid =
false;
579 int year, month, day;
581 System.Globalization.GregorianCalendar calendar =
new System.Globalization.GregorianCalendar();
586 year = int.Parse( match.Groups[ 1 ].ToString() );
587 month = int.Parse( match.Groups[ 2 ].ToString() );
588 day = int.Parse( match.Groups[ 3 ].ToString() );
589 date =
new DateTime( year, month, day, calendar );
591 catch ( Exception ex )
595 this.is_valid =
false;
600 if ( ( year < MIN_SUPPORTED_YEAR ) || ( year > MAX_SUPPORTED_YEAR ) )
603 this.is_valid =
false;
608 int date_integer = ( ((year - MIN_SUPPORTED_YEAR) << 21)
611 | (calendar.GetDayOfYear( date ) << 3)
612 | (int)calendar.GetDayOfWeek( date ) );
613 this.addInt( date_integer );
623 public void addDecimal(
string value )
626 this.isBufferFull( true );
636 Match match = DECIMAL_REGEX.Match( value );
637 if ( !match.Success )
640 this.is_valid =
false;
650 string integral_part_str = match.Groups[
"int" ].Value;
651 string fractional_part_str = match.Groups[
"frac" ].Value;
653 long fractional_part;
654 bool has_integral_part = long.TryParse( integral_part_str, out integral_part );
655 bool has_fractional_part = long.TryParse( fractional_part_str, out fractional_part );
657 int fractional_part_len = fractional_part_str.Length;
658 integral_part = integral_part * (long)Math.Pow(10, fractional_part_len );
660 decimal_value = integral_part + fractional_part;
662 catch ( Exception ex )
666 this.is_valid =
false;
671 this.addLong( decimal_value );
681 public void addIPv4(
string value )
684 this.isBufferFull( true );
694 Match match = IPV4_REGEX.Match( value );
695 if ( !match.Success )
698 this.is_valid =
false;
709 a = int.Parse( match.Groups[
"a" ].Value );
710 b = int.Parse( match.Groups[
"b" ].Value );
711 c = int.Parse( match.Groups[
"c" ].Value );
712 d = int.Parse( match.Groups[
"d" ].Value );
714 catch ( Exception ex )
718 this.is_valid =
false;
724 if ( ( a > 255 ) || ( b > 255 ) || ( c > 255 ) || ( d > 255 ) )
727 this.is_valid =
false;
732 int ipv4_integer = ( (a << 24) | (b << 16) | (c << 8) | d );
733 this.addInt( ipv4_integer );
743 public void addTime(
string value )
746 this.isBufferFull( true );
756 Match match = TIME_REGEX.Match( value );
757 if ( !match.Success )
760 this.is_valid =
false;
766 uint hour, minute, second, milliseconds;
771 hour = uint.Parse( match.Groups[
"hour"].Value );
772 minute = uint.Parse( match.Groups[
"minute"].Value );
773 second = uint.Parse( match.Groups[
"seconds"].Value );
774 Group msec_group = match.Groups[
"milliseconds"];
775 if ( msec_group.Success )
776 milliseconds = uint.Parse( msec_group.Value );
780 catch ( Exception ex )
784 this.is_valid =
false;
789 if ( ( hour > 23 ) || ( minute > 59 ) || ( second > 59 ) )
792 this.is_valid =
false;
797 int time_integer = (int)( (hour << 26) | (minute << 20) | (second << 14) | (milliseconds << 4) );
798 this.addInt( time_integer );
806 public void addTimeStamp(
long? value )
816 System.Globalization.GregorianCalendar calendar =
new System.Globalization.GregorianCalendar();
817 DateTime time = MIN_DATE.AddMilliseconds( (double) value );
818 long timestamp = (long) ( ((time.Year - 1900) << 53)
819 | ((time.Month + 1) << 49)
822 | (time.Minute << 33)
823 | (time.Second << 27)
824 | (time.Millisecond << 17)
825 | (time.DayOfYear << 8)
826 | ((int)time.DayOfWeek << 5) );
827 this.addLong( timestamp );
838 public void computHashes()
841 if ( this.current_size != this.buffer_size )
842 throw new KineticaException(
"The RecordKey buffer is not full; check that all the relevant values have been added." );
845 MurMurHash3.LongPair murmur =
new MurMurHash3.LongPair();
846 MurMurHash3.murmurhash3_x64_128( this.buffer, 0, ( uint ) this.buffer_size, 10, out murmur );
849 this.routingHash = murmur.val1;
850 this.hash_code = ( int ) ( this.routingHash ^ ((this.routingHash >> 32) & 0x0000ffffL));
861 public int route( IList<int> routingTable )
866 return (routingTable[ Math.Abs( (
int ) (
this.routingHash % routingTable.Count ) ) ] - 1);
876 private sealed
class RecordKeyBuilder<T>
881 private enum ColumnType
908 private KineticaType ktype;
909 private IList<int> routing_column_indices;
910 private IList<ColumnType> column_types;
911 private int buffer_size;
913 public RecordKeyBuilder(
bool is_primary_key, KineticaType ktype )
917 this.buffer_size = 0;
918 routing_column_indices =
new List<int>();
919 column_types =
new List<ColumnType>();
924 bool has_timestamp =
false;
927 int track_id_column_idx = -1;
931 IList<KineticaType.Column> columns = ktype.getColumns();
932 for (
int i = 0; i < columns.Count; ++i )
935 KineticaType.Column column = columns[ i ];
938 switch ( column.getName() )
941 track_id_column_idx = i;
945 has_timestamp =
true;
961 routing_column_indices.Add( i );
965 routing_column_indices.Add( i );
971 && has_timestamp && has_x && has_y && ( track_id_column_idx != -1 ) )
973 if ( routing_column_indices.Count == 0 )
975 routing_column_indices.Add( track_id_column_idx );
977 else if ( ( routing_column_indices.Count != 1 )
978 || ( routing_column_indices[0] != track_id_column_idx ) )
981 throw new KineticaException(
"Cannot have a shard key other than 'TRACKID' for track tables." );
988 foreach (
int i
in routing_column_indices )
991 KineticaType.Column column = columns[ i ];
993 switch ( column.getType() )
996 case KineticaType.Column.ColumnType.FLOAT:
998 column_types.Add( ColumnType.FLOAT );
999 this.buffer_size += 4;
1002 case KineticaType.Column.ColumnType.DOUBLE:
1004 column_types.Add( ColumnType.DOUBLE );
1005 this.buffer_size += 8;
1009 case KineticaType.Column.ColumnType.INT:
1014 column_types.Add( ColumnType.INT8 );
1015 this.buffer_size += 1;
1019 column_types.Add( ColumnType.INT16 );
1020 this.buffer_size += 2;
1024 column_types.Add( ColumnType.INT );
1025 this.buffer_size += 4;
1030 case KineticaType.Column.ColumnType.LONG:
1035 column_types.Add( ColumnType.TIMESTAMP );
1039 column_types.Add( ColumnType.LONG );
1041 this.buffer_size += 8;
1045 case KineticaType.Column.ColumnType.STRING:
1049 column_types.Add( ColumnType.CHAR1 );
1050 this.buffer_size += 1;
1054 column_types.Add( ColumnType.CHAR2 );
1055 this.buffer_size += 2;
1059 column_types.Add( ColumnType.CHAR4 );
1060 this.buffer_size += 4;
1064 column_types.Add( ColumnType.CHAR8 );
1065 this.buffer_size += 8;
1069 column_types.Add( ColumnType.CHAR16 );
1070 this.buffer_size += 16;
1074 column_types.Add( ColumnType.CHAR32 );
1075 this.buffer_size += 32;
1079 column_types.Add( ColumnType.CHAR64 );
1080 this.buffer_size += 64;
1084 column_types.Add( ColumnType.CHAR128 );
1085 this.buffer_size += 128;
1089 column_types.Add( ColumnType.CHAR256 );
1090 this.buffer_size += 256;
1094 column_types.Add( ColumnType.DATE );
1095 this.buffer_size += 4;
1099 column_types.Add( ColumnType.DECIMAL );
1100 this.buffer_size += 8;
1104 column_types.Add( ColumnType.IPV4 );
1105 this.buffer_size += 4;
1109 column_types.Add( ColumnType.TIME );
1110 this.buffer_size += 4;
1114 column_types.Add( ColumnType.STRING );
1115 this.buffer_size += 8;
1121 case KineticaType.Column.ColumnType.BYTES:
1122 case KineticaType.Column.ColumnType.DEFAULT:
1123 throw new KineticaException( $
"Cannot use column '{column.getName()}' as a key." );
1136 public RecordKey build( T record )
1139 if ( this.buffer_size == 0 )
1143 RecordKey key =
new RecordKey( this.buffer_size );
1146 for (
int i = 0; i < this.routing_column_indices.Count; ++i )
1149 KineticaType.Column column = this.ktype.getColumns()[ this.routing_column_indices[ i ] ];
1152 var value = record.GetType().GetProperty( column.getName() ).GetValue( record, null );
1154 switch ( this.column_types[i] )
1156 case ColumnType.CHAR1:
1157 key.addCharN( (string) value, 1 );
1160 case ColumnType.CHAR2:
1161 key.addCharN( ( string ) value, 2 );
1164 case ColumnType.CHAR4:
1165 key.addCharN( ( string ) value, 4 );
1168 case ColumnType.CHAR8:
1169 key.addCharN( ( string ) value, 8 );
1172 case ColumnType.CHAR16:
1173 key.addCharN( ( string ) value, 16 );
1176 case ColumnType.CHAR32:
1177 key.addCharN( ( string ) value, 32 );
1180 case ColumnType.CHAR64:
1181 key.addCharN( ( string ) value, 64 );
1184 case ColumnType.CHAR128:
1185 key.addCharN( ( string ) value, 128 );
1188 case ColumnType.CHAR256:
1189 key.addCharN( ( string ) value, 256 );
1192 case ColumnType.DATE:
1193 key.addDate( (string) value );
1196 case ColumnType.DECIMAL:
1197 key.addDecimal( (string) value );
1200 case ColumnType.DOUBLE:
1201 key.addDouble( (
double? ) value );
1204 case ColumnType.FLOAT:
1205 key.addFloat( (
float? ) value );
1208 case ColumnType.INT:
1209 key.addInt( (
int? ) value );
1212 case ColumnType.INT8:
1213 key.addInt8( (
int? ) value );
1216 case ColumnType.INT16:
1217 key.addInt16( (
int? ) value );
1220 case ColumnType.IPV4:
1221 key.addIPv4( ( string ) value );
1224 case ColumnType.LONG:
1225 key.addLong( (
long? ) value );
1228 case ColumnType.STRING:
1229 key.addString( ( string ) value );
1232 case ColumnType.TIME:
1233 key.addTime( ( string ) value );
1236 case ColumnType.TIMESTAMP:
1237 key.addTimeStamp( (
long? ) value );
1254 public bool hasKey()
1257 return !(this.routing_column_indices.Count == 0);
1266 public bool hasSameKey( RecordKeyBuilder<T> other)
1268 return this.column_types.Equals( other.column_types );
1275 private sealed
class WorkerQueue<T>
1277 public System.Uri url {
get;
private set; }
1278 private int capacity;
1279 private bool has_primary_key;
1280 private bool update_on_existing_pk;
1281 private IList<T> queue;
1282 private Dictionary<RecordKey, int> primary_key_map;
1292 public WorkerQueue( System.Uri url,
int capacity,
bool has_primary_key,
bool update_on_existing_pk )
1295 this.capacity = capacity;
1296 this.has_primary_key = has_primary_key;
1297 this.update_on_existing_pk = update_on_existing_pk;
1299 queue =
new List<T>();
1303 if ( this.has_primary_key )
1304 primary_key_map =
new Dictionary<RecordKey, int>( (int)Math.Round(
this.capacity/0.75 ) );
1313 public IList<T> flush()
1315 IList<T> old_queue = this.queue;
1316 queue =
new List<T>( this.capacity );
1319 if ( this.primary_key_map != null )
1320 this.primary_key_map.Clear();
1334 public IList<T> insert( T record, RecordKey key )
1336 if ( this.has_primary_key && key.isValid() )
1339 if ( this.update_on_existing_pk )
1343 if ( this.primary_key_map.TryGetValue( key, out key_idx ) )
1346 this.queue[key_idx] = record;
1350 this.queue.Add( record );
1351 this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1356 if ( this.primary_key_map.ContainsKey( key ) )
1361 this.queue.Add( record );
1362 this.primary_key_map.Add( key, ( this.queue.Count - 1 ) );
1367 queue.Add( record );
1371 if ( queue.Count == capacity )
1384 public string table_name {
get; }
1385 public int batch_size {
get; }
1386 public IDictionary<string, string> options {
get; }
1391 private RecordKeyBuilder<T> primary_key_builder;
1392 private RecordKeyBuilder<T> shard_key_builder;
1393 private IList<int> routing_table;
1394 private IList<WorkerQueue<T>> worker_queues;
1395 private Random random;
1409 Dictionary<string, string> options = null,
1410 WorkerList workers = null )
1412 this.kineticaDB = kdb;
1413 this.table_name = table_name;
1417 if ( batch_size < 1 )
1418 throw new KineticaException( $
"Batch size must be greater than one; given {batch_size}." );
1419 this.batch_size = batch_size;
1422 if ( options != null )
1424 this.options = options;
1429 this.options = null;
1434 this.primary_key_builder =
new RecordKeyBuilder<T>(
true, this.ktype );
1435 this.shard_key_builder =
new RecordKeyBuilder<T>(
false, this.ktype );
1438 if ( this.primary_key_builder.hasKey() )
1441 if ( !this.shard_key_builder.hasKey()
1442 || this.shard_key_builder.hasSameKey( this.primary_key_builder ) )
1443 this.shard_key_builder = this.primary_key_builder;
1447 this.primary_key_builder = null;
1450 if ( !this.shard_key_builder.hasKey() )
1451 this.shard_key_builder = null;
1459 bool update_on_existing_pk = ( (options != null)
1463 bool has_primary_key = (this.primary_key_builder != null);
1464 this.worker_queues =
new List<WorkerQueue<T>>();
1468 if ( ( workers == null ) || ( workers.Count == 0 ) )
1470 workers =
new WorkerList( kdb );
1475 if ( ( workers != null ) && ( workers.Count > 0 ) )
1478 foreach ( System.Uri worker_url in workers )
1480 WorkerQueue<T> worker_queue =
new WorkerQueue<T>( worker_url, batch_size, has_primary_key, update_on_existing_pk );
1481 this.worker_queues.Add( worker_queue );
1485 this.routing_table = kdb.adminGetShardAssignments().shard_assignments_rank;
1487 for (
int i = 0; i < routing_table.Count; ++i )
1489 if ( this.routing_table[i] > this.worker_queues.Count )
1495 System.Uri url =
new System.Uri( kdb.Url +
"/insert/records" );
1496 WorkerQueue<T> worker_queue =
new WorkerQueue<T>( url, batch_size, has_primary_key, update_on_existing_pk );
1497 this.worker_queues.Add( worker_queue );
1498 this.routing_table = null;
1501 catch ( Exception ex )
1507 this.random =
new Random( (
int) DateTime.Now.Ticks );
1518 return System.Threading.Interlocked.Read( ref this.count_inserted );
1529 return System.Threading.Interlocked.Read( ref this.count_updated );
1544 foreach ( WorkerQueue<T> worker_queue
in this.worker_queues )
1547 IList<T> queue = worker_queue.flush();
1549 flush( queue, worker_queue.url );
1560 private void flush( IList<T> queue, System.Uri url )
1562 if ( queue.Count == 0 )
1570 IList<byte[]> encoded_queue =
new List<byte[]>();
1571 foreach ( var record
in queue ) encoded_queue.Add( this.kineticaDB.AvroEncode( record ) );
1579 response = this.kineticaDB.insertRecordsRaw( request );
1583 response = this.kineticaDB.SubmitRequest<InsertRecordsResponse>( url, request );
1587 System.Threading.Interlocked.Add( ref this.count_inserted, response.count_inserted );
1588 System.Threading.Interlocked.Add( ref this.count_updated, response.count_updated );
1590 catch ( Exception ex )
1592 throw new InsertException<T>( url, queue, ex.ToString() );
1611 RecordKey primary_key = null;
1612 RecordKey shard_key = null;
1615 if ( this.primary_key_builder != null )
1616 primary_key = this.primary_key_builder.build( record );
1619 if ( this.shard_key_builder != null )
1620 shard_key = this.shard_key_builder.build( record );
1624 WorkerQueue<T> worker_queue;
1625 if ( this.routing_table == null )
1627 worker_queue = this.worker_queues[0];
1629 else if ( shard_key == null )
1631 worker_queue = this.worker_queues[ random.Next( this.worker_queues.Count ) ];
1635 int worker_index = shard_key.route( this.routing_table );
1636 worker_queue = this.worker_queues[worker_index];
1640 IList<T> queue = worker_queue.insert( record, primary_key );
1644 if ( queue != null )
1646 this.flush( queue, worker_queue.url );
1668 for (
int i = 0; i < records.Count; ++i )
1672 this.insert( records[ i ] );
1674 catch ( InsertException<T> ex )
1678 IList<T> queue = ex.records;
1680 for (
int j = i + 1; j < records.Count; ++j )
1682 queue.Add( records[ j ] );
InsertException(string msg)
const string CHAR1
This property provides optimized memory, disk and query performance for string columns.
A set of results returned by /show/system/properties.
const string INT16
This property provides optimized memory and query performance for int columns.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
const string CHAR128
This property provides optimized memory, disk and query performance for string columns.
A set of parameters for /insert/records.
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
void insert(T record)
Queues a record for insertion into Kinetica.
KineticaIngestor(Kinetica kdb, string table_name, int batch_size, KineticaType ktype, Dictionary< string, string > options=null, WorkerList workers=null)
const string TIMESTAMP
Valid only for 'long' columns.
const string CHAR16
This property provides optimized memory, disk and query performance for string columns.
Int64 getCountInserted()
Returns the count of records inserted so far.
const string CHAR64
This property provides optimized memory, disk and query performance for string columns.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
const string CHAR2
This property provides optimized memory, disk and query performance for string columns.
const string DATE
Valid only for 'string' columns.
void insert(IList< T > records)
Queues a list of records for insertion into Kientica.
const string CHAR8
This property provides optimized memory, disk and query performance for string columns.
const string DECIMAL
Valid only for 'string' columns.
void flush()
Ensures that all queued records are inserted into Kinetica.
const string CHAR32
This property provides optimized memory, disk and query performance for string columns.
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
const string CHAR256
This property provides optimized memory, disk and query performance for string columns.
A set of parameters for /insert/records.
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
const string INT8
This property provides optimized memory and query performance for int columns.
Int64 getCountUpdated()
Returns the count of records updated so far.
A set of results returned by /insert/records.
const string TIME
Valid only for 'string' columns.
API to talk to Kinetica Database
override string ToString()