Kinetica   C#   API  Version 7.2.3.1
InsertBatchManager.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Threading;
6 using System.Threading.Tasks;
7 using kinetica;
8 using kinetica.Records;
9 
10 namespace KineticaAdo
11 {
19  public class InsertBatchManager : IAsyncDisposable, IDisposable
20  {
21  private readonly Kinetica _kinetica;
22  private readonly ConcurrentDictionary<string, TableBatchContext> _tableContexts;
23  private readonly InsertBatchOptions _options;
24  private readonly SemaphoreSlim _contextLock;
25  private volatile bool _isDisposed;
26 
27  // Metrics
28  private long _totalRecordsInserted;
29  private long _totalBatchesFlushed;
30  private long _bufferedRecordCount;
31 
33  {
34  _kinetica = kinetica ?? throw new ArgumentNullException(nameof(kinetica));
35  _options = options ?? new InsertBatchOptions();
36  _tableContexts = new ConcurrentDictionary<string, TableBatchContext>(StringComparer.OrdinalIgnoreCase);
37  _contextLock = new SemaphoreSlim(1, 1);
38  }
39 
/// <summary>
/// Gets the total number of records inserted across all tables.
/// </summary>
public long TotalRecordsInserted
{
    // Interlocked.Read gives an atomic 64-bit read of the counter.
    get { return Interlocked.Read(ref _totalRecordsInserted); }
}

/// <summary>
/// Gets the total number of batches flushed across all tables.
/// </summary>
public long TotalBatchesFlushed
{
    get { return Interlocked.Read(ref _totalBatchesFlushed); }
}

/// <summary>
/// Gets the number of records currently buffered (not yet flushed).
/// </summary>
public long BufferedRecordCount
{
    get { return Interlocked.Read(ref _bufferedRecordCount); }
}
54 
// Per-table share of _bufferedRecordCount. Lets FlushTableAsync claim only the
// records buffered for its own table instead of zeroing the manager-wide counter.
private readonly ConcurrentDictionary<string, long> _bufferedByTable =
    new ConcurrentDictionary<string, long>(StringComparer.OrdinalIgnoreCase);

/// <summary>
/// Inserts a record into the specified table using batch processing.
/// </summary>
/// <param name="tableName">Target table; also the key of the cached per-table context.</param>
/// <param name="columnNames">Column names, in the same order as <paramref name="values"/>.</param>
/// <param name="values">Values for one record.</param>
/// <param name="cancellationToken">Token passed through to context creation and the insert.</param>
/// <returns>Always <c>true</c> when the call completes without throwing.</returns>
/// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
public async Task<bool> InsertAsync(
    string tableName,
    IList<string> columnNames,
    IList<object?> values,
    CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    var context = await GetOrCreateContextAsync(tableName, columnNames, cancellationToken).ConfigureAwait(false);
    await context.InsertAsync(values, cancellationToken).ConfigureAwait(false);
    AddBuffered(tableName, 1);

    return true;
}

/// <summary>
/// Inserts multiple records into the specified table using batch processing.
/// </summary>
/// <param name="tableName">Target table; also the key of the cached per-table context.</param>
/// <param name="columnNames">Column names, in the same order as each value list.</param>
/// <param name="valuesList">One value list per record.</param>
/// <param name="cancellationToken">Token passed through to context creation and every insert.</param>
/// <returns>The number of value lists supplied (<c>valuesList.Count</c>).</returns>
/// <exception cref="ArgumentNullException"><paramref name="valuesList"/> is null.</exception>
/// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
public async Task<int> InsertBatchAsync(
    string tableName,
    IList<string> columnNames,
    IList<IList<object?>> valuesList,
    CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    if (valuesList == null)
    {
        throw new ArgumentNullException(nameof(valuesList));
    }

    var context = await GetOrCreateContextAsync(tableName, columnNames, cancellationToken).ConfigureAwait(false);

    foreach (var values in valuesList)
    {
        await context.InsertAsync(values, cancellationToken).ConfigureAwait(false);
        // Counted per record so a failure part-way through still leaves the
        // counters reflecting the records that were handed to the context.
        AddBuffered(tableName, 1);
    }

    return valuesList.Count;
}

/// <summary>
/// Flushes all pending records for the specified table.
/// </summary>
/// <param name="tableName">Table whose context should be flushed.</param>
/// <param name="cancellationToken">Token passed to the context's flush.</param>
/// <returns>
/// The number of records that were buffered for this table since its last flush,
/// or 0 when no context exists for the table.
/// </returns>
/// <remarks>
/// The count is claimed before the flush runs; if the flush throws, it is not
/// added to <see cref="TotalRecordsInserted"/>.
/// </remarks>
/// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
public async Task<long> FlushTableAsync(string tableName, CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    if (!_tableContexts.TryGetValue(tableName, out var context))
    {
        return 0;
    }

    // Only this table's records leave the buffered total; records buffered for
    // other tables stay counted until their own flush.
    var flushedCount = TakeBuffered(tableName);
    await context.FlushAsync(cancellationToken).ConfigureAwait(false);
    Interlocked.Add(ref _totalRecordsInserted, flushedCount);
    Interlocked.Increment(ref _totalBatchesFlushed);
    return flushedCount;
}

/// <summary>
/// Flushes all pending records for all tables.
/// </summary>
/// <param name="cancellationToken">Token passed to every context's flush.</param>
/// <returns>The number of records that were buffered across the flushed tables.</returns>
/// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
public async Task<long> FlushAllAsync(CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    long flushedCount = 0;
    var flushTasks = new List<Task>();

    foreach (var kvp in _tableContexts)
    {
        flushedCount += TakeBuffered(kvp.Key);
        flushTasks.Add(kvp.Value.FlushAsync(cancellationToken));
    }

    await Task.WhenAll(flushTasks).ConfigureAwait(false);

    Interlocked.Add(ref _totalRecordsInserted, flushedCount);
    Interlocked.Add(ref _totalBatchesFlushed, flushTasks.Count);

    return flushedCount;
}

// Adds 'count' buffered records for a table. The manager-wide total is bumped
// first so a concurrent TakeBuffered can never drive it below zero.
private void AddBuffered(string tableName, long count)
{
    Interlocked.Add(ref _bufferedRecordCount, count);
    _bufferedByTable.AddOrUpdate(tableName, count, (_, current) => current + count);
}

// Removes and returns a table's buffered count, subtracting it from the manager-wide total.
private long TakeBuffered(string tableName)
{
    if (!_bufferedByTable.TryRemove(tableName, out var count))
    {
        return 0;
    }

    Interlocked.Add(ref _bufferedRecordCount, -count);
    return count;
}
139 
/// <summary>
/// Gets the pending count reported by the specified table's context.
/// </summary>
/// <param name="tableName">Table to look up.</param>
/// <returns>
/// The context's <c>PendingBatches</c> value, saturated at <see cref="int.MaxValue"/>,
/// or 0 when no context exists for the table.
/// </returns>
/// <remarks>
/// NOTE(review): the value comes from <c>PendingBatches</c>, which by its name counts
/// batches rather than records - confirm which unit callers expect.
/// </remarks>
public int GetPendingCount(string tableName)
{
    if (!_tableContexts.TryGetValue(tableName, out var context))
    {
        return 0;
    }

    // A plain (int) cast of a long wraps silently in an unchecked context; clamp instead.
    return (int)Math.Min(context.PendingBatches, int.MaxValue);
}
151 
/// <summary>
/// Gets the total number of pending records across all tables.
/// </summary>
/// <returns>
/// The current value of the buffered-record counter, saturated at
/// <see cref="int.MaxValue"/>.
/// </returns>
public int GetTotalPendingCount()
{
    // The counter is a long that only shrinks on an explicit flush, so it can
    // exceed int.MaxValue; clamp rather than let the cast wrap to a negative value.
    var buffered = Interlocked.Read(ref _bufferedRecordCount);
    return (int)Math.Min(buffered, int.MaxValue);
}
159 
/// <summary>
/// Returns the cached context for a table, creating it on first use.
/// </summary>
/// <param name="tableName">Table name; the cache key.</param>
/// <param name="columnNames">
/// Column names handed to a newly created context. They are not consulted when a
/// context for the table is already cached.
/// </param>
/// <param name="cancellationToken">Token for the lock wait and the schema lookup.</param>
/// <returns>The existing or newly created context.</returns>
private async Task<TableBatchContext> GetOrCreateContextAsync(
    string tableName,
    IList<string> columnNames,
    CancellationToken cancellationToken)
{
    // Common case: the context is already cached, no lock needed.
    if (!_tableContexts.TryGetValue(tableName, out var context))
    {
        await _contextLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Another caller may have created it while we waited for the lock.
            if (!_tableContexts.TryGetValue(tableName, out context))
            {
                // Schema lookup is a synchronous call; run it on the thread pool.
                var tableType = await Task.Run(
                    () => KineticaType.fromTable(_kinetica, tableName),
                    cancellationToken).ConfigureAwait(false);

                context = new TableBatchContext(_kinetica, tableName, tableType, columnNames, _options);
                _tableContexts[tableName] = context;
            }
        }
        finally
        {
            _contextLock.Release();
        }
    }

    return context;
}
199 
/// <summary>
/// Throws when the manager has already been disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">The disposed flag is set.</exception>
private void ThrowIfDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(nameof(InsertBatchManager));
}
207 
/// <summary>
/// Flushes every table context, then disposes the contexts and the context lock.
/// Flush errors are ignored. Calling this more than once is a no-op.
/// </summary>
public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }

    _isDisposed = true;

    // Flush all pending data synchronously
    try
    {
        // FlushAllAsync cannot be used here: it starts with ThrowIfDisposed(), which
        // throws once _isDisposed is set, and the catch below would hide that -
        // skipping the flush entirely. Flush the contexts directly instead.
        Task.Run(() => FlushAllForDisposeAsync())
            .GetAwaiter()
            .GetResult();
    }
    catch
    {
        // Ignore errors during disposal
    }

    foreach (var context in _tableContexts.Values)
    {
        context.Dispose();
    }

    _tableContexts.Clear();
    _contextLock.Dispose();
}

