Kinetica   C#   API  Version 7.2.3.1
BulkInserter.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics;
5 using System.Linq;
6 using System.Threading;
7 using System.Threading.Channels;
8 using System.Threading.Tasks;
9 using kinetica.Utils;
10 using Records = kinetica.Records;
11 
12 namespace kinetica;
13 
28  public sealed class BulkInserter<T> : IAsyncDisposable, IDisposable
29  where T : Records.IShardKeyExtractor
30  {
31  #region Fields
32 
33  private readonly Kinetica _kinetica;
34  private readonly string _tableName;
35  private readonly KineticaType _ktype;
36  private readonly BulkInserterOptions _options;
37 
38  // Avro encoder - uses DirectAvroEncoder<T> for POCOs, GenericRecordEncoder for GenericRecord
39  private readonly DirectAvroEncoder<T>? _directEncoder;
40  private readonly GenericRecordEncoder? _genericRecordEncoder;
41  private readonly bool _isGenericRecord;
42 
43  // New interface-based key builder (replaces reflection-based RecordKeyBuilder<T>)
44  private readonly Records.RecordKeyBuilder? _shardKeyBuilder;
45  private readonly IList<int>? _routingTable;
46 
47  private readonly StripedWorkerQueue<T>[] _workerQueues;
48  private readonly int _numWorkers;
49 
50  // Flush channel for async batch processing
51  private readonly Channel<FlushJob> _flushChannel;
52  private readonly Task[] _flushWorkers;
53  private readonly CancellationTokenSource _cts;
54 
55  // Backpressure control
56  private readonly SemaphoreSlim _inFlightSemaphore;
57 
58  // Metrics
59  private long _countInserted;
60  private long _countUpdated;
61  private long _pendingBatches;
62  private long _totalBatchesSent;
63  private long _totalBatchesFailed;
64 
65  // Error queue
66  private readonly ConcurrentQueue<InsertError> _errorQueue;
67  private int _errorCount;
68 
69  // Timed flush
70  private Timer? _flushTimer;
71  private volatile bool _timedFlushRunning;
72 
73  // State
74  private volatile bool _isDisposed;
75  private volatile bool _isClosed;
76 
77  // Random for worker selection when no shard key
78  private readonly ThreadLocal<Random> _random;
79 
80  // HA Failover fields (mutable for cluster switching)
81  private readonly int _dbHaRingSize;
82  private volatile int _numClusterSwitches;
83  private volatile Uri? _currentHeadNodeUrl;
84  private readonly object _haLock = new object();
85 
86  // Mutable worker state for HA failover
87  private volatile StripedWorkerQueue<T>[]? _mutableWorkerQueues;
88  private volatile IList<int>? _mutableRoutingTable;
89  private long _shardVersion; // Use Interlocked for thread-safe access
90  private long _shardUpdateTime; // Use Interlocked for thread-safe access
91  private readonly bool _multiHeadEnabled;
92 
93  #endregion
94 
95  #region Constructor
96 
/// <summary>
/// Creates a bulk inserter for a single table and starts its background flush workers
/// (and the periodic flush timer when <c>FlushIntervalSeconds</c> is positive).
/// </summary>
/// <param name="kinetica">Connection used for type registration, worker/shard discovery and request submission.</param>
/// <param name="tableName">Name of the table that receives the records.</param>
/// <param name="ktype">Record type of the table; used to build the shard key and the Avro encoder.</param>
/// <param name="options">Optional settings. A clone is stored; defaults are used when null.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="kinetica"/>, <paramref name="tableName"/> or <paramref name="ktype"/> is null.
/// </exception>
/// <exception cref="ArgumentException">An option value is below its allowed minimum.</exception>
public BulkInserter(
    Kinetica kinetica,
    string tableName,
    KineticaType ktype,
    BulkInserterOptions? options = null)
{
    _kinetica = kinetica ?? throw new ArgumentNullException(nameof(kinetica));
    _tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
    _ktype = ktype ?? throw new ArgumentNullException(nameof(ktype));
    _options = options?.Clone() ?? new BulkInserterOptions();

    ValidateOptions();

    _cts = new CancellationTokenSource();
    _errorQueue = new ConcurrentQueue<InsertError>();
    _random = new ThreadLocal<Random>(() => new Random(Environment.CurrentManagedThreadId));

    // Keep the key builder only when the type actually declares a shard key;
    // a null builder makes routing fall back to a random worker.
    var tempKeyBuilder = new Records.RecordKeyBuilder(_ktype, isPrimaryKey: false);
    _shardKeyBuilder = tempKeyBuilder.HasKey ? tempKeyBuilder : null;

    // Register the type mapping so AvroEncode works (for backwards compatibility)
    _kinetica.SetKineticaSourceClassToTypeMapping(typeof(T), _ktype);

    // Exactly one encoder is created: GenericRecordEncoder for GenericRecord, DirectAvroEncoder<T> otherwise.
    _isGenericRecord = typeof(T) == typeof(Records.GenericRecord);
    if (_isGenericRecord)
    {
        _genericRecordEncoder = GenericRecordEncoder.GetOrCreate(_ktype);
        _directEncoder = null;
    }
    else
    {
        _directEncoder = DirectAvroEncoder<T>.GetOrCreate(_ktype);
        _genericRecordEncoder = null;
    }

    // Worker queues (one per usable worker URL, or a single head-node queue)
    (_workerQueues, _routingTable) = InitializeWorkerQueues();
    _numWorkers = _workerQueues.Length;
    _multiHeadEnabled = _numWorkers > 1;

    // HA failover state
    _dbHaRingSize = _kinetica.HAManager?.HARingSize ?? 1;
    _numClusterSwitches = _kinetica.NumClusterSwitches;
    _currentHeadNodeUrl = _kinetica.URL;
    _shardVersion = 0;
    _shardUpdateTime = 0;

    // Backpressure: one permit per batch enqueued through the backpressure path
    _inFlightSemaphore = new SemaphoreSlim(_options.MaxInFlightBatches, _options.MaxInFlightBatches);

    // The channel itself is unbounded; only the semaphore limits in-flight batches.
    _flushChannel = Channel.CreateUnbounded<FlushJob>(new UnboundedChannelOptions
    {
        SingleReader = false,
        SingleWriter = false
    });

    // Start flush workers
    _flushWorkers = new Task[_options.MaxFlushWorkers];
    for (int i = 0; i < _options.MaxFlushWorkers; i++)
    {
        _flushWorkers[i] = Task.Run(() => FlushWorkerLoop(_cts.Token));
    }

    // Start the periodic flush only when an interval is configured
    if (_options.FlushIntervalSeconds > 0)
    {
        _timedFlushRunning = true;
        _flushTimer = new Timer(
            TimedFlushCallback,
            null,
            TimeSpan.FromSeconds(_options.FlushIntervalSeconds),
            TimeSpan.FromSeconds(_options.FlushIntervalSeconds));
    }
}
183 
/// <summary>
/// Verifies that the numeric options needed to build queues, workers and the
/// backpressure semaphore are all at least 1.
/// </summary>
/// <exception cref="ArgumentException">
/// <c>BatchSize</c>, <c>MaxInFlightBatches</c>, <c>NumStripes</c> or <c>MaxFlushWorkers</c> is less than 1.
/// </exception>
private void ValidateOptions()
{
    // The values come from the constructor's "options" argument, so that is the
    // parameter name to report (not the private field name).
    const string paramName = "options";

    if (_options.BatchSize < 1)
    {
        throw new ArgumentException("BatchSize must be at least 1", paramName);
    }

    if (_options.MaxInFlightBatches < 1)
    {
        throw new ArgumentException("MaxInFlightBatches must be at least 1", paramName);
    }

    if (_options.NumStripes < 1)
    {
        throw new ArgumentException("NumStripes must be at least 1", paramName);
    }

    if (_options.MaxFlushWorkers < 1)
    {
        throw new ArgumentException("MaxFlushWorkers must be at least 1", paramName);
    }
}
195 
/// <summary>
/// Builds the per-worker queues and, in multi-head mode, fetches the shard routing table.
/// </summary>
/// <returns>
/// The queues (one per non-null worker URL, or a single head-node queue) and the routing
/// table, which is null in single-head mode.
/// </returns>
/// <exception cref="KineticaException">A routing-table entry is greater than the number of queues.</exception>
private (StripedWorkerQueue<T>[] queues, IList<int>? routingTable) InitializeWorkerQueues()
{
    // Creates a queue whose target is "<base url>/insert/records".
    StripedWorkerQueue<T> CreateQueue(string baseUrlText, int index)
    {
        var trimmed = baseUrlText.TrimEnd('/');
        return new StripedWorkerQueue<T>(
            new Uri($"{trimmed}/insert/records"),
            index,
            _options.NumStripes,
            _options.BatchSize);
    }

    var workerUrls = _options.WorkerList;

    // No explicit list: ask the server; any failure simply means "no workers".
    if (workerUrls == null || workerUrls.Count == 0)
    {
        try
        {
            workerUrls = new WorkerList(_kinetica);
        }
        catch
        {
            workerUrls = null;
        }
    }

    var built = new List<StripedWorkerQueue<T>>();

    // Single-head mode: everything goes to the head node, no routing table.
    if (workerUrls == null || workerUrls.Count == 0)
    {
        built.Add(CreateQueue(_kinetica.URL.ToString(), 0));
        return (built.ToArray(), null);
    }

    // Multi-head mode: null entries (removed ranks) consume an index but get no queue.
    int rankIndex = 0;
    foreach (var url in workerUrls)
    {
        if (url != null)
        {
            built.Add(CreateQueue(url.ToString(), rankIndex));
        }

        rankIndex++;
    }

    IList<int> shardRanks = _kinetica.adminShowShards().rank;

    foreach (var rank in shardRanks)
    {
        if (rank > built.Count)
        {
            throw new KineticaException("Not enough worker URLs specified.");
        }
    }

    return (built.ToArray(), shardRanks);
}
264 
265  #endregion
266 
267  #region Properties
268 
/// <summary>Gets the total count of records inserted.</summary>
public long CountInserted => Interlocked.Read(ref _countInserted);

/// <summary>Gets the total count of records updated.</summary>
public long CountUpdated => Interlocked.Read(ref _countUpdated);

/// <summary>Gets the number of batches that have been enqueued but not yet fully processed.</summary>
public long PendingBatches => Interlocked.Read(ref _pendingBatches);

/// <summary>Gets the total number of batches sent.</summary>
public long TotalBatchesSent => Interlocked.Read(ref _totalBatchesSent);

/// <summary>Gets the total number of batches that failed after all retries.</summary>
public long TotalBatchesFailed => Interlocked.Read(ref _totalBatchesFailed);

/// <summary>Gets the name of the table records are inserted into.</summary>
public string TableName => _tableName;

/// <summary>Gets the number of workers.</summary>
public int NumWorkers => _numWorkers;

/// <summary>Gets the number of errors in the error queue.</summary>
// The counter is only written through Interlocked, so read it with a matching volatile read.
public int ErrorCount => Volatile.Read(ref _errorCount);

/// <summary>Gets whether the periodic flush is currently enabled.</summary>
public bool IsTimedFlushRunning => _timedFlushRunning;
313 
/// <summary>
/// Gets backpressure metrics: a snapshot of the semaphore permits and the pending batch count.
/// </summary>
/// <returns>
/// A new <see cref="BackpressureMetrics"/>. The values are read one after another without a
/// lock, so they may be slightly inconsistent with each other under concurrent load.
/// </returns>
public BackpressureMetrics GetBackpressureMetrics()
{
    var available = _inFlightSemaphore.CurrentCount;
    var inFlight = _options.MaxInFlightBatches - available;

    return new BackpressureMetrics
    {
        MaxInFlightBatches = _options.MaxInFlightBatches,
        AvailablePermits = available,
        InFlightBatches = inFlight,
        PendingBatches = Interlocked.Read(ref _pendingBatches),
        // MaxInFlightBatches is validated to be >= 1, so the division is safe.
        UtilizationPercent = (double)inFlight / _options.MaxInFlightBatches * 100
    };
}
331 
332  #endregion
333 
334  #region Insert Methods
335 
/// <summary>
/// Queues one record. When the record completes a batch, that batch is handed to the
/// flush workers without waiting for a backpressure permit.
/// </summary>
/// <param name="record">The record to insert.</param>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
public void Insert(T record)
{
    ThrowIfDisposed();

    var routing = ComputeRouting(record);
    var targetQueue = _workerQueues[routing.workerIndex];

    // Add returns a batch only when the stripe has filled up.
    var fullBatch = targetQueue.Add(record, routing.stripeHash, out int stripe);
    if (fullBatch == null)
    {
        return;
    }

    EnqueueBatch(new FlushJob(targetQueue.Url, routing.workerIndex, stripe, fullBatch));
}
354 
/// <summary>
/// Queues one record. When the record completes a batch, waits for a backpressure
/// permit before handing the batch to the flush workers.
/// </summary>
/// <param name="record">The record to insert.</param>
/// <param name="cancellationToken">Cancels the wait for a backpressure permit.</param>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
public async ValueTask InsertAsync(T record, CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    var routing = ComputeRouting(record);
    var targetQueue = _workerQueues[routing.workerIndex];

    // Add returns a batch only when the stripe has filled up.
    var fullBatch = targetQueue.Add(record, routing.stripeHash, out int stripe);
    if (fullBatch == null)
    {
        return;
    }

    var job = new FlushJob(targetQueue.Url, routing.workerIndex, stripe, fullBatch, usedBackpressure: true);
    await EnqueueBatchWithBackpressureAsync(job, cancellationToken);
}
377 
/// <summary>
/// Inserts multiple records with parallel routing computation. Completed batches are
/// enqueued without waiting for backpressure permits.
/// </summary>
/// <param name="records">The records to insert; an empty list is a no-op.</param>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
/// <exception cref="ArgumentNullException"><paramref name="records"/> is null.</exception>
public void InsertBatch(IReadOnlyList<T> records)
{
    ThrowIfDisposed();
    ArgumentNullException.ThrowIfNull(records);

    var count = records.Count;
    if (count == 0)
    {
        return;
    }

    // Routing is computed in parallel; each iteration writes only its own slot.
    var routingResults = new (int workerIndex, int stripeIndex)[count];

    Parallel.For(0, count, i =>
    {
        var (workerIndex, stripeHash) = ComputeRouting(records[i]);
        var stripeIndex = (int)(stripeHash & (_workerQueues[workerIndex].NumStripes - 1));
        routingResults[i] = (workerIndex, stripeIndex);
    });

    // Group by (worker, stripe). A value-tuple key is a struct (no allocation) and,
    // unlike a packed int, cannot collide for large indices.
    var groups = new Dictionary<(int workerIndex, int stripeIndex), List<T>>(_numWorkers * _options.NumStripes);

    for (int i = 0; i < count; i++)
    {
        var key = routingResults[i];

        if (!groups.TryGetValue(key, out var list))
        {
            list = new List<T>(Math.Min(count, _options.BatchSize));
            groups.Add(key, list);
        }

        list.Add(records[i]);
    }

    // One AddRangeToStripe call per group keeps stripe locking to a minimum.
    var batches = new List<FlushJob>(groups.Count);

    foreach (var kvp in groups)
    {
        var workerIndex = kvp.Key.workerIndex;
        var stripeIndex = kvp.Key.stripeIndex;
        var queue = _workerQueues[workerIndex];

        foreach (var batch in queue.AddRangeToStripe(kvp.Value, stripeIndex))
        {
            batches.Add(new FlushJob(queue.Url, workerIndex, stripeIndex, batch));
        }
    }

    foreach (var job in batches)
    {
        EnqueueBatch(job);
    }
}
444 
/// <summary>
/// Inserts multiple records with parallel routing computation. Each completed batch waits
/// for a backpressure permit before it is enqueued.
/// </summary>
/// <param name="records">The records to insert; an empty list is a no-op.</param>
/// <param name="cancellationToken">Cancels the routing work item and the waits for backpressure permits.</param>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
/// <exception cref="ArgumentNullException"><paramref name="records"/> is null.</exception>
public async ValueTask InsertBatchAsync(IReadOnlyList<T> records, CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();
    ArgumentNullException.ThrowIfNull(records);

    var count = records.Count;
    if (count == 0)
    {
        return;
    }

    var routingResults = new (int workerIndex, int stripeIndex)[count];

    // Compute routing in parallel on the thread pool; each iteration writes only its own slot.
    await Task.Run(() =>
    {
        Parallel.For(0, count, i =>
        {
            var (workerIndex, stripeHash) = ComputeRouting(records[i]);
            var stripeIndex = (int)(stripeHash & (_workerQueues[workerIndex].NumStripes - 1));
            routingResults[i] = (workerIndex, stripeIndex);
        });
    }, cancellationToken).ConfigureAwait(false);

    // Group by (worker, stripe). A value-tuple key is a struct (no allocation) and,
    // unlike a packed int, cannot collide for large indices.
    var groups = new Dictionary<(int workerIndex, int stripeIndex), List<T>>(_numWorkers * _options.NumStripes);

    for (int i = 0; i < count; i++)
    {
        var key = routingResults[i];

        if (!groups.TryGetValue(key, out var list))
        {
            list = new List<T>(Math.Min(count, _options.BatchSize));
            groups.Add(key, list);
        }

        list.Add(records[i]);
    }

    var batches = new List<FlushJob>(groups.Count);

    foreach (var kvp in groups)
    {
        var workerIndex = kvp.Key.workerIndex;
        var stripeIndex = kvp.Key.stripeIndex;
        var queue = _workerQueues[workerIndex];

        foreach (var batch in queue.AddRangeToStripe(kvp.Value, stripeIndex))
        {
            batches.Add(new FlushJob(queue.Url, workerIndex, stripeIndex, batch, usedBackpressure: true));
        }
    }

    // Enqueue all batches with backpressure
    foreach (var job in batches)
    {
        await EnqueueBatchWithBackpressureAsync(job, cancellationToken).ConfigureAwait(false);
    }
}
512 
513  #endregion
514 
515  #region Routing
516 
/// <summary>
/// Chooses the worker queue index and the stripe hash for a record.
/// </summary>
/// <param name="record">The record being routed.</param>
/// <returns>
/// Worker 0 when there is no routing table; the shard-key route when a valid shard key
/// can be built; otherwise a randomly chosen worker.
/// </returns>
private (int workerIndex, long stripeHash) ComputeRouting(T record)
{
    // Single worker mode
    if (_routingTable == null)
    {
        return (0, ComputeStripeHash(record));
    }

    if (_shardKeyBuilder != null)
    {
        var key = _shardKeyBuilder.Build(record.GetShardKeyValues());

        if (key != null && key.IsValid)
        {
            int routedWorker = key.Route(_routingTable);
            long keyHash = key.HashCode();
            return (routedWorker, keyHash);
        }
    }

    // No shard key on the type, or no valid key for this record: pick any worker.
    int randomWorker = _random.Value!.Next(_numWorkers);
    return (randomWorker, ComputeStripeHash(record));
}
554 
/// <summary>
/// Computes a non-negative hash used only to spread records across stripes.
/// </summary>
/// <param name="record">The record to hash.</param>
/// <returns>
/// A hash built from the shard key names and values when the record reports any;
/// otherwise the record's own hash code. The result is in the range of <see cref="uint"/>.
/// </returns>
private long ComputeStripeHash(T record)
{
    // Overflow is intended here; make that explicit so a checked build neither throws
    // in the multiply-add nor in the int -> uint conversions.
    unchecked
    {
        var shardKeyValues = record.GetShardKeyValues();
        if (shardKeyValues.Count > 0)
        {
            int hash = 17;
            foreach (var (name, value) in shardKeyValues)
            {
                hash = hash * 31 + (name?.GetHashCode() ?? 0);
                // A null key value hashes to 0 instead of throwing.
                hash = hash * 31 + (((object?)value)?.GetHashCode() ?? 0);
            }

            return (uint)hash;
        }

        return (uint)(record?.GetHashCode() ?? 0);
    }
}
572 
573  #endregion
574 
575  #region Flush
576 
/// <summary>
/// Flushes all queued records to Kinetica, blocking the calling thread until
/// <see cref="FlushAsync"/> has finished.
/// </summary>
public void Flush() => FlushAsync().GetAwaiter().GetResult();
584 
/// <summary>
/// Flushes all queued records to Kinetica asynchronously: drains every stripe of every
/// worker queue and enqueues the resulting partial batches with backpressure.
/// </summary>
/// <param name="cancellationToken">Cancels the waits for backpressure permits.</param>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    // First drain every queue, then enqueue; the two phases are kept separate.
    var pendingJobs = new List<FlushJob>();

    foreach (var workerQueue in _workerQueues)
    {
        foreach (var (stripeIndex, partialBatch) in workerQueue.FlushAll())
        {
            var job = new FlushJob(
                workerQueue.Url,
                workerQueue.WorkerIndex,
                stripeIndex,
                partialBatch,
                usedBackpressure: true);
            pendingJobs.Add(job);
        }
    }

    foreach (var job in pendingJobs)
    {
        await EnqueueBatchWithBackpressureAsync(job, cancellationToken);
    }
}
611 
/// <summary>
/// Hands a batch to the flush workers without backpressure. If the channel refuses the
/// job, the batch is processed synchronously on the calling thread instead.
/// </summary>
/// <param name="job">The batch to send.</param>
private void EnqueueBatch(FlushJob job)
{
    Interlocked.Increment(ref _pendingBatches);

    bool accepted = _flushChannel.Writer.TryWrite(job);
    if (accepted)
    {
        return;
    }

    // Not expected with an unbounded channel: undo the count and send inline.
    Interlocked.Decrement(ref _pendingBatches);
    ProcessBatch(job).GetAwaiter().GetResult();
}
624 
/// <summary>
/// Waits for an in-flight permit and then hands the batch to the flush workers. If the
/// channel refuses the job, the permit is returned and the batch is processed inline.
/// </summary>
/// <param name="job">The batch to send.</param>
/// <param name="cancellationToken">Cancels the wait for a permit.</param>
private async ValueTask EnqueueBatchWithBackpressureAsync(FlushJob job, CancellationToken cancellationToken)
{
    // The permit caps how many backpressure batches can be queued or in flight at once.
    await _inFlightSemaphore.WaitAsync(cancellationToken);

    Interlocked.Increment(ref _pendingBatches);

    bool accepted = _flushChannel.Writer.TryWrite(job);
    if (accepted)
    {
        return;
    }

    // Not expected with an unbounded channel: undo the bookkeeping and send inline.
    Interlocked.Decrement(ref _pendingBatches);
    _inFlightSemaphore.Release();
    await ProcessBatch(job);
}
642 
643  #endregion
644 
645  #region Timed Flush
646 
/// <summary>
/// Timer callback that starts a background flush unless the inserter is stopping.
/// </summary>
/// <param name="state">Unused timer state.</param>
private void TimedFlushCallback(object? state)
{
    if (!_timedFlushRunning || _isClosed || _isDisposed)
    {
        return;
    }

    // Fire-and-forget so the timer thread is not blocked. The try/catch has to live
    // inside the task: a catch around Task.Run would never see exceptions thrown by
    // the asynchronous flush.
    _ = Task.Run(async () =>
    {
        try
        {
            // Re-check: the inserter may have been closed while this work item was queued.
            if (!_timedFlushRunning || _isClosed || _isDisposed)
            {
                return;
            }

            await FlushAsync().ConfigureAwait(false);
        }
        catch
        {
            // Deliberate best effort: a failed periodic flush (for example because the
            // inserter was disposed concurrently) must not surface as an unobserved
            // task exception. Per-batch send errors are recorded by ProcessBatch.
        }
    });
}
675 
676  #endregion
677 
678  #region Flush Worker
679 
/// <summary>
/// Body of one flush worker: takes jobs from the flush channel and sends them until the
/// channel is completed and drained, or until cancellation is requested.
/// </summary>
/// <param name="cancellationToken">Stops the worker while it is waiting for the next job.</param>
private async Task FlushWorkerLoop(CancellationToken cancellationToken)
{
    var reader = _flushChannel.Reader;

    try
    {
        // WaitToReadAsync yields false once the channel is completed and empty.
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (reader.TryRead(out var job))
            {
                try
                {
                    await ProcessBatch(job);
                }
                finally
                {
                    Interlocked.Decrement(ref _pendingBatches);

                    // Only jobs enqueued through the backpressure path hold a permit.
                    if (job.UsedBackpressure)
                    {
                        _inFlightSemaphore.Release();
                    }
                }
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Normal shutdown
    }
}
708 
/// <summary>
/// Encodes and submits one batch, retrying on failure and attempting a single HA failover
/// on connection errors. Updates the metrics, notifies the batch listener, and records an
/// error when every attempt fails.
/// </summary>
/// <param name="job">The batch to send; an empty batch is ignored.</param>
private async Task ProcessBatch(FlushJob job)
{
    if (job.Records.Count == 0)
    {
        return;
    }

    var sw = Stopwatch.StartNew();
    double encodeTimeMs = 0;
    double networkTimeMs = 0;
    int retryAttempt = 0;
    Exception? lastException = null;
    bool haFailoverAttempted = false;

    // The encoded request does not change between attempts, so it is built once.
    byte[]? requestBytes = null;

    // Capture the insertion attempt timestamp for HA failover logic
    var insertionAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    // Get current worker URL (may be updated after failover)
    var currentWorkerUrl = job.WorkerUrl;

    while (retryAttempt <= _options.MaxRetries)
    {
        // Check for updated worker URL after a failover has occurred
        if (haFailoverAttempted && _mutableWorkerQueues != null)
        {
            var effectiveQueues = GetEffectiveWorkerQueues();
            if (job.WorkerIndex < effectiveQueues.Length)
            {
                currentWorkerUrl = effectiveQueues[job.WorkerIndex].Url;
            }
        }

        try
        {
            if (requestBytes == null)
            {
                var encodeSw = Stopwatch.StartNew();
                var encodedRecords = EncodeRecords(job.Records);
                requestBytes = DirectRequestEncoder.Encode(_tableName, encodedRecords, _options.InsertOptions);
                encodeTimeMs = encodeSw.Elapsed.TotalMilliseconds;
            }

            // Send request using Kinetica's centralized submission method
            var payload = requestBytes;
            var targetUrl = currentWorkerUrl;
            var networkSw = Stopwatch.StartNew();
            var rawResponse = await Task.Run(() =>
                _kinetica.SubmitRequestRawBytes(targetUrl, payload)).ConfigureAwait(false);

            if (rawResponse.status != "OK")
            {
                throw new KineticaException(rawResponse.message ?? "Unknown server error");
            }

            InsertRecordsResponse response = _kinetica.AvroDecode<InsertRecordsResponse>(rawResponse.data);
            networkTimeMs = networkSw.Elapsed.TotalMilliseconds;

            // Update metrics
            Interlocked.Add(ref _countInserted, response.count_inserted);
            Interlocked.Add(ref _countUpdated, response.count_updated);
            Interlocked.Increment(ref _totalBatchesSent);

            // The listener is isolated: if it threw here, the catch below would treat a
            // successful insert as a failure and resend the same records.
            NotifyListener(new BatchInsertionResult
            {
                WorkerUrl = currentWorkerUrl,
                BatchSize = job.Records.Count,
                CountInserted = response.count_inserted,
                CountUpdated = response.count_updated,
                Success = true,
                EncodeTimeMs = encodeTimeMs,
                NetworkTimeMs = networkTimeMs,
                TotalTimeMs = sw.Elapsed.TotalMilliseconds,
                WorkerIndex = job.WorkerIndex,
                StripeIndex = job.StripeIndex,
                RetryAttempt = retryAttempt
            });

            return; // Success
        }
        catch (Exception ex)
        {
            lastException = ex;

            // A connection error gets one HA failover attempt when an HA ring exists.
            if (Kinetica.IsConnectionError(ex) && !haFailoverAttempted && _dbHaRingSize > 1)
            {
                var shouldRetry = await HandleConnectionErrorAsync(ex, insertionAttemptTimestamp).ConfigureAwait(false);
                if (shouldRetry)
                {
                    haFailoverAttempted = true;
                    // Don't increment retry count for failover, retry immediately
                    continue;
                }
            }

            retryAttempt++;

            if (retryAttempt <= _options.MaxRetries)
            {
                // Linear backoff: wait 1s, 2s, 3s, ... before the next attempt.
                await Task.Delay(TimeSpan.FromSeconds(retryAttempt)).ConfigureAwait(false);
            }
        }
    }

    // All retries failed
    Interlocked.Increment(ref _totalBatchesFailed);

    QueueError(new InsertError
    {
        WorkerUrl = currentWorkerUrl,
        RecordCount = job.Records.Count,
        Message = lastException?.Message ?? "Unknown error",
        Exception = lastException,
        RetryAttempt = retryAttempt - 1
    });

    NotifyListener(new BatchInsertionResult
    {
        WorkerUrl = currentWorkerUrl,
        BatchSize = job.Records.Count,
        Success = false,
        ErrorMessage = lastException?.Message,
        EncodeTimeMs = encodeTimeMs,
        NetworkTimeMs = networkTimeMs,
        TotalTimeMs = sw.Elapsed.TotalMilliseconds,
        WorkerIndex = job.WorkerIndex,
        StripeIndex = job.StripeIndex,
        RetryAttempt = retryAttempt - 1
    });
}

/// <summary>
/// Invokes the configured batch listener, if any, without letting its exceptions escape.
/// </summary>
/// <param name="result">The outcome to report.</param>
private void NotifyListener(BatchInsertionResult result)
{
    var listener = _options.BatchListener;
    if (listener == null)
    {
        return;
    }

    try
    {
        listener.OnBatchInserted(result);
    }
    catch (Exception listenerException)
    {
        // A faulty listener must neither trigger a resend nor fault the flush worker.
        Debug.WriteLine($"BulkInserter batch listener threw: {listenerException}");
    }
}
839 
/// <summary>
/// Encodes the records with the encoder that matches <typeparamref name="T"/>.
/// </summary>
/// <param name="records">The records to encode.</param>
/// <returns>The encoded records as returned by the encoder's <c>EncodeManyAsList</c>.</returns>
private List<byte[]> EncodeRecords(IReadOnlyList<T> records)
{
    if (!_isGenericRecord)
    {
        // Any T other than GenericRecord goes through DirectAvroEncoder<T>.
        return _directEncoder!.EncodeManyAsList(records);
    }

    // T is GenericRecord here, so the list can be viewed as a list of GenericRecord.
    var asGenericRecords = (IReadOnlyList<Records.GenericRecord>)(object)records;
    return _genericRecordEncoder!.EncodeManyAsList(asGenericRecords);
}
855 
856  #endregion
857 
858  #region Error Handling
859 
/// <summary>
/// Adds an error to the error queue unless the queue already holds
/// <c>MaxErrorQueueSize</c> entries, in which case the error is dropped.
/// </summary>
/// <param name="error">The error to record.</param>
private void QueueError(InsertError error)
{
    // Reserve a slot first; give it back if that pushed the count over the limit.
    int reservedCount = Interlocked.Increment(ref _errorCount);
    if (reservedCount > _options.MaxErrorQueueSize)
    {
        Interlocked.Decrement(ref _errorCount);
        return;
    }

    _errorQueue.Enqueue(error);
}
871 
/// <summary>
/// Drains and returns all errors from the error queue.
/// </summary>
/// <returns>The removed errors in dequeue order; empty when there were none.</returns>
public List<InsertError> DrainErrors()
{
    var drained = new List<InsertError>();

    while (true)
    {
        if (!_errorQueue.TryDequeue(out var item))
        {
            break;
        }

        Interlocked.Decrement(ref _errorCount);
        drained.Add(item);
    }

    return drained;
}
885 
/// <summary>
/// Peeks at errors without removing them.
/// </summary>
/// <returns>A snapshot array of the queued errors.</returns>
public IReadOnlyList<InsertError> PeekErrors() => _errorQueue.ToArray();
893 
894  #endregion
895 
896  #region HA Failover
897 
/// <summary>Gets the cluster switch count last recorded by this inserter.</summary>
public int NumClusterSwitches
{
    get { return _numClusterSwitches; }
}

/// <summary>Gets the HA ring size captured at construction (1 when there is no HA manager).</summary>
public int HARingSize
{
    get { return _dbHaRingSize; }
}
907 
/// <summary>
/// Asks the connection to fail over to another cluster and, in multi-head mode, checks
/// that the new cluster reports at least one worker.
/// </summary>
/// <param name="oldUrl">The head node URL that was in use when the error occurred.</param>
/// <param name="oldClusterSwitchCount">The cluster switch count observed at that time.</param>
/// <returns>
/// True when a failover happened and the new cluster passed the check; otherwise false.
/// The work is done synchronously and returned as a completed task.
/// </returns>
private Task<bool> ForceFailoverAsync(Uri oldUrl, int oldClusterSwitchCount)
{
    // In multi-head mode the new cluster must be able to list workers.
    bool NewClusterHasWorkers()
    {
        try
        {
            return new WorkerList(_kinetica).Count != 0;
        }
        catch
        {
            return false;
        }
    }

    bool failoverPossible = _kinetica.HAManager != null && _dbHaRingSize > 1;
    if (!failoverPossible)
    {
        return Task.FromResult(false);
    }

    // Use Kinetica's centralized failover method
    var switchedUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
    if (switchedUrl == null)
    {
        return Task.FromResult(false);
    }

    if (_multiHeadEnabled && !NewClusterHasWorkers())
    {
        return Task.FromResult(false);
    }

    lock (_haLock)
    {
        _currentHeadNodeUrl = switchedUrl;
        _numClusterSwitches = _kinetica.NumClusterSwitches;
    }

    return Task.FromResult(true);
}
959 
/// <summary>
/// Refreshes the shard routing state from the server and, when requested and multi-head
/// is enabled, rebuilds the worker queues.
/// </summary>
/// <param name="countClusterSwitches">The cluster switch count seen by the caller before the error.</param>
/// <param name="doReconstructWorkerUrls">Whether worker URLs should be rebuilt (multi-head mode only).</param>
/// <returns>
/// True when the routing state was updated. When neither the shard version nor the cluster
/// changed, the result of the worker rebuild (or false if no rebuild was requested). False
/// on a connection error.
/// </returns>
private async Task<bool> UpdateWorkerQueuesAsync(int countClusterSwitches, bool doReconstructWorkerUrls)
{
    bool rebuildWorkers = doReconstructWorkerUrls && _multiHeadEnabled;

    try
    {
        var shards = _kinetica.adminShowShards();
        var latestVersion = shards.version;

        bool versionUnchanged = Interlocked.Read(ref _shardVersion) == latestVersion;
        if (versionUnchanged)
        {
            var liveSwitchCount = _kinetica.NumClusterSwitches;

            if (countClusterSwitches == liveSwitchCount)
            {
                // Same shard version, same cluster: at most the worker URLs need a rebuild.
                return rebuildWorkers
                    ? await ReconstructWorkerUrlsAsync().ConfigureAwait(false)
                    : false;
            }

            // The cluster changed: remember the new switch count and fall through.
            lock (_haLock)
            {
                _numClusterSwitches = liveSwitchCount;
            }
        }

        // Record the new shard version and when it was observed
        Interlocked.Exchange(ref _shardVersion, latestVersion);
        Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

        lock (_haLock)
        {
            _mutableRoutingTable = shards.rank;
            _currentHeadNodeUrl = _kinetica.URL;
            _numClusterSwitches = _kinetica.NumClusterSwitches;
        }

        if (rebuildWorkers)
        {
            // The rebuild result is intentionally not part of the return value here.
            await ReconstructWorkerUrlsAsync().ConfigureAwait(false);
        }

        return true;
    }
    catch (Exception ex) when (Kinetica.IsConnectionError(ex))
    {
        // Connection error during shard info retrieval
        return false;
    }
}
1022 
/// <summary>
/// Fetches the current worker list and replaces the mutable worker queues with new
/// queues built from it.
/// </summary>
/// <returns>
/// True when new queues were stored; false when the worker list was empty or any
/// exception occurred. The work is done synchronously and returned as a completed task.
/// </returns>
private Task<bool> ReconstructWorkerUrlsAsync()
{
    bool rebuilt;

    try
    {
        var discovered = new WorkerList(_kinetica);

        if (discovered.Count == 0)
        {
            rebuilt = false;
        }
        else
        {
            var rebuiltQueues = new List<StripedWorkerQueue<T>>();
            int rankIndex = 0;

            foreach (var url in discovered)
            {
                // Null entries (removed ranks) consume an index but get no queue.
                if (url != null)
                {
                    var trimmed = url.ToString().TrimEnd('/');
                    rebuiltQueues.Add(new StripedWorkerQueue<T>(
                        new Uri($"{trimmed}/insert/records"),
                        rankIndex,
                        _options.NumStripes,
                        _options.BatchSize));
                }

                rankIndex++;
            }

            lock (_haLock)
            {
                _mutableWorkerQueues = rebuiltQueues.ToArray();
            }

            rebuilt = true;
        }
    }
    catch (Exception)
    {
        rebuilt = false;
    }

    return Task.FromResult(rebuilt);
}
1072 
/// <summary>
/// Returns the worker queues rebuilt after a failover when present, otherwise the
/// queues created at construction.
/// </summary>
private StripedWorkerQueue<T>[] GetEffectiveWorkerQueues() => _mutableWorkerQueues ?? _workerQueues;
1080 
/// <summary>
/// Returns the routing table refreshed after a failover when present, otherwise the
/// table obtained at construction (which may be null).
/// </summary>
private IList<int>? GetEffectiveRoutingTable() => _mutableRoutingTable ?? _routingTable;
1088 
/// <summary>
/// Reacts to a failed submission: attempts an HA failover when one is possible, refreshes
/// the worker queues, and decides whether the batch should be retried.
/// </summary>
/// <param name="ex">The exception raised by the failed attempt.</param>
/// <param name="insertionAttemptTimestamp">Unix time in milliseconds at which the batch attempt started.</param>
/// <returns>
/// False when <paramref name="ex"/> is not a connection error. Otherwise true when the
/// failover succeeded, the worker queues were updated, or the shard information was
/// refreshed after the attempt started.
/// </returns>
private async Task<bool> HandleConnectionErrorAsync(Exception ex, long insertionAttemptTimestamp)
{
    if (!Kinetica.IsConnectionError(ex))
    {
        return false;
    }

    // Snapshot the state that the failover request refers to.
    var headUrlSnapshot = _currentHeadNodeUrl;
    var switchCountSnapshot = _numClusterSwitches;

    bool failedOver = false;
    if (headUrlSnapshot != null && _dbHaRingSize > 1)
    {
        failedOver = await ForceFailoverAsync(headUrlSnapshot, switchCountSnapshot).ConfigureAwait(false);
    }

    bool queuesUpdated = await UpdateWorkerQueuesAsync(switchCountSnapshot, true).ConfigureAwait(false);

    long lastShardUpdate = Interlocked.Read(ref _shardUpdateTime);
    bool shardsRefreshedSinceAttempt = insertionAttemptTimestamp < lastShardUpdate;

    return failedOver || queuesUpdated || shardsRefreshedSinceAttempt;
}
1121 
1122  #endregion
1123 
1124  #region Close
1125 
/// <summary>
/// Flushes all records and waits for all pending batches to complete, blocking the
/// calling thread until <see cref="CloseAsync"/> has finished.
/// </summary>
public void Close() => CloseAsync().GetAwaiter().GetResult();
1133 
/// <summary>
/// Flushes all records and waits for all pending batches to complete. Calling it again
/// after it has started is a no-op.
/// </summary>
/// <param name="cancellationToken">
/// Cancels the flush and the wait for the workers; on cancellation during the wait the
/// workers are cancelled and the cancellation is rethrown.
/// </param>
/// <exception cref="InvalidOperationException">Batches are still counted as pending after all workers exited.</exception>
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
    if (_isClosed)
    {
        return;
    }

    _isClosed = true;

    // Stop the periodic flush before the final flush
    _timedFlushRunning = false;
    _flushTimer?.Dispose();
    _flushTimer = null;

    // Push out whatever is still buffered in the stripes
    await FlushAsync(cancellationToken);

    // No further jobs are accepted; workers exit once the channel is drained.
    _flushChannel.Writer.Complete();

    var allWorkers = Task.WhenAll(_flushWorkers);

    try
    {
        // No timeout on purpose: waiting until every queued batch is processed avoids data loss.
        await allWorkers.WaitAsync(cancellationToken);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // The caller gave up: cancel the workers and give them a moment to stop.
        _cts.Cancel();

        try
        {
            await Task.WhenAll(_flushWorkers).WaitAsync(TimeSpan.FromSeconds(1));
        }
        catch
        {
            // Ignore - best effort cleanup
        }

        throw; // Re-throw the cancellation
    }

    // Defensive check: every enqueued batch should have been accounted for by now.
    long stillPending = Interlocked.Read(ref _pendingBatches);
    if (stillPending == 0)
    {
        return;
    }

    throw new InvalidOperationException(
        $"BulkInserter.CloseAsync completed but {stillPending} batches are still pending. " +
        "This indicates a bug in the batch processing logic.");
}
1191 
1192  #endregion
1193 
1194  #region Disposal
1195 
/// <summary>
/// Guards public operations against use after disposal.
/// </summary>
/// <exception cref="ObjectDisposedException">The inserter has been disposed.</exception>
private void ThrowIfDisposed()
{
    if (!_isDisposed)
    {
        return;
    }

    throw new ObjectDisposedException(nameof(BulkInserter<T>));
}
1201 
/// <summary>
/// Disposes the BulkInserter, closing it if not already closed. Blocks the calling
/// thread until <see cref="DisposeAsync"/> has finished.
/// </summary>
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
1209 
// Set to 1 by the first DisposeAsync call so that disposal runs at most once.
private int _disposeStarted;

