2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics;
6 using System.Threading;
7 using System.Threading.Channels;
8 using System.Threading.Tasks;
/// <summary>
/// Bulk inserter for records of type T. Records are placed on per-worker queues and
/// completed batches are handed to background flush workers through a channel.
/// </summary>
28 public sealed
class BulkInserter<T> : IAsyncDisposable, IDisposable
// Target table name; the constructor rejects null.
34 private readonly
string _tableName;
// True when T is Records.GenericRecord; EncodeRecords uses it to pick the encoder.
41 private readonly
bool _isGenericRecord;
// Shard-key builder; the constructor stores null when the builder reports HasKey == false.
44 private readonly
Records.RecordKeyBuilder? _shardKeyBuilder;
// Routing table captured at construction together with the worker queues; may be null.
45 private readonly IList<int>? _routingTable;
// Number of worker queues created at construction.
48 private readonly
int _numWorkers;
// Flush pipeline: jobs are written to the channel and read by the worker tasks;
// the semaphore gates callers that enqueue with backpressure.
51 private readonly Channel<FlushJob> _flushChannel;
52 private readonly Task[] _flushWorkers;
53 private readonly CancellationTokenSource _cts;
56 private readonly SemaphoreSlim _inFlightSemaphore;
// Statistics counters. The visible code updates _pendingBatches, _totalBatchesSent and
// _totalBatchesFailed through Interlocked.
59 private long _countInserted;
60 private long _countUpdated;
61 private long _pendingBatches;
62 private long _totalBatchesSent;
63 private long _totalBatchesFailed;
// Error queue plus a separately tracked count that is compared against
// _options.MaxErrorQueueSize before an error is enqueued.
66 private readonly ConcurrentQueue<InsertError> _errorQueue;
67 private int _errorCount;
// Periodic flush timer; only created when _options.FlushIntervalSeconds > 0.
70 private Timer? _flushTimer;
71 private volatile bool _timedFlushRunning;
// Lifecycle flags; the timed flush callback checks both before doing work.
74 private volatile bool _isDisposed;
75 private volatile bool _isClosed;
// Per-thread Random used by ComputeRouting to pick a worker when no shard key is usable.
78 private readonly ThreadLocal<Random> _random;
// HA state: ring size (1 when no HAManager is present), last observed cluster-switch
// count and head node URL.
81 private readonly
int _dbHaRingSize;
82 private volatile int _numClusterSwitches;
83 private volatile Uri? _currentHeadNodeUrl;
84 private readonly
object _haLock =
new object();
// Shard state refreshed by UpdateWorkerQueuesAsync; the version and update time are
// read and written through Interlocked.
88 private volatile IList<int>? _mutableRoutingTable;
89 private long _shardVersion;
90 private long _shardUpdateTime;
// True when more than one worker queue was created at construction.
91 private readonly
bool _multiHeadEnabled;
111 _tableName = tableName ??
throw new ArgumentNullException(nameof(tableName));
112 _ktype = ktype ??
throw new ArgumentNullException(nameof(ktype));
117 _cts =
new CancellationTokenSource();
118 _errorQueue =
new ConcurrentQueue<InsertError>();
119 _random =
new ThreadLocal<Random>(() =>
new Random(Environment.CurrentManagedThreadId));
123 var tempKeyBuilder =
new Records.RecordKeyBuilder(_ktype, isPrimaryKey:
false);
124 _shardKeyBuilder = tempKeyBuilder.HasKey ? tempKeyBuilder :
null;
127 _kinetica.SetKineticaSourceClassToTypeMapping(typeof(T), _ktype);
130 _isGenericRecord = typeof(T) == typeof(
Records.GenericRecord);
131 if (_isGenericRecord)
134 _directEncoder =
null;
139 _genericRecordEncoder =
null;
143 (_workerQueues, _routingTable) = InitializeWorkerQueues();
144 _numWorkers = _workerQueues.Length;
145 _multiHeadEnabled = _numWorkers > 1;
148 _dbHaRingSize = _kinetica.HAManager?.HARingSize ?? 1;
149 _numClusterSwitches = _kinetica.NumClusterSwitches;
150 _currentHeadNodeUrl = _kinetica.URL;
152 _shardUpdateTime = 0;
155 _inFlightSemaphore =
new SemaphoreSlim(_options.MaxInFlightBatches, _options.MaxInFlightBatches);
159 _flushChannel = Channel.CreateUnbounded<FlushJob>(
new UnboundedChannelOptions
161 SingleReader =
false,
166 _flushWorkers =
new Task[_options.MaxFlushWorkers];
167 for (
int i = 0; i < _options.MaxFlushWorkers; i++)
169 _flushWorkers[i] = Task.Run(() => FlushWorkerLoop(_cts.Token));
173 if (_options.FlushIntervalSeconds > 0)
175 _timedFlushRunning =
true;
176 _flushTimer =
new Timer(
179 TimeSpan.FromSeconds(_options.FlushIntervalSeconds),
180 TimeSpan.FromSeconds(_options.FlushIntervalSeconds));
/// <summary>
/// Validates the numeric limits held in <c>_options</c>.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when BatchSize, MaxInFlightBatches, NumStripes or MaxFlushWorkers is less than 1.
/// </exception>
private void ValidateOptions()
{
    // The options reach this class through the public constructor parameter named
    // "options", so that is the name reported in ArgumentException.ParamName rather
    // than the private field name.
    const string paramName = "options";

    if (_options.BatchSize < 1)
    {
        throw new ArgumentException("BatchSize must be at least 1", paramName);
    }

    if (_options.MaxInFlightBatches < 1)
    {
        throw new ArgumentException("MaxInFlightBatches must be at least 1", paramName);
    }

    if (_options.NumStripes < 1)
    {
        throw new ArgumentException("NumStripes must be at least 1", paramName);
    }

    if (_options.MaxFlushWorkers < 1)
    {
        throw new ArgumentException("MaxFlushWorkers must be at least 1", paramName);
    }
}
198 var workers = _options.WorkerList;
199 IList<int>? routingTable =
null;
202 if (workers ==
null || workers.Count == 0)
214 var queues =
new List<StripedWorkerQueue<T>>();
216 if (workers !=
null && workers.Count > 0)
220 foreach (var workerUrl
in workers)
223 if (workerUrl ==
null)
229 var urlStr = workerUrl.ToString().TrimEnd(
'/');
230 var insertUrl =
new Uri($
"{urlStr}/insert/records");
235 _options.BatchSize));
239 routingTable = _kinetica.adminShowShards().rank;
242 foreach (var rank
in routingTable)
244 if (rank > queues.Count)
253 var urlStr = _kinetica.URL.ToString().TrimEnd(
'/');
254 var insertUrl =
new Uri($
"{urlStr}/insert/records");
259 _options.BatchSize));
262 return (queues.ToArray(), routingTable);
319 var available = _inFlightSemaphore.CurrentCount;
325 AvailablePermits = available,
326 InFlightBatches = inFlight,
328 UtilizationPercent = (double)inFlight / _options.MaxInFlightBatches * 100
334 #region Insert Methods 344 var (workerIndex, stripeHash) = ComputeRouting(record);
345 var queue = _workerQueues[workerIndex];
347 var batch = queue.Add(record, stripeHash, out
int stripeIndex);
351 EnqueueBatch(
new FlushJob(queue.Url, workerIndex, stripeIndex, batch));
/// <summary>
/// Inserts a single record, routing it to a worker queue and enqueueing the resulting
/// batch through the backpressure-aware path.
/// </summary>
/// <param name="record">Record to route and queue.</param>
/// <param name="cancellationToken">Cancellation token; its use falls on lines not visible in this listing.</param>
361 public async ValueTask
InsertAsync(T record, CancellationToken cancellationToken =
default)
// Pick the destination worker and the hash used to choose a stripe in that worker's queue.
365 var (workerIndex, stripeHash) = ComputeRouting(record);
366 var queue = _workerQueues[workerIndex];
// Add returns a batch and reports the stripe it used through the out parameter.
// NOTE(review): the condition deciding whether the returned batch is flushed is not visible here.
368 var batch = queue.Add(record, stripeHash, out
int stripeIndex);
// The job is flagged usedBackpressure so the flush worker releases the semaphore permit
// once the batch has been processed.
372 await EnqueueBatchWithBackpressureAsync(
373 new FlushJob(queue.Url, workerIndex, stripeIndex, batch, usedBackpressure:
true),
387 var count = records.Count;
393 var groups =
new Dictionary<int, List<T>>(_numWorkers * _options.NumStripes);
396 var routingResults =
new (
int workerIndex,
int stripeIndex)[count];
398 Parallel.For(0, count, i =>
400 var record = records[i];
401 var (workerIndex, stripeHash) = ComputeRouting(record);
402 var stripeIndex = (int)(stripeHash & (_workerQueues[workerIndex].NumStripes - 1));
403 routingResults[i] = (workerIndex, stripeIndex);
407 for (
int i = 0; i < count; i++)
409 var (workerIndex, stripeIndex) = routingResults[i];
410 var key = (workerIndex << 16) | stripeIndex;
412 if (!groups.TryGetValue(key, out var list))
414 list =
new List<T>(Math.Min(count, _options.BatchSize));
417 list.Add(records[i]);
421 var batches =
new List<FlushJob>(groups.Count);
423 foreach (var kvp
in groups)
425 var workerIndex = kvp.Key >> 16;
426 var stripeIndex = kvp.Key & 0xFFFF;
427 var queue = _workerQueues[workerIndex];
428 var groupRecords = kvp.Value;
430 var completeBatches = queue.AddRangeToStripe(groupRecords, stripeIndex);
432 foreach (var batch
in completeBatches)
434 batches.Add(
new FlushJob(queue.Url, workerIndex, stripeIndex, batch));
439 foreach (var batch
in batches)
/// <summary>
/// Inserts many records: computes routing in parallel, groups records by
/// (worker, stripe), appends each group to its stripe and enqueues every completed
/// batch with backpressure.
/// </summary>
/// <param name="records">Records to insert.</param>
/// <param name="cancellationToken">Token passed to the routing step and to each enqueue.</param>
450 public async ValueTask
InsertBatchAsync(IReadOnlyList<T> records, CancellationToken cancellationToken =
default)
454 var count = records.Count;
// Groups are keyed by a packed (workerIndex, stripeIndex) integer; the dictionary is
// pre-sized for one group per worker/stripe pair.
459 var groups =
new Dictionary<int, List<T>>(_numWorkers * _options.NumStripes);
// One routing result per input record, written by index so the parallel loop needs no locking.
460 var routingResults =
new (
int workerIndex,
int stripeIndex)[count];
// Phase 1: compute routing for every record in parallel.
465 Parallel.For(0, count, i =>
467 var record = records[i];
468 var (workerIndex, stripeHash) = ComputeRouting(record);
// NOTE(review): masking with NumStripes - 1 only spreads evenly if NumStripes is a power of two — confirm.
469 var stripeIndex = (int)(stripeHash & (_workerQueues[workerIndex].NumStripes - 1));
470 routingResults[i] = (workerIndex, stripeIndex);
// NOTE(review): this closing line appears to belong to an enclosing call whose opening is not visible here.
472 }, cancellationToken);
// Phase 2: group records sequentially, preserving input order within each group.
475 for (
int i = 0; i < count; i++)
477 var (workerIndex, stripeIndex) = routingResults[i];
// Pack worker index into the high bits and stripe index into the low 16 bits.
// NOTE(review): assumes stripeIndex < 65536 — confirm against the NumStripes limits.
478 var key = (workerIndex << 16) | stripeIndex;
480 if (!groups.TryGetValue(key, out var list))
482 list =
new List<T>(Math.Min(count, _options.BatchSize));
485 list.Add(records[i]);
// Phase 3: append each group to its stripe and collect the batches that became complete.
489 var batches =
new List<FlushJob>(groups.Count);
491 foreach (var kvp
in groups)
// Unpack the key built above.
493 var workerIndex = kvp.Key >> 16;
494 var stripeIndex = kvp.Key & 0xFFFF;
495 var queue = _workerQueues[workerIndex];
496 var groupRecords = kvp.Value;
498 var completeBatches = queue.AddRangeToStripe(groupRecords, stripeIndex);
500 foreach (var batch
in completeBatches)
502 batches.Add(
new FlushJob(queue.Url, workerIndex, stripeIndex, batch, usedBackpressure:
true));
// Phase 4: enqueue sequentially; each call waits for an in-flight permit.
507 foreach (var batch
in batches)
509 await EnqueueBatchWithBackpressureAsync(batch, cancellationToken);
/// <summary>
/// Chooses the worker index and stripe hash for a record.
/// </summary>
/// <param name="record">Record being routed.</param>
/// <returns>The worker index and the hash used to select a stripe.</returns>
// NOTE(review): this reads _routingTable and _numWorkers as captured at construction, not
// GetEffectiveRoutingTable(); confirm whether routing should follow a refreshed table.
517 private (
int workerIndex,
long stripeHash) ComputeRouting(T record)
// No routing table: only the stripe hash is computed on the visible lines
// (the workerIndex assignment for this branch is not visible in this listing).
522 if (_routingTable ==
null)
526 stripeHash = ComputeStripeHash(record);
// Routing table present but the type has no shard key: pick a worker at random.
528 else if (_shardKeyBuilder ==
null)
531 workerIndex = _random.Value!.Next(_numWorkers);
532 stripeHash = ComputeStripeHash(record);
// Shard key available: build it from the record's shard key values.
537 var shardKeyValues = record.GetShardKeyValues();
538 var shardKey = _shardKeyBuilder.Build(shardKeyValues);
// A valid key routes through the routing table and supplies the stripe hash itself.
540 if (shardKey !=
null && shardKey.IsValid)
542 workerIndex = shardKey.Route(_routingTable);
543 stripeHash = shardKey.HashCode();
// Otherwise fall back to a random worker, as in the no-shard-key branch.
547 workerIndex = _random.Value!.Next(_numWorkers);
548 stripeHash = ComputeStripeHash(record);
552 return (workerIndex, stripeHash);
/// <summary>
/// Computes the hash used to pick a stripe for a record.
/// </summary>
/// <param name="record">Record to hash.</param>
/// <returns>
/// When no hash from the shard key values applies, the record's own hash code widened
/// through uint (0 for a null record).
/// </returns>
555 private long ComputeStripeHash(T record)
558 var shardKeyValues = record.GetShardKeyValues();
// With shard key values present, fold every name and value into the hash.
// (The initialisation and return of `hash` are on lines not visible in this listing.)
559 if (shardKeyValues.Count > 0)
563 foreach (var (name, value) in shardKeyValues)
565 hash = hash * 31 + (name?.GetHashCode() ?? 0);
// NOTE(review): unlike `name`, `value` is dereferenced without a null guard — confirm it cannot be null.
566 hash = hash * 31 + value.GetHashCode();
// Fallback: the cast through uint keeps the result non-negative.
// NOTE(review): the null-conditional on `record` cannot help, since record was already dereferenced above.
570 return (uint)(record?.GetHashCode() ?? 0);
/// <summary>
/// Flushes every worker queue and enqueues the resulting batches with backpressure.
/// </summary>
/// <param name="cancellationToken">Token passed to each enqueue call.</param>
589 public async Task
FlushAsync(CancellationToken cancellationToken =
default)
593 var batches =
new List<FlushJob>();
// Collect the flushed batches from all queues first, then enqueue them below.
596 foreach (var queue
in _workerQueues)
598 var flushedBatches = queue.FlushAll();
599 foreach (var (stripeIndex, batch) in flushedBatches)
// Flagged usedBackpressure so the flush worker releases the permit acquired on enqueue.
601 batches.Add(
new FlushJob(queue.Url, queue.WorkerIndex, stripeIndex, batch, usedBackpressure:
true));
// Each enqueue waits for an in-flight permit before writing to the channel.
606 foreach (var batch
in batches)
608 await EnqueueBatchWithBackpressureAsync(batch, cancellationToken);
/// <summary>
/// Hands a flush job to the channel without acquiring an in-flight permit.
/// </summary>
/// <param name="job">Job to enqueue.</param>
612 private void EnqueueBatch(FlushJob job)
// Count the job as pending before attempting the write.
614 Interlocked.Increment(ref _pendingBatches);
// The channel is created unbounded, so a failed TryWrite presumably means the writer
// has already been completed (see CloseAsync).
617 if (!_flushChannel.Writer.TryWrite(job))
// Undo the pending count; this job never reaches a flush worker.
620 Interlocked.Decrement(ref _pendingBatches);
// NOTE(review): sync-over-async — blocks the calling thread until the batch, including retries, finishes.
621 ProcessBatch(job).GetAwaiter().GetResult();
/// <summary>
/// Waits for an in-flight permit, then hands the job to the flush channel.
/// </summary>
/// <param name="job">Job to enqueue.</param>
/// <param name="cancellationToken">Token observed while waiting for the permit.</param>
625 private async ValueTask EnqueueBatchWithBackpressureAsync(FlushJob job, CancellationToken cancellationToken)
// Backpressure: wait until a permit is available.
629 await _inFlightSemaphore.WaitAsync(cancellationToken);
631 Interlocked.Increment(ref _pendingBatches);
// The channel is created unbounded, so a failed TryWrite presumably means the writer
// has already been completed (see CloseAsync).
634 if (!_flushChannel.Writer.TryWrite(job))
// Undo the bookkeeping: no flush worker will see this job, so the permit is released here.
637 Interlocked.Decrement(ref _pendingBatches);
638 _inFlightSemaphore.Release();
// NOTE(review): the permit is released before the batch is processed inline, so this
// batch is not counted against the in-flight limit — confirm this is intended.
639 await ProcessBatch(job);
/// <summary>
/// Timer callback for the periodic flush.
/// </summary>
/// <param name="state">Timer state object; not referenced on the visible lines.</param>
651 private void TimedFlushCallback(
object? state)
// Timed flush stopped, or inserter closed/disposed: the action taken is not visible in this listing.
654 if (!_timedFlushRunning || _isClosed || _isDisposed)
// The same flags are checked a second time.
// NOTE(review): this appears to sit inside a deferred delegate whose opening is not visible — confirm.
663 if (!_timedFlushRunning || _isClosed || _isDisposed)
667 }).ConfigureAwait(
false);
/// <summary>
/// Background worker loop: reads flush jobs from the channel and processes them one at a time.
/// </summary>
/// <param name="cancellationToken">Token that ends the read loop.</param>
680 private async Task FlushWorkerLoop(CancellationToken cancellationToken)
// ReadAllAsync yields jobs until the reader finishes or the token is cancelled.
684 await
foreach (var job
in _flushChannel.Reader.ReadAllAsync(cancellationToken))
688 await ProcessBatch(job);
// Bookkeeping after a job: drop the pending count.
// NOTE(review): the try/finally structure around these lines is not visible — confirm
// they also run when ProcessBatch throws.
692 Interlocked.Decrement(ref _pendingBatches);
// Only jobs enqueued through the backpressure path hold a permit to give back.
696 if (job.UsedBackpressure)
698 _inFlightSemaphore.Release();
// The filter limits this handler to cancellation requested through the loop's own token.
703 catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
/// <summary>
/// Encodes and submits one batch, retrying up to _options.MaxRetries times and attempting
/// a single HA failover on connection errors. Updates the sent/failed batch counters.
/// </summary>
/// <param name="job">Batch to send, with its worker URL and worker/stripe indices.</param>
709 private async Task ProcessBatch(FlushJob job)
// Empty batch check (the action taken is on a line not visible in this listing).
711 if (job.Records.Count == 0)
// Timing and retry state for this batch.
714 var sw = Stopwatch.StartNew();
715 double encodeTimeMs = 0;
716 double networkTimeMs = 0;
717 int retryAttempt = 0;
718 Exception? lastException =
null;
719 bool haFailoverAttempted =
false;
// Wall-clock time of the attempt; HandleConnectionErrorAsync compares it with the last shard update time.
722 var insertionAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
// Start with the URL captured when the job was created.
725 var currentWorkerUrl = job.WorkerUrl;
727 while (retryAttempt <= _options.MaxRetries)
// After a failover, re-resolve the URL from the rebuilt queues when this job's index is still in range.
730 if (haFailoverAttempted && _mutableWorkerQueues !=
null)
732 var effectiveQueues = GetEffectiveWorkerQueues();
733 if (job.WorkerIndex < effectiveQueues.Length)
735 currentWorkerUrl = effectiveQueues[job.WorkerIndex].Url;
// Encode step, timed separately.
742 var encodeSw = Stopwatch.StartNew();
743 var encodedRecords = EncodeRecords(job.Records);
745 encodeTimeMs = encodeSw.Elapsed.TotalMilliseconds;
// Network step: the request is submitted through Task.Run.
// (`requestBytes` is built on a line not visible in this listing.)
748 var networkSw = Stopwatch.StartNew();
749 var rawResponse = await Task.Run(() =>
750 _kinetica.SubmitRequestRawBytes(currentWorkerUrl, requestBytes)).ConfigureAwait(
false);
// Non-OK status (the handling is on lines not visible in this listing).
752 if (rawResponse.status !=
"OK")
758 networkTimeMs = networkSw.Elapsed.TotalMilliseconds;
// Success path: count the batch and populate the result fields below.
763 Interlocked.Increment(ref _totalBatchesSent);
768 WorkerUrl = currentWorkerUrl,
769 BatchSize = job.Records.Count,
773 EncodeTimeMs = encodeTimeMs,
774 NetworkTimeMs = networkTimeMs,
775 TotalTimeMs = sw.Elapsed.TotalMilliseconds,
776 WorkerIndex = job.WorkerIndex,
777 StripeIndex = job.StripeIndex,
778 RetryAttempt = retryAttempt
// Connection error with more than one cluster in the HA ring: try failover at most once per batch.
789 if (
Kinetica.IsConnectionError(ex) && !haFailoverAttempted && _dbHaRingSize > 1)
792 var shouldRetry = await HandleConnectionErrorAsync(ex, insertionAttemptTimestamp).ConfigureAwait(
false);
795 haFailoverAttempted =
true;
// Linear backoff: wait retryAttempt seconds before the next attempt.
803 if (retryAttempt <= _options.MaxRetries)
806 await Task.Delay(TimeSpan.FromSeconds(retryAttempt)).ConfigureAwait(
false);
// Failure path: count the batch as failed and populate the error and result fields below.
812 Interlocked.Increment(ref _totalBatchesFailed);
817 WorkerUrl = currentWorkerUrl,
818 RecordCount = job.Records.Count,
819 Message = lastException?.
Message ??
"Unknown error",
820 Exception = lastException,
// Reported as retryAttempt - 1.
// NOTE(review): presumably because the counter was incremented after the last failed attempt — the increment is not visible here.
821 RetryAttempt = retryAttempt - 1
827 WorkerUrl = currentWorkerUrl,
828 BatchSize = job.Records.Count,
830 ErrorMessage = lastException?.Message,
831 EncodeTimeMs = encodeTimeMs,
832 NetworkTimeMs = networkTimeMs,
833 TotalTimeMs = sw.Elapsed.TotalMilliseconds,
834 WorkerIndex = job.WorkerIndex,
835 StripeIndex = job.StripeIndex,
836 RetryAttempt = retryAttempt - 1
/// <summary>
/// Encodes a batch of records, choosing the encoder by record type.
/// </summary>
/// <param name="records">Records to encode.</param>
/// <returns>One encoded byte array per record, as produced by the selected encoder's EncodeManyAsList.</returns>
840 private List<byte[]> EncodeRecords(IReadOnlyList<T> records)
843 if (_isGenericRecord)
// T is Records.GenericRecord here; the cast goes through object because the compiler
// cannot convert between the two generic list types directly.
846 var genericRecords = (IReadOnlyList<
Records.GenericRecord>)(
object)records;
847 return _genericRecordEncoder!.EncodeManyAsList(genericRecords);
// Any other T uses the direct encoder.
852 return _directEncoder!.EncodeManyAsList(records);
858 #region Error Handling 862 if (Interlocked.Increment(ref _errorCount) <= _options.MaxErrorQueueSize)
864 _errorQueue.Enqueue(error);
868 Interlocked.Decrement(ref _errorCount);
877 var errors =
new List<InsertError>();
878 while (_errorQueue.TryDequeue(out var error))
880 Interlocked.Decrement(ref _errorCount);
891 return _errorQueue.ToArray();
/// <summary>
/// Forces an HA failover away from <paramref name="oldUrl"/> and records the new head
/// node when the resulting cluster looks healthy.
/// </summary>
/// <param name="oldUrl">Head node URL in use when the failure was observed.</param>
/// <param name="oldClusterSwitchCount">Cluster-switch count observed at that time.</param>
/// <returns>A completed task holding true when the failover was accepted, otherwise false.</returns>
915 private Task<bool> ForceFailoverAsync(Uri oldUrl,
int oldClusterSwitchCount)
// Nothing to fail over to without an HA manager or with a single-cluster ring.
917 if (_kinetica.HAManager ==
null || _dbHaRingSize <= 1)
919 return Task.FromResult(
false);
923 var newUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
// Assume the new cluster is healthy unless the multi-head checks below say otherwise.
927 bool isClusterHealthy =
true;
929 if (_multiHeadEnabled)
// An empty worker list marks the cluster as unhealthy.
// (`workers` is obtained on a line not visible in this listing.)
935 if (workers.Count == 0)
937 isClusterHealthy =
false;
// Second path that marks the cluster unhealthy (its trigger is not visible in this listing).
942 isClusterHealthy =
false;
// Healthy: remember the new head node and the client's current switch count.
946 if (isClusterHealthy)
950 _currentHeadNodeUrl = newUrl;
951 _numClusterSwitches = _kinetica.NumClusterSwitches;
953 return Task.FromResult(
true);
957 return Task.FromResult(
false);
/// <summary>
/// Refreshes shard routing state from the server and, when requested and multi-head is
/// enabled, rebuilds the worker URLs.
/// </summary>
/// <param name="countClusterSwitches">Cluster-switch count observed by the caller.</param>
/// <param name="doReconstructWorkerUrls">Whether worker URLs should be rebuilt.</param>
/// <returns>Task producing a bool; most return statements fall on lines not visible in this listing.</returns>
966 private async Task<bool> UpdateWorkerQueuesAsync(
int countClusterSwitches,
bool doReconstructWorkerUrls)
// Worker URLs are only rebuilt when multi-head ingest is enabled.
968 var reconstructWorkerUrls = doReconstructWorkerUrls && _multiHeadEnabled;
973 var shardInfo = _kinetica.adminShowShards();
974 var newShardVersion = shardInfo.version;
// Shard version unchanged: decide based on whether the cluster switched in the meantime.
977 if (Interlocked.Read(ref _shardVersion) == newShardVersion)
979 var currNumClusterSwitches = _kinetica.NumClusterSwitches;
// No cluster switch either: at most the worker URLs need rebuilding.
980 if (countClusterSwitches == currNumClusterSwitches)
982 if (reconstructWorkerUrls)
984 return await ReconstructWorkerUrlsAsync().ConfigureAwait(
false);
// Record the switch count just read from the client.
992 _numClusterSwitches = currNumClusterSwitches;
// Record the new shard version and the time it was observed; HandleConnectionErrorAsync
// compares that time with the insertion attempt timestamp.
997 Interlocked.Exchange(ref _shardVersion, newShardVersion);
998 Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
// Publish the refreshed routing table and head node state.
1003 _mutableRoutingTable = shardInfo.rank;
1004 _currentHeadNodeUrl = _kinetica.URL;
1005 _numClusterSwitches = _kinetica.NumClusterSwitches;
1009 if (reconstructWorkerUrls)
1011 await ReconstructWorkerUrlsAsync().ConfigureAwait(
false);
// Only connection errors are handled here; the handler body is not visible in this listing.
1016 catch (Exception ex) when (
Kinetica.IsConnectionError(ex))
/// <summary>
/// Rebuilds the worker queues from a freshly fetched worker list and stores them in
/// _mutableWorkerQueues.
/// </summary>
/// <returns>
/// A completed task holding false when the new worker list is empty and true after the
/// new queues are stored; the final false return is presumably a failure path whose
/// trigger is not visible in this listing.
/// </returns>
1026 private Task<bool> ReconstructWorkerUrlsAsync()
1031 var newWorkerList =
new WorkerList(_kinetica);
// No workers reported: nothing to rebuild.
1033 if (newWorkerList.Count == 0)
1035 return Task.FromResult(
false);
1039 var newQueues =
new List<StripedWorkerQueue<T>>();
1040 int workerIndex = 0;
1041 foreach (var workerUrl
in newWorkerList)
// Null entry check (the handling is on lines not visible in this listing).
1044 if (workerUrl ==
null)
// Build the insert endpoint from the worker URL with any trailing slash removed.
1050 var urlStr = workerUrl.ToString().TrimEnd(
'/');
1051 var insertUrl =
new Uri($
"{urlStr}/insert/records");
1055 _options.NumStripes,
1056 _options.BatchSize));
// Store the rebuilt queues; GetEffectiveWorkerQueues prefers them over the original ones.
1062 _mutableWorkerQueues = newQueues.ToArray();
1065 return Task.FromResult(
true);
1069 return Task.FromResult(
false);
1078 return _mutableWorkerQueues ?? _workerQueues;
/// <summary>
/// Returns the routing table currently in effect: the refreshed table when one has
/// been stored, otherwise the table captured at construction.
/// </summary>
private IList<int>? GetEffectiveRoutingTable() => _mutableRoutingTable ?? _routingTable;
/// <summary>
/// Reacts to a connection error: attempts an HA failover, refreshes the worker queues
/// and decides whether the failed insert should be retried.
/// </summary>
/// <param name="ex">Exception raised by the failed insert.</param>
/// <param name="insertionAttemptTimestamp">Unix time in milliseconds taken when the insert was attempted.</param>
/// <returns>Task producing a bool; the return statements fall on lines not visible in this listing.</returns>
1096 private async Task<bool> HandleConnectionErrorAsync(Exception ex,
long insertionAttemptTimestamp)
// Exceptions that are not connection errors are checked for first (handling not visible in this listing).
1098 if (!
Kinetica.IsConnectionError(ex))
// Snapshot the head node URL and switch count before attempting failover.
1103 var currUrl = _currentHeadNodeUrl;
1104 var currentCountClusterSwitches = _numClusterSwitches;
1106 bool didFailoverSucceed =
false;
// Failover is only attempted with a known head node and more than one cluster in the ring.
1108 if (currUrl !=
null && _dbHaRingSize > 1)
1110 didFailoverSucceed = await ForceFailoverAsync(currUrl, currentCountClusterSwitches).ConfigureAwait(
false);
// Refresh shard state, always requesting that worker URLs be rebuilt.
1114 var updatedWorkerQueues = await UpdateWorkerQueuesAsync(currentCountClusterSwitches,
true).ConfigureAwait(
false);
// Retry when the failover succeeded, the queues were updated, or the shard state was
// refreshed after this insert was first attempted.
1116 var shardUpdateTime = Interlocked.Read(ref _shardUpdateTime);
1117 var retry = didFailoverSucceed || updatedWorkerQueues || insertionAttemptTimestamp < shardUpdateTime;
/// <summary>
/// Closes the inserter: stops the timed flush, completes the flush channel, waits for
/// the flush workers and verifies that no batches remain pending.
/// </summary>
/// <param name="cancellationToken">Token that bounds the wait for the flush workers.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when batches are still counted as pending after the workers have been awaited.
/// </exception>
1139 public async Task
CloseAsync(CancellationToken cancellationToken =
default)
// Stop the periodic flush before shutting the pipeline down.
1147 _timedFlushRunning =
false;
1148 _flushTimer?.Dispose();
// Completing the writer lets the workers' read loops end once the queued jobs are consumed.
1155 _flushChannel.Writer.Complete();
1163 await Task.WhenAll(_flushWorkers).WaitAsync(cancellationToken);
// Caller cancelled the wait: the workers get at most one more second on the visible line below.
1165 catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
1172 await Task.WhenAll(_flushWorkers).WaitAsync(TimeSpan.FromSeconds(1));
// Sanity check: every enqueued batch should have been accounted for by now.
1182 var remainingBatches = Interlocked.Read(ref _pendingBatches);
1183 if (remainingBatches != 0)
1186 throw new InvalidOperationException(
1187 $
"BulkInserter.CloseAsync completed but {remainingBatches} batches are still pending. " +
1188 "This indicates a bug in the batch processing logic.");
1196 private void ThrowIfDisposed()
1235 _inFlightSemaphore.Dispose();
1236 _flushTimer?.Dispose();
// Immutable description of one batch handed to the flush pipeline.
#region Nested Types 1245 private readonly
struct FlushJob
// Insert endpoint URL captured when the job was created.
1247 public Uri WorkerUrl {
get; }
// Index of the worker queue the batch came from.
1248 public int WorkerIndex {
get; }
// Stripe within that worker queue.
1249 public int StripeIndex {
get; }
// Records in the batch.
1250 public IReadOnlyList<T>
Records {
get; }
// True when the job was enqueued through the backpressure path; the flush worker
// releases the in-flight semaphore only for such jobs.
1251 public bool UsedBackpressure {
get; }
1253 public FlushJob(Uri workerUrl,
int workerIndex,
int stripeIndex, IReadOnlyList<T> records,
bool usedBackpressure =
false)
1255 WorkerUrl = workerUrl;
1256 WorkerIndex = workerIndex;
1257 StripeIndex = stripeIndex;
// NOTE(review): no assignment to Records is visible here (listing line 1258 is absent) — confirm it exists in the real source.
1259 UsedBackpressure = usedBackpressure;
A list of worker URLs to use for multi-head ingest.
int AvailablePermits
Number of available permits in the semaphore.
string Message
Error message.
long TotalBatchesSent
Gets the total number of batches sent.
async Task CloseAsync(CancellationToken cancellationToken=default)
Flushes all records and waits for all pending batches to complete.
void InsertBatch(IReadOnlyList< T > records)
Inserts multiple records with parallel routing computation.
IReadOnlyList< InsertError > PeekErrors()
Peeks at errors without removing them.
double UtilizationPercent
Utilization percentage (in-flight / max * 100).
void Close()
Flushes all records and waits for all pending batches to complete.
async Task FlushAsync(CancellationToken cancellationToken=default)
Flushes all queued records to Kinetica asynchronously.
int count_inserted
The number of records inserted.
async ValueTask DisposeAsync()
Disposes the BulkInserter asynchronously.
int ErrorCount
Gets the number of errors in the error queue.
int NumWorkers
Gets the number of workers.
A worker queue with multiple stripes to reduce lock contention.
List< InsertError > DrainErrors()
Drains and returns all errors from the error queue.
long PendingBatches
Number of batches pending (queued + in-flight).
int InFlightBatches
Number of batches currently in-flight.
High-performance Avro encoder for GenericRecord instances.
A set of results returned by Kinetica.insertRecords.
int MaxInFlightBatches
Maximum number of in-flight batches allowed.
long CountUpdated
Gets the total count of records updated.
High-performance bulk inserter for Kinetica with support for multi-head ingest, ...
BackpressureMetrics GetBackpressureMetrics()
Gets backpressure metrics.
void Dispose()
Disposes the BulkInserter, closing it if not already closed.
Direct binary encoder for RawInsertRecordsRequest that bypasses the Apache Avro library.
static DirectAvroEncoder< T > GetOrCreate(KineticaType ktype)
Gets or creates a DirectAvroEncoder for the specified type and KineticaType.
int NumClusterSwitches
Gets the number of times the cluster has been switched due to failover.
void Flush()
Flushes all queued records to Kinetica.
Error information for failed insertions.
async ValueTask InsertBatchAsync(IReadOnlyList< T > records, CancellationToken cancellationToken=default)
Inserts multiple records with async backpressure control.
async ValueTask InsertAsync(T record, CancellationToken cancellationToken=default)
Inserts a single record with async backpressure control.
long CountInserted
Gets the total count of records inserted.
int count_updated
The number of records updated.
Backpressure metrics for monitoring the BulkInserter.
int HARingSize
Gets the HA ring size (number of clusters).
BulkInserter(Kinetica kinetica, string tableName, KineticaType ktype, BulkInserterOptions? options=null)
Creates a new BulkInserter for the specified table.
void Insert(T record)
Inserts a single record.
Ultra-high-performance Avro encoder that writes directly to binary format without using GenericRecord...
long TotalBatchesFailed
Gets the total number of batches that failed.
Failover to clusters in a random order (default)
static byte [] Encode(string tableName, IReadOnlyList< byte[]> records, IDictionary< string, string >? options)
Encodes a RawInsertRecordsRequest directly to Avro binary format.
long PendingBatches
Gets the number of batches currently pending (queued or in-flight).
bool IsTimedFlushRunning
Gets whether timed flush is currently running.
string TableName
Gets the table name.
static GenericRecordEncoder GetOrCreate(KineticaType ktype)
Gets or creates a GenericRecordEncoder for the specified KineticaType.
Result of a batch insertion operation.
Configuration options for the BulkInserter<T>.