/// <summary>
/// Asynchronously flushes every table context, then disposes the contexts and the
/// context lock. Flush errors are ignored. Calling this more than once is a no-op.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (_isDisposed)
    {
        return;
    }

    _isDisposed = true;

    // Flush all pending data
    try
    {
        // See Dispose(): FlushAllAsync would throw ObjectDisposedException here.
        await FlushAllForDisposeAsync().ConfigureAwait(false);
    }
    catch
    {
        // Ignore errors during disposal
    }

    foreach (var context in _tableContexts.Values)
    {
        await context.DisposeAsync().ConfigureAwait(false);
    }

    _tableContexts.Clear();
    _contextLock.Dispose();
}

// Flush used by both dispose paths. Same bookkeeping as a full flush, but without
// the disposed check, which would always fail at this point.
private async Task FlushAllForDisposeAsync()
{
    var flushedCount = Interlocked.Exchange(ref _bufferedRecordCount, 0);

    var flushTasks = new List<Task>();
    foreach (var context in _tableContexts.Values)
    {
        flushTasks.Add(context.FlushAsync());
    }

    await Task.WhenAll(flushTasks).ConfigureAwait(false);

    Interlocked.Add(ref _totalRecordsInserted, flushedCount);
    Interlocked.Add(ref _totalBatchesFlushed, flushTasks.Count);
}
258  }
259 
/// <summary>
/// Options for batch insert operations.
/// </summary>
public class InsertBatchOptions
{
    /// <summary>The number of records to batch before flushing. Defaults to 10000.</summary>
    public int BatchSize { get; set; } = 10000;