/// <summary>
/// Disposes the BulkInserter asynchronously. If the inserter has not been closed yet, it
/// is closed first so that buffered records are flushed and queued batches are sent
/// before the resources are released. Exceptions raised while closing are suppressed.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposeStarted, 1) != 0)
    {
        return;
    }

    try
    {
        // The disposed flag must NOT be set yet: CloseAsync calls FlushAsync, which
        // rejects disposed instances, and the final flush would be skipped silently.
        if (!_isClosed)
        {
            await CloseAsync().ConfigureAwait(false);
        }
    }
    catch
    {
        // Suppress exceptions during disposal
    }
    finally
    {
        _isDisposed = true;

        _cts.Cancel();
        _cts.Dispose();
        _inFlightSemaphore.Dispose();
        _flushTimer?.Dispose();
        _random.Dispose();
    }
}
1240 
1241  #endregion
1242 
1243  #region Nested Types
1244 
/// <summary>
/// One batch of records on its way to a worker, plus the bookkeeping needed to report
/// on it and to release its backpressure permit.
/// </summary>
private readonly struct FlushJob
{
    /// <summary>Creates a job for one batch.</summary>
    /// <param name="workerUrl">URL the batch is sent to.</param>
    /// <param name="workerIndex">Index of the worker queue the batch came from.</param>
    /// <param name="stripeIndex">Index of the stripe the batch came from.</param>
    /// <param name="records">The records in the batch.</param>
    /// <param name="usedBackpressure">True when a semaphore permit was taken for this batch.</param>
    public FlushJob(Uri workerUrl, int workerIndex, int stripeIndex, IReadOnlyList<T> records, bool usedBackpressure = false)
    {
        (WorkerUrl, WorkerIndex, StripeIndex) = (workerUrl, workerIndex, stripeIndex);
        (Records, UsedBackpressure) = (records, usedBackpressure);
    }

    /// <summary>URL the batch is sent to.</summary>
    public Uri WorkerUrl { get; }

    /// <summary>Index of the worker queue the batch came from.</summary>
    public int WorkerIndex { get; }

    /// <summary>Index of the stripe the batch came from.</summary>
    public int StripeIndex { get; }

    /// <summary>The records in the batch.</summary>
    public IReadOnlyList<T> Records { get; }

    /// <summary>True when a semaphore permit was taken for this batch and must be released after processing.</summary>
    public bool UsedBackpressure { get; }
}
1262 
1263  #endregion
1264  }
1265 
/// <summary>
/// Snapshot of the bulk inserter's backpressure state.
/// </summary>
public sealed class BackpressureMetrics
{
    /// <summary>Maximum number of in-flight batches allowed.</summary>
    public int MaxInFlightBatches { get; init; }

