2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
5 using System.Threading;
6 using System.Threading.Tasks;
22 private readonly ConcurrentDictionary<string, TableBatchContext> _tableContexts;
24 private readonly SemaphoreSlim _contextLock;
25 private volatile bool _isDisposed;
28 private long _totalRecordsInserted;
29 private long _totalBatchesFlushed;
30 private long _bufferedRecordCount;
36 _tableContexts =
new ConcurrentDictionary<string, TableBatchContext>(StringComparer.OrdinalIgnoreCase);
37 _contextLock =
new SemaphoreSlim(1, 1);
60 IList<string> columnNames,
61 IList<object?> values,
62 CancellationToken cancellationToken =
default)
66 var context = await GetOrCreateContextAsync(tableName, columnNames, cancellationToken).ConfigureAwait(
false);
67 await context.InsertAsync(values, cancellationToken).ConfigureAwait(
false);
68 Interlocked.Increment(ref _bufferedRecordCount);
78 IList<string> columnNames,
79 IList<IList<object?>> valuesList,
80 CancellationToken cancellationToken =
default)
84 var context = await GetOrCreateContextAsync(tableName, columnNames, cancellationToken).ConfigureAwait(
false);
86 foreach (var values
in valuesList)
88 await context.InsertAsync(values, cancellationToken).ConfigureAwait(
false);
89 Interlocked.Increment(ref _bufferedRecordCount);
92 return valuesList.Count;
98 public async Task<long>
FlushTableAsync(
string tableName, CancellationToken cancellationToken =
default)
102 if (_tableContexts.TryGetValue(tableName, out var context))
105 var flushedCount = Interlocked.Exchange(ref _bufferedRecordCount, 0);
106 await context.FlushAsync(cancellationToken).ConfigureAwait(
false);
107 Interlocked.Add(ref _totalRecordsInserted, flushedCount);
108 Interlocked.Increment(ref _totalBatchesFlushed);
118 public async Task<long>
FlushAllAsync(CancellationToken cancellationToken =
default)
123 var flushedCount = Interlocked.Exchange(ref _bufferedRecordCount, 0);
125 var flushTasks =
new List<Task>();
127 foreach (var kvp
in _tableContexts)
129 flushTasks.Add(kvp.Value.FlushAsync(cancellationToken));
132 await Task.WhenAll(flushTasks).ConfigureAwait(
false);
134 Interlocked.Add(ref _totalRecordsInserted, flushedCount);
135 Interlocked.Add(ref _totalBatchesFlushed, flushTasks.Count);
145 if (_tableContexts.TryGetValue(tableName, out var context))
147 return (
int)context.PendingBatches;
157 return (
int)Interlocked.Read(ref _bufferedRecordCount);
160 private async Task<TableBatchContext> GetOrCreateContextAsync(
162 IList<string> columnNames,
163 CancellationToken cancellationToken)
166 if (_tableContexts.TryGetValue(tableName, out var existingContext))
168 return existingContext;
172 await _contextLock.WaitAsync(cancellationToken).ConfigureAwait(
false);
176 if (_tableContexts.TryGetValue(tableName, out existingContext))
178 return existingContext;
182 var ktype = await Task.Run(() =>
KineticaType.
fromTable(_kinetica, tableName), cancellationToken).ConfigureAwait(
false);
184 var context =
new TableBatchContext(
191 _tableContexts[tableName] = context;
196 _contextLock.Release();
200 private void ThrowIfDisposed()
210 if (_isDisposed)
return;
216 Task.Run(async () => await
FlushAllAsync().ConfigureAwait(
false))
217 .ConfigureAwait(
false)
226 foreach (var context
in _tableContexts.Values)
231 _tableContexts.Clear();
232 _contextLock.Dispose();
237 if (_isDisposed)
return;
250 foreach (var context
in _tableContexts.Values)
252 await context.DisposeAsync().ConfigureAwait(
false);
255 _tableContexts.Clear();
256 _contextLock.Dispose();
298 public int NumStripes {
get;
set; } = Environment.ProcessorCount;
303 public int MaxFlushWorkers {
get;
set; } = Math.Max(4, Environment.ProcessorCount * 2);
313 private readonly
kinetica.Records.Type _recordType;
314 private readonly
int[] _columnMapping;
324 IList<string> insertColumnNames,
328 _recordType = CreateTypeFromKineticaType(ktype);
331 _columnMapping = CreateColumnMapping(ktype, insertColumnNames);
340 InsertOptions =
new Dictionary<string, string>()
345 bulkOptions.InsertOptions[
"update_on_existing_pk"] =
"true";
349 bulkOptions.InsertOptions[
"return_individual_errors"] =
"true";
358 var columnDefs =
new List<Column>();
360 foreach (var col
in columns)
362 var props = col.getProperties();
363 var colType = DetermineColumnType(col);
365 columnDefs.Add(
new Column(col.getName(), colType, props));
368 return new kinetica.Records.Type(
"dynamic_record", columnDefs);
373 var props = col.getProperties();
375 if (props.Contains(
"boolean"))
return ColumnType.Boolean;
376 if (props.Contains(
"int8"))
return ColumnType.Int8;
377 if (props.Contains(
"int16"))
return ColumnType.Int16;
378 if (props.Contains(
"timestamp"))
return ColumnType.Timestamp;
379 if (props.Contains(
"date"))
return ColumnType.Date;
380 if (props.Contains(
"datetime"))
return ColumnType.DateTime;
381 if (props.Contains(
"time"))
return ColumnType.Time;
382 if (props.Contains(
"decimal"))
return ColumnType.Decimal;
383 if (props.Contains(
"ipv4"))
return ColumnType.Ipv4;
384 if (props.Contains(
"uuid"))
return ColumnType.Uuid;
385 if (props.Contains(
"char1"))
return ColumnType.Char1;
386 if (props.Contains(
"char2"))
return ColumnType.Char2;
387 if (props.Contains(
"char4"))
return ColumnType.Char4;
388 if (props.Contains(
"char8"))
return ColumnType.Char8;
389 if (props.Contains(
"char16"))
return ColumnType.Char16;
390 if (props.Contains(
"char32"))
return ColumnType.Char32;
391 if (props.Contains(
"char64"))
return ColumnType.Char64;
392 if (props.Contains(
"char128"))
return ColumnType.Char128;
393 if (props.Contains(
"char256"))
return ColumnType.Char256;
395 return col.getType()
switch 403 _ => ColumnType.String
407 private static int[] CreateColumnMapping(
KineticaType ktype, IList<string> insertColumnNames)
410 var mapping =
new int[insertColumnNames.Count];
412 for (
int i = 0; i < insertColumnNames.Count; i++)
414 var insertCol = insertColumnNames[i];
415 int schemaIndex = -1;
417 for (
int j = 0; j < schemaColumns.Count; j++)
419 if (
string.Equals(schemaColumns[j].getName(), insertCol, StringComparison.OrdinalIgnoreCase))
426 if (schemaIndex == -1)
428 throw new ArgumentException($
"Column '{insertCol}' not found in table schema");
431 mapping[i] = schemaIndex;
437 public async Task
InsertAsync(IList<object?> values, CancellationToken cancellationToken)
442 for (
int i = 0; i < values.Count && i < _columnMapping.Length; i++)
444 var schemaIndex = _columnMapping[i];
445 var value = values[i];
447 if (value ==
null || value == DBNull.Value)
454 var column = _recordType.
GetColumn(schemaIndex);
455 var colType = column?.
ColumnType ?? ColumnType.String;
457 PutValueWithTypeConversion(record, schemaIndex, value, colType);
461 await _inserter.InsertAsync(record, cancellationToken).ConfigureAwait(
false);
464 private static void PutValueWithTypeConversion(
GenericRecord record,
int index,
object value, ColumnType targetType)
468 case ColumnType.Integer:
469 case ColumnType.Int8:
470 case ColumnType.Int16:
471 case ColumnType.Boolean:
473 record.
Put(index, Convert.ToInt32(value));
476 case ColumnType.Long:
477 case ColumnType.Timestamp:
479 if (value is DateTime dt)
480 record.
Put(index,
new DateTimeOffset(dt).ToUnixTimeMilliseconds());
481 else if (value is DateTimeOffset dto)
482 record.
Put(index, dto.ToUnixTimeMilliseconds());
484 record.
Put(index, Convert.ToInt64(value));
487 case ColumnType.Float:
489 record.
Put(index, Convert.ToSingle(value));
492 case ColumnType.Double:
494 record.
Put(index, Convert.ToDouble(value));
497 case ColumnType.String:
498 case ColumnType.Char1:
499 case ColumnType.Char2:
500 case ColumnType.Char4:
501 case ColumnType.Char8:
502 case ColumnType.Char16:
503 case ColumnType.Char32:
504 case ColumnType.Char64:
505 case ColumnType.Char128:
506 case ColumnType.Char256:
508 case ColumnType.Json:
509 case ColumnType.Ipv4:
510 case ColumnType.Uuid:
511 case ColumnType.Decimal:
512 case ColumnType.Date:
513 case ColumnType.DateTime:
514 case ColumnType.Time:
516 if (value is DateTime dtVal)
517 record.
Put(index, dtVal.ToString(
"yyyy-MM-dd HH:mm:ss.fff"));
518 else if (value is DateTimeOffset dtoVal)
519 record.
Put(index, dtoVal.ToString(
"yyyy-MM-dd HH:mm:ss.fff"));
520 else if (value is decimal decVal)
521 record.
Put(index, decVal.ToString());
522 else if (value is Guid guidVal)
523 record.
Put(index, guidVal.ToString());
525 record.
Put(index, value?.ToString() ??
string.Empty);
528 case ColumnType.Bytes:
529 if (value is
byte[] bytesVal)
530 record.
Put(index, bytesVal);
532 record.
Put(index,
Array.Empty<
byte>());
540 record.
Put(index, intVal);
543 record.
Put(index, longVal);
546 record.
Put(index, floatVal);
548 case double doubleVal:
549 record.
Put(index, doubleVal);
552 record.
Put(index, strVal);
555 record.
Put(index, boolVal);
558 record.
Put(index, bytes);
561 record.
Put(index, value?.ToString() ??
string.Empty);
568 public async Task
FlushAsync(CancellationToken cancellationToken =
default)
570 await _inserter.FlushAsync(cancellationToken).ConfigureAwait(
false);
573 while (_inserter.PendingBatches > 0)
575 cancellationToken.ThrowIfCancellationRequested();
576 await Task.Delay(10, cancellationToken).ConfigureAwait(
false);
580 var errors = _inserter.DrainErrors();
581 if (errors.Count > 0)
583 var errorMessages =
string.Join(
"; ", errors.Select(e => e.Message));
584 throw new KineticaException($
"Batch insert failed with {errors.Count} error(s): {errorMessages}");
588 public List<InsertError>
GetErrors() => _inserter.DrainErrors();
597 await _inserter.DisposeAsync().ConfigureAwait(
false);
607 private static readonly System.Text.RegularExpressions.Regex _insertWithColumnsPattern =
new System.Text.RegularExpressions.Regex(
608 @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s*\(\s*(?<columns>[\w\s,]+)\s*\)\s*VALUES\s*\((?<values>.*)\)\s*$",
609 System.Text.RegularExpressions.RegexOptions.IgnoreCase | System.Text.RegularExpressions.RegexOptions.Compiled | System.Text.RegularExpressions.RegexOptions.Singleline);
612 private static readonly System.Text.RegularExpressions.Regex _insertWithoutColumnsPattern =
new System.Text.RegularExpressions.Regex(
613 @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s+VALUES\s*\((?<values>.*)\)\s*$",
614 System.Text.RegularExpressions.RegexOptions.IgnoreCase | System.Text.RegularExpressions.RegexOptions.Compiled | System.Text.RegularExpressions.RegexOptions.Singleline);
617 private static readonly System.Text.RegularExpressions.Regex _columnPattern =
new System.Text.RegularExpressions.Regex(
619 System.Text.RegularExpressions.RegexOptions.Compiled);
626 var match = _insertWithColumnsPattern.Match(sql);
627 string columnsStr =
"";
632 match = _insertWithoutColumnsPattern.Match(sql);
640 columnsStr = match.Groups[
"columns"].Value.Trim();
643 var tableName = match.Groups[
"table"].Value.Trim();
644 var valuesStr = match.Groups[
"values"].Value.Trim();
647 var columnNames =
new List<string>();
648 if (!
string.IsNullOrEmpty(columnsStr))
650 var columnMatches = _columnPattern.Matches(columnsStr);
651 foreach (System.Text.RegularExpressions.Match cm in columnMatches)
653 columnNames.Add(cm.Value);
658 var values = ParseValues(valuesStr);
662 TableName = tableName,
663 ColumnNames = columnNames,
670 private static List<object?> ParseValues(
string valuesStr)
672 var values =
new List<object?>();
673 var currentValue =
new System.Text.StringBuilder();
675 bool inString =
false;
676 char stringChar =
'\0';
678 for (
int i = 0; i < valuesStr.Length; i++)
680 char c = valuesStr[i];
687 if (i + 1 < valuesStr.Length && valuesStr[i + 1] == stringChar)
689 currentValue.Append(c);
699 currentValue.Append(c);
702 else if (c ==
'\'' || c ==
'"')
710 currentValue.Append(c);
715 currentValue.Append(c);
717 else if (c ==
',' && depth == 0)
719 values.Add(ParseSingleValue(currentValue.ToString().Trim()));
720 currentValue.Clear();
724 currentValue.Append(c);
729 if (currentValue.Length > 0)
731 values.Add(ParseSingleValue(currentValue.ToString().Trim()));
737 private static object? ParseSingleValue(
string valueStr)
740 if (
string.Equals(valueStr,
"NULL", StringComparison.OrdinalIgnoreCase))
746 if ((valueStr.StartsWith(
"'") && valueStr.EndsWith(
"'")) ||
747 (valueStr.StartsWith(
"\"") && valueStr.EndsWith(
"\"")))
749 return valueStr.Substring(1, valueStr.Length - 2)
751 .Replace(
"\"\"",
"\"");
755 if (
string.Equals(valueStr,
"TRUE", StringComparison.OrdinalIgnoreCase))
759 if (
string.Equals(valueStr,
"FALSE", StringComparison.OrdinalIgnoreCase))
765 if (
long.
TryParse(valueStr, out
long longValue))
767 if (longValue >=
int.MinValue && longValue <=
int.MaxValue)
769 return (
int)longValue;
774 if (
double.
TryParse(valueStr, out
double doubleValue))
791 public List<object?>
Values {
get;
set; } =
new();
int MaxRetries
Maximum number of retry attempts for failed batches.
Manages bulk insert operations for the ADO.NET driver.
async ValueTask DisposeAsync()
static bool TryParse(string sql, out ParsedInsert? result)
bool UpdateOnExistingPk
If true, updates existing records with matching primary keys.
int GetPendingCount(string tableName)
Gets the number of pending batches (not individual records) reported by the specified table's batch context.
async Task< long > FlushTableAsync(string tableName, CancellationToken cancellationToken=default)
Flushes all pending records for the specified table.
bool ReturnIndividualErrors
If true, returns individual errors for each failed record.
Column? GetColumn(int index)
Gets a column by index.
bool PutNull(int index)
Sets a null value by column index.
TableBatchContext(Kinetica kinetica, string tableName, KineticaType ktype, IList< string > insertColumnNames, InsertBatchOptions options)
bool Put(int index, int value)
Sets an integer value by column index.
async Task< long > FlushAllAsync(CancellationToken cancellationToken=default)
Flushes all pending records for all tables.
List< string > ColumnNames
InsertBatchManager(Kinetica kinetica, InsertBatchOptions? options=null)
async Task< bool > InsertAsync(string tableName, IList< string > columnNames, IList< object?> values, CancellationToken cancellationToken=default)
Inserts a record into the specified table using batch processing.
GenericRecord NewInstance()
Creates a new GenericRecord instance of this type.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Creates a KineticaType object based on the schema of an existing table in the database.
long TotalRecordsInserted
Gets the total number of records inserted across all tables.
int GetTotalPendingCount()
Gets the total number of pending records across all tables.
long TotalBatchesFlushed
Gets the total number of batches flushed across all tables.
SQL INSERT statement parser that extracts table name, column names, and values.
int MaxFlushWorkers
Maximum concurrent flush workers.
int MaxInFlightBatches
Maximum in-flight batches for backpressure control.
Parsed INSERT statement result.
Immutable metadata about a column in a Kinetica type.
Manages batch context for a single table using BulkInserter<GenericRecord>.
async ValueTask DisposeAsync()
IList< Column > getColumns()
async Task FlushAsync(CancellationToken cancellationToken=default)
List< InsertError > GetErrors()
long BufferedRecordCount
Gets the number of records currently buffered (not yet flushed).
Options for batch insert operations.
ColumnType ColumnType
Gets the column type.
async Task< int > InsertBatchAsync(string tableName, IList< string > columnNames, IList< IList< object?>> valuesList, CancellationToken cancellationToken=default)
Inserts multiple records into the specified table using batch processing.
int BatchSize
The number of records to batch before flushing.
A generic record that can hold values for any Kinetica type.
int NumStripes
Number of stripes per worker queue.
int RetryDelayMs
Base delay in milliseconds between retries.
async Task InsertAsync(IList< object?> values, CancellationToken cancellationToken)
Configuration options for the BulkInserter<T>.