    /// <summary>If true, updates existing records with matching primary keys. Defaults to false.</summary>
    public bool UpdateOnExistingPk { get; set; }

    /// <summary>If true, returns individual errors for each failed record. Defaults to false.</summary>
    public bool ReturnIndividualErrors { get; set; }

    /// <summary>Maximum number of retry attempts for failed batches. Defaults to 3.</summary>
    public int MaxRetries { get; set; } = 3;

    /// <summary>Base delay in milliseconds between retries. Defaults to 100.</summary>
    public int RetryDelayMs { get; set; } = 100;

    /// <summary>Maximum in-flight batches for backpressure control. Defaults to 100.</summary>
    public int MaxInFlightBatches { get; set; } = 100;

    /// <summary>Number of stripes per worker queue. Defaults to the processor count.</summary>
    public int NumStripes { get; set; } = Environment.ProcessorCount;

    /// <summary>
    /// Maximum concurrent flush workers. Defaults to twice the processor count,
    /// with a floor of 4.
    /// </summary>
    public int MaxFlushWorkers { get; set; } = Math.Max(Environment.ProcessorCount * 2, 4);
}
306 
310  internal class TableBatchContext : IAsyncDisposable, IDisposable
311  {
312  private readonly BulkInserter<GenericRecord> _inserter;
313  private readonly kinetica.Records.Type _recordType;
314  private readonly int[] _columnMapping; // Maps INSERT column index to schema column index
315 
/// <summary>Pass-through of the underlying inserter's <c>CountInserted</c>.</summary>
public long CountInserted
{
    get { return _inserter.CountInserted; }
}

/// <summary>Pass-through of the underlying inserter's <c>CountUpdated</c>.</summary>
public long CountUpdated
{
    get { return _inserter.CountUpdated; }
}

/// <summary>Pass-through of the underlying inserter's <c>PendingBatches</c>.</summary>
public long PendingBatches
{
    get { return _inserter.PendingBatches; }
}
319 
322  string tableName,
323  KineticaType ktype,
324  IList<string> insertColumnNames,
325  InsertBatchOptions options)
326  {
327  // Create Type from KineticaType
328  _recordType = CreateTypeFromKineticaType(ktype);
329 
330  // Build column mapping from INSERT order to schema order
331  _columnMapping = CreateColumnMapping(ktype, insertColumnNames);
332 
333  var bulkOptions = new BulkInserterOptions
334  {
335  BatchSize = options.BatchSize,
336  MaxRetries = options.MaxRetries,
337  MaxInFlightBatches = options.MaxInFlightBatches,
338  NumStripes = options.NumStripes,
339  MaxFlushWorkers = options.MaxFlushWorkers,
340  InsertOptions = new Dictionary<string, string>()
341  };
342 
343  if (options.UpdateOnExistingPk)
344  {
345  bulkOptions.InsertOptions["update_on_existing_pk"] = "true";
346  }
347  if (options.ReturnIndividualErrors)
348  {
349  bulkOptions.InsertOptions["return_individual_errors"] = "true";
350  }
351 
352  _inserter = new BulkInserter<GenericRecord>(kinetica, tableName, ktype, bulkOptions);
353  }
354 
/// <summary>
/// Builds a record <c>Type</c> named "dynamic_record" with one column definition
/// per column of the given <see cref="KineticaType"/>.
/// </summary>
/// <param name="ktype">Source type whose columns are copied.</param>
/// <returns>The record type used to create new records for this table.</returns>
private static kinetica.Records.Type CreateTypeFromKineticaType(KineticaType ktype)
{
    var columnDefs = ktype.getColumns()
        .Select(source =>
        {
            var properties = source.getProperties();
            var mappedType = DetermineColumnType(source);
            return new Column(source.getName(), mappedType, properties);
        })
        .ToList();

    return new kinetica.Records.Type("dynamic_record", columnDefs);
}
370 
// Column property -> record column type, checked in this order; the first
// property found on the column decides the result.
private static readonly (string Property, ColumnType Mapped)[] PropertyColumnTypes =
{
    ("boolean", ColumnType.Boolean),
    ("int8", ColumnType.Int8),
    ("int16", ColumnType.Int16),
    ("timestamp", ColumnType.Timestamp),
    ("date", ColumnType.Date),
    ("datetime", ColumnType.DateTime),
    ("time", ColumnType.Time),
    ("decimal", ColumnType.Decimal),
    ("ipv4", ColumnType.Ipv4),
    ("uuid", ColumnType.Uuid),
    ("char1", ColumnType.Char1),
    ("char2", ColumnType.Char2),
    ("char4", ColumnType.Char4),
    ("char8", ColumnType.Char8),
    ("char16", ColumnType.Char16),
    ("char32", ColumnType.Char32),
    ("char64", ColumnType.Char64),
    ("char128", ColumnType.Char128),
    ("char256", ColumnType.Char256),
};