    /// <summary>Number of available permits in the semaphore.</summary>
    public int AvailablePermits { get; init; }

    /// <summary>Number of batches currently in-flight.</summary>
    public int InFlightBatches { get; init; }

    /// <summary>Number of batches pending (queued + in-flight).</summary>
    public long PendingBatches { get; init; }

    /// <summary>Utilization percentage (in-flight / max * 100).</summary>
    public double UtilizationPercent { get; init; }
}
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
int AvailablePermits
Number of available permits in the semaphore.
string Message
Error message.
long TotalBatchesSent
Gets the total number of batches sent.
async Task CloseAsync(CancellationToken cancellationToken=default)
Flushes all records and waits for all pending batches to complete.
void InsertBatch(IReadOnlyList< T > records)
Inserts multiple records with parallel routing computation.
IReadOnlyList< InsertError > PeekErrors()
Peeks at errors without removing them.
double UtilizationPercent
Utilization percentage (in-flight / max * 100).
void Close()
Flushes all records and waits for all pending batches to complete.
async Task FlushAsync(CancellationToken cancellationToken=default)
Flushes all queued records to Kinetica asynchronously.
int count_inserted
The number of records inserted.
async ValueTask DisposeAsync()
Disposes the BulkInserter asynchronously.
int ErrorCount
Gets the number of errors in the error queue.
int NumWorkers
Gets the number of workers.
A worker queue with multiple stripes to reduce lock contention.
List< InsertError > DrainErrors()
Drains and returns all errors from the error queue.
long PendingBatches
Number of batches pending (queued + in-flight).
int InFlightBatches
Number of batches currently in-flight.
High-performance Avro encoder for GenericRecord instances.
A set of results returned by Kinetica.insertRecords.
int MaxInFlightBatches
Maximum number of in-flight batches allowed.
long CountUpdated
Gets the total count of records updated.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
BackpressureMetrics GetBackpressureMetrics()
Gets backpressure metrics.
void Dispose()
Disposes the BulkInserter, closing it if not already closed.
Direct binary encoder for RawInsertRecordsRequest that bypasses the Apache Avro library.
static DirectAvroEncoder< T > GetOrCreate(KineticaType ktype)
Gets or creates a DirectAvroEncoder for the specified type and KineticaType.
Definition: AvroEncoders.cs:95
int NumClusterSwitches
Gets the number of times the cluster has been switched due to failover.
void Flush()
Flushes all queued records to Kinetica.
kinetica.Records Records
Definition: BulkInserter.cs:10
Error information for failed insertions.
async ValueTask InsertBatchAsync(IReadOnlyList< T > records, CancellationToken cancellationToken=default)
Inserts multiple records with async backpressure control.
async ValueTask InsertAsync(T record, CancellationToken cancellationToken=default)
Inserts a single record with async backpressure control.
long CountInserted
Gets the total count of records inserted.
int count_updated
The number of records updated.
Backpressure metrics for monitoring the BulkInserter.
int HARingSize
Gets the HA ring size (number of clusters).
BulkInserter(Kinetica kinetica, string tableName, KineticaType ktype, BulkInserterOptions? options=null)
Creates a new BulkInserter for the specified table.
void Insert(T record)
Inserts a single record.
Ultra-high-performance Avro encoder that writes directly to binary format without using GenericRecord...
Definition: AvroEncoders.cs:83
long TotalBatchesFailed
Gets the total number of batches that failed.
Failover to clusters in a random order (default)
static byte [] Encode(string tableName, IReadOnlyList< byte[]> records, IDictionary< string, string >? options)
Encodes a RawInsertRecordsRequest directly to Avro binary format.
Interface for extracting shard key values from a record.
long PendingBatches
Gets the number of batches currently pending (queued or in-flight).
bool IsTimedFlushRunning
Gets whether timed flush is currently running.
string TableName
Gets the table name.
static GenericRecordEncoder GetOrCreate(KineticaType ktype)
Gets or creates a GenericRecordEncoder for the specified KineticaType.
Result of a batch insertion operation.
Configuration options for the BulkInserter<T>.