/// <summary>
/// Picks the record column type for a schema column: a matching column property
/// wins; otherwise the column's base type is mapped, with String as the fallback.
/// </summary>
/// <param name="col">Schema column to classify.</param>
/// <returns>The record column type to use for <paramref name="col"/>.</returns>
private static ColumnType DetermineColumnType(KineticaType.Column col)
{
    var props = col.getProperties();

    foreach (var (property, mapped) in PropertyColumnTypes)
    {
        if (props.Contains(property))
        {
            return mapped;
        }
    }

    // No recognised property: fall back to the base storage type.
    switch (col.getType())
    {
        case KineticaType.Column.ColumnType.INT:
            return ColumnType.Integer;
        case KineticaType.Column.ColumnType.LONG:
            return ColumnType.Long;
        case KineticaType.Column.ColumnType.FLOAT:
            return ColumnType.Float;
        case KineticaType.Column.ColumnType.DOUBLE:
            return ColumnType.Double;
        case KineticaType.Column.ColumnType.STRING:
            return ColumnType.String;
        case KineticaType.Column.ColumnType.BYTES:
            return ColumnType.Bytes;
        default:
            return ColumnType.String;
    }
}
406 
/// <summary>
/// Builds the mapping from INSERT column position to schema column index.
/// </summary>
/// <param name="ktype">Table type whose column order defines the schema indexes.</param>
/// <param name="insertColumnNames">
/// Column names in INSERT order, matched case-insensitively against the schema.
/// Null or empty means the statement named no columns, in which case values are
/// taken positionally in schema order.
/// </param>
/// <returns>Array where element <c>i</c> is the schema index of INSERT column <c>i</c>.</returns>
/// <exception cref="ArgumentException">A named column does not exist in the schema.</exception>
private static int[] CreateColumnMapping(KineticaType ktype, IList<string> insertColumnNames)
{
    var schemaColumns = ktype.getColumns();

    // "INSERT INTO t VALUES (...)" carries no column list. An empty mapping would
    // make InsertAsync copy no values at all, so map positions one-to-one instead.
    if (insertColumnNames == null || insertColumnNames.Count == 0)
    {
        return Enumerable.Range(0, schemaColumns.Count).ToArray();
    }

    var mapping = new int[insertColumnNames.Count];

    for (int i = 0; i < insertColumnNames.Count; i++)
    {
        var insertCol = insertColumnNames[i];
        int schemaIndex = -1;

        for (int j = 0; j < schemaColumns.Count; j++)
        {
            if (string.Equals(schemaColumns[j].getName(), insertCol, StringComparison.OrdinalIgnoreCase))
            {
                schemaIndex = j;
                break;
            }
        }

        if (schemaIndex == -1)
        {
            throw new ArgumentException($"Column '{insertCol}' not found in table schema");
        }

        mapping[i] = schemaIndex;
    }

    return mapping;
}
436 
/// <summary>
/// Builds a record from the given values, placing each value in its mapped schema
/// column, and hands the record to the underlying inserter.
/// </summary>
/// <param name="values">
/// Values in INSERT column order. Only as many values as there are mapped columns
/// are used; any extras are ignored.
/// </param>
/// <param name="cancellationToken">Token passed to the inserter.</param>
public async Task InsertAsync(IList<object?> values, CancellationToken cancellationToken)
{
    var record = _recordType.NewInstance();

    // Stop at whichever is shorter: the supplied values or the column mapping.
    var fieldCount = Math.Min(values.Count, _columnMapping.Length);

    for (var position = 0; position < fieldCount; position++)
    {
        var targetIndex = _columnMapping[position];
        var raw = values[position];

        if (raw is null || ReferenceEquals(raw, DBNull.Value))
        {
            record.PutNull(targetIndex);
            continue;
        }

        // Convert according to the schema column's type; String when the column is unknown.
        var targetType = _recordType.GetColumn(targetIndex)?.ColumnType ?? ColumnType.String;
        PutValueWithTypeConversion(record, targetIndex, raw, targetType);
    }

    await _inserter.InsertAsync(record, cancellationToken).ConfigureAwait(false);
}
463 
/// <summary>
/// Stores a non-null value in a record field, converting it to the representation
/// the target column type expects.
/// </summary>
/// <param name="record">Record being populated.</param>
/// <param name="index">Schema index of the field to set.</param>
/// <param name="value">Value to store; callers handle null/DBNull before calling.</param>
/// <param name="targetType">Column type of the destination field.</param>
/// <remarks>
/// All conversions and formatting use the invariant culture so the result does not
/// depend on the machine's regional settings (decimal separators, calendars).
/// </remarks>
private static void PutValueWithTypeConversion(GenericRecord record, int index, object value, ColumnType targetType)
{
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    switch (targetType)
    {
        case ColumnType.Integer:
        case ColumnType.Int8:
        case ColumnType.Int16:
        case ColumnType.Boolean:
            // Integer types - convert numeric values to int
            record.Put(index, Convert.ToInt32(value, invariant));
            break;

        case ColumnType.Long:
        case ColumnType.Timestamp:
            // Long types - date/time values become Unix epoch milliseconds.
            // NOTE(review): a DateTime with Kind=Unspecified is treated as local time
            // by the DateTimeOffset constructor - confirm that is intended.
            if (value is DateTime dt)
            {
                record.Put(index, new DateTimeOffset(dt).ToUnixTimeMilliseconds());
            }
            else if (value is DateTimeOffset dto)
            {
                record.Put(index, dto.ToUnixTimeMilliseconds());
            }
            else
            {
                record.Put(index, Convert.ToInt64(value, invariant));
            }
            break;

        case ColumnType.Float:
            // Float - convert numeric values to float
            record.Put(index, Convert.ToSingle(value, invariant));
            break;

        case ColumnType.Double:
            // Double - convert numeric values to double
            record.Put(index, Convert.ToDouble(value, invariant));
            break;

        case ColumnType.String:
        case ColumnType.Char1:
        case ColumnType.Char2:
        case ColumnType.Char4:
        case ColumnType.Char8:
        case ColumnType.Char16:
        case ColumnType.Char32:
        case ColumnType.Char64:
        case ColumnType.Char128:
        case ColumnType.Char256:
        case ColumnType.Wkt:
        case ColumnType.Json:
        case ColumnType.Ipv4:
        case ColumnType.Uuid:
        case ColumnType.Decimal:
        case ColumnType.Date:
        case ColumnType.DateTime:
        case ColumnType.Time:
            // String-based types
            record.Put(index, FormatAsString(value, targetType));
            break;

        case ColumnType.Bytes:
            // Anything that is not already a byte[] is stored as an empty array.
            if (value is byte[] bytesVal)
            {
                record.Put(index, bytesVal);
            }
            else
            {
                record.Put(index, Array.Empty<byte>());
            }
            break;

        default:
            // Default: try to use the value directly
            switch (value)
            {
                case int intVal:
                    record.Put(index, intVal);
                    break;
                case long longVal:
                    record.Put(index, longVal);
                    break;
                case float floatVal:
                    record.Put(index, floatVal);
                    break;
                case double doubleVal:
                    record.Put(index, doubleVal);
                    break;
                case string strVal:
                    record.Put(index, strVal);
                    break;
                case bool boolVal:
                    record.Put(index, boolVal);
                    break;
                case byte[] bytes:
                    record.Put(index, bytes);
                    break;
                default:
                    record.Put(index, Convert.ToString(value, invariant) ?? string.Empty);
                    break;
            }
            break;
    }
}

// Renders a value for a string-backed column. Date/time values use the layout that
// matches the column type; everything else is formatted with the invariant culture.
private static string FormatAsString(object value, ColumnType targetType)
{
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    switch (value)
    {
        case DateTime dateTime:
            return dateTime.ToString(TemporalFormat(targetType), invariant);
        case DateTimeOffset dateTimeOffset:
            return dateTimeOffset.ToString(TemporalFormat(targetType), invariant);
        default:
            return Convert.ToString(value, invariant) ?? string.Empty;
    }
}

// Date columns take only the date part and Time columns only the time part;
// every other string-backed column keeps the full date-time layout.
private static string TemporalFormat(ColumnType targetType)
{
    switch (targetType)
    {
        case ColumnType.Date:
            return "yyyy-MM-dd";
        case ColumnType.Time:
            return "HH:mm:ss.fff";
        default:
            return "yyyy-MM-dd HH:mm:ss.fff";
    }
}
567 
/// <summary>
/// Flushes the underlying inserter, waits until it reports no pending batches, and
/// then surfaces any collected insert errors as a single exception.
/// </summary>
/// <param name="cancellationToken">Token for the flush and for the wait loop.</param>
/// <exception cref="KineticaException">The inserter reported one or more errors.</exception>
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
    await _inserter.FlushAsync(cancellationToken).ConfigureAwait(false);

    // Poll every 10 ms until the inserter has drained its pending batches.
    while (true)
    {
        if (_inserter.PendingBatches <= 0)
        {
            break;
        }

        cancellationToken.ThrowIfCancellationRequested();
        await Task.Delay(10, cancellationToken).ConfigureAwait(false);
    }

    // Draining removes the errors from the inserter, so they are reported once.
    var failures = _inserter.DrainErrors();
    if (failures.Count == 0)
    {
        return;
    }

    var details = string.Join("; ", failures.Select(failure => failure.Message));
    throw new KineticaException($"Batch insert failed with {failures.Count} error(s): {details}");
}
587 
/// <summary>
/// Returns the result of the inserter's <c>DrainErrors()</c> call.
/// </summary>
public List<InsertError> GetErrors()
{
    return _inserter.DrainErrors();
}

/// <summary>Disposes the underlying inserter.</summary>
public void Dispose() => _inserter.Dispose();

/// <summary>Asynchronously disposes the underlying inserter.</summary>
public async ValueTask DisposeAsync()
{
    var pending = _inserter.DisposeAsync();
    await pending.ConfigureAwait(false);
}
599  }
600 
605  {
606  // Pattern 1: INSERT INTO [schema.]table (columns) VALUES (values)
607  private static readonly System.Text.RegularExpressions.Regex _insertWithColumnsPattern = new System.Text.RegularExpressions.Regex(
608  @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s*\(\s*(?<columns>[\w\s,]+)\s*\)\s*VALUES\s*\((?<values>.*)\)\s*$",
609  System.Text.RegularExpressions.RegexOptions.IgnoreCase | System.Text.RegularExpressions.RegexOptions.Compiled | System.Text.RegularExpressions.RegexOptions.Singleline);
610 
611  // Pattern 2: INSERT INTO [schema.]table VALUES (values) - no column names
612  private static readonly System.Text.RegularExpressions.Regex _insertWithoutColumnsPattern = new System.Text.RegularExpressions.Regex(
613  @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s+VALUES\s*\((?<values>.*)\)\s*$",
614  System.Text.RegularExpressions.RegexOptions.IgnoreCase | System.Text.RegularExpressions.RegexOptions.Compiled | System.Text.RegularExpressions.RegexOptions.Singleline);
615 
616  // Pattern for extracting column names
617  private static readonly System.Text.RegularExpressions.Regex _columnPattern = new System.Text.RegularExpressions.Regex(
618  @"[\w]+",
619  System.Text.RegularExpressions.RegexOptions.Compiled);
620 
/// <summary>
/// Tries to parse a single-row SQL INSERT statement into its table name, column
/// names and values.
/// </summary>
/// <param name="sql">
/// Statement text. Two shapes are recognised: <c>INSERT INTO t (cols) VALUES (...)</c>
/// and <c>INSERT INTO t VALUES (...)</c>.
/// </param>
/// <param name="result">
/// The parsed statement on success; null otherwise. <c>ColumnNames</c> is empty
/// when the statement has no column list.
/// </param>
/// <returns>
/// <c>true</c> when the statement matched one of the two shapes; <c>false</c> for
/// null, blank or non-matching input.
/// </returns>
public static bool TryParse(string sql, out ParsedInsert? result)
{
    result = null;

    // Regex.Match throws on null; a Try-method should report failure instead.
    if (string.IsNullOrWhiteSpace(sql))
    {
        return false;
    }

    // Try pattern with columns first
    var match = _insertWithColumnsPattern.Match(sql);
    string columnsStr = "";

    if (!match.Success)
    {
        // Try pattern without columns
        match = _insertWithoutColumnsPattern.Match(sql);
        if (!match.Success)
        {
            return false;
        }
    }
    else
    {
        columnsStr = match.Groups["columns"].Value.Trim();
    }

    var tableName = match.Groups["table"].Value.Trim();
    var valuesStr = match.Groups["values"].Value.Trim();

    // Extract column names
    var columnNames = new List<string>();
    if (!string.IsNullOrEmpty(columnsStr))
    {
        var columnMatches = _columnPattern.Matches(columnsStr);
        foreach (System.Text.RegularExpressions.Match cm in columnMatches)
        {
            columnNames.Add(cm.Value);
        }
    }

    // Extract values
    var values = ParseValues(valuesStr);

    result = new ParsedInsert
    {
        TableName = tableName,
        ColumnNames = columnNames,
        Values = values
    };

    return true;
}
669 
/// <summary>
/// Splits the text inside <c>VALUES ( ... )</c> into individual values.
/// </summary>
/// <param name="valuesStr">Comma-separated value list without the outer parentheses.</param>
/// <returns>
/// One entry per value. A value written as a single quoted literal is returned as a
/// string exactly as quoted (doubled quotes collapsed, whitespace kept, no type
/// detection). Every other value goes through <see cref="ParseSingleValue"/>.
/// </returns>
/// <remarks>
/// Commas inside quotes or inside parentheses do not split values. Both single and
/// double quotes delimit strings.
/// </remarks>
private static List<object?> ParseValues(string valuesStr)
{
    var values = new List<object?>();
    var currentValue = new System.Text.StringBuilder(); // whole token, quote characters removed
    var quotedText = new System.Text.StringBuilder();   // only the characters found inside quotes
    int depth = 0;
    bool inString = false;
    char stringChar = '\0';
    int quotedSegments = 0;       // quoted runs seen in the current token
    bool hasUnquotedText = false; // non-whitespace seen outside quotes in the current token

    void EmitToken()
    {
        if (quotedSegments == 1 && !hasUnquotedText)
        {
            // A pure string literal: keep it a string. Running it through type
            // detection would turn '42' into a number and 'NULL' into null.
            values.Add(quotedText.ToString());
        }
        else
        {
            values.Add(ParseSingleValue(currentValue.ToString().Trim()));
        }

        currentValue.Clear();
        quotedText.Clear();
        quotedSegments = 0;
        hasUnquotedText = false;
    }

    for (int i = 0; i < valuesStr.Length; i++)
    {
        char c = valuesStr[i];

        if (inString)
        {
            if (c == stringChar)
            {
                // A doubled quote inside a string is an escaped quote character.
                if (i + 1 < valuesStr.Length && valuesStr[i + 1] == stringChar)
                {
                    currentValue.Append(c);
                    quotedText.Append(c);
                    i++; // Skip next char
                }
                else
                {
                    inString = false;
                }
            }
            else
            {
                currentValue.Append(c);
                quotedText.Append(c);
            }
        }
        else if (c == '\'' || c == '"')
        {
            inString = true;
            stringChar = c;
            quotedSegments++;
        }
        else if (c == '(')
        {
            depth++;
            hasUnquotedText = true;
            currentValue.Append(c);
        }
        else if (c == ')')
        {
            depth--;
            hasUnquotedText = true;
            currentValue.Append(c);
        }
        else if (c == ',' && depth == 0)
        {
            EmitToken();
        }
        else
        {
            if (!char.IsWhiteSpace(c))
            {
                hasUnquotedText = true;
            }

            currentValue.Append(c);
        }
    }

    // Add last value. The quoted check keeps a trailing empty string literal ('').
    if (currentValue.Length > 0 || quotedSegments > 0)
    {
        EmitToken();
    }

    return values;
}

/// <summary>
/// Converts one unquoted value token to a typed value.
/// </summary>
/// <param name="valueStr">Trimmed token text.</param>
/// <returns>
/// null for NULL; a string for a token still wrapped in quotes; bool for TRUE/FALSE;
/// int or long for integers; double for other numbers; otherwise the token itself.
/// Keywords are matched case-insensitively and numbers use the invariant culture.
/// </returns>
private static object? ParseSingleValue(string valueStr)
{
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    // Check for NULL first (before string handling)
    if (string.Equals(valueStr, "NULL", StringComparison.OrdinalIgnoreCase))
    {
        return null;
    }

    // String value (quoted). Needs at least two characters: a lone quote character
    // both starts and ends with a quote and would make Substring throw.
    if (valueStr.Length >= 2 &&
        ((valueStr.StartsWith("'", StringComparison.Ordinal) && valueStr.EndsWith("'", StringComparison.Ordinal)) ||
         (valueStr.StartsWith("\"", StringComparison.Ordinal) && valueStr.EndsWith("\"", StringComparison.Ordinal))))
    {
        return valueStr.Substring(1, valueStr.Length - 2)
            .Replace("''", "'")
            .Replace("\"\"", "\"");
    }

    // Boolean
    if (string.Equals(valueStr, "TRUE", StringComparison.OrdinalIgnoreCase))
    {
        return true;
    }
    if (string.Equals(valueStr, "FALSE", StringComparison.OrdinalIgnoreCase))
    {
        return false;
    }

    // Numeric - SQL literals always use '.' as the decimal separator, so parse with
    // the invariant culture rather than the machine's regional settings.
    if (long.TryParse(valueStr, System.Globalization.NumberStyles.Integer, invariant, out long longValue))
    {
        if (longValue >= int.MinValue && longValue <= int.MaxValue)
        {
            return (int)longValue;
        }
        return longValue;
    }

    if (double.TryParse(
            valueStr,
            System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
            invariant,
            out double doubleValue))
    {
        return doubleValue;
    }

    // Return as string
    return valueStr;
}
782  }
783 
/// <summary>
/// Parsed INSERT statement result.
/// </summary>
public class ParsedInsert
{
    /// <summary>Table name as written in the statement; empty by default.</summary>
    public string TableName { get; set; } = "";

    /// <summary>Column names from the statement's column list; empty by default.</summary>
    public List<string> ColumnNames { get; set; } = new List<string>();

    /// <summary>Parsed values from the VALUES clause; empty by default.</summary>
    public List<object?> Values { get; set; } = new List<object?>();
}
793 }
int MaxRetries
Maximum number of retry attempts for failed batches.
Manages bulk insert operations for the ADO.NET driver.
static bool TryParse(string sql, out ParsedInsert? result)
bool UpdateOnExistingPk
If true, updates existing records with matching primary keys.
int GetPendingCount(string tableName)
Gets the number of pending records for the specified table.
async Task< long > FlushTableAsync(string tableName, CancellationToken cancellationToken=default)
Flushes all pending records for the specified table.
bool ReturnIndividualErrors
If true, returns individual errors for each failed record.
Column? GetColumn(int index)
Gets a column by index.
Definition: Type.cs:312
bool PutNull(int index)
Sets a null value by column index.
TableBatchContext(Kinetica kinetica, string tableName, KineticaType ktype, IList< string > insertColumnNames, InsertBatchOptions options)
bool Put(int index, int value)
Sets an integer value by column index.
async Task< long > FlushAllAsync(CancellationToken cancellationToken=default)
Flushes all pending records for all tables.
InsertBatchManager(Kinetica kinetica, InsertBatchOptions? options=null)
async Task< bool > InsertAsync(string tableName, IList< string > columnNames, IList< object?> values, CancellationToken cancellationToken=default)
Inserts a record into the specified table using batch processing.
GenericRecord NewInstance()
Creates a new GenericRecord instance of this type.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
long TotalRecordsInserted
Gets the total number of records inserted across all tables.
int GetTotalPendingCount()
Gets the total number of pending records across all tables.
long TotalBatchesFlushed
Gets the total number of batches flushed across all tables.
SQL INSERT statement parser that extracts table name, column names, and values.
if(args.Length > 0)
Definition: Program.cs:5
int MaxFlushWorkers
Maximum concurrent flush workers.
int MaxInFlightBatches
Maximum in-flight batches for backpressure control.
Parsed INSERT statement result.
Immutable metadata about a column in a Kinetica type.
Definition: Column.cs:11
Manages batch context for a single table using BulkInserter<GenericRecord>.
IList< Column > getColumns()
async Task FlushAsync(CancellationToken cancellationToken=default)
List< InsertError > GetErrors()
long BufferedRecordCount
Gets the number of records currently buffered (not yet flushed).
Options for batch insert operations.
ColumnType ColumnType
Gets the column type.
Definition: Column.cs:57
async Task< int > InsertBatchAsync(string tableName, IList< string > columnNames, IList< IList< object?>> valuesList, CancellationToken cancellationToken=default)
Inserts multiple records into the specified table using batch processing.
int BatchSize
The number of records to batch before flushing.
A generic record that can hold values for any Kinetica type.
int NumStripes
Number of stripes per worker queue.
int RetryDelayMs
Base delay in milliseconds between retries.
async Task InsertAsync(IList< object?> values, CancellationToken cancellationToken)
Configuration options for the BulkInserter<T>.