29 using System.Collections.Concurrent;
30 using System.Collections.Generic;
31 using System.Diagnostics;
33 using System.Threading;
34 using System.Threading.Tasks;
57 BatchSize = GetEnvInt(
"BATCH_SIZE", 10_000),
63 private static int GetEnvInt(
string name,
int defaultValue)
65 var value = Environment.GetEnvironmentVariable(name);
66 return int.TryParse(value, out var result) ? result : defaultValue;
93 #region Producer Statistics 97 private long _recordsGenerated;
98 private long _recordsQueued;
99 private long _batchesSent;
105 public void AddGenerated(
int count) => Interlocked.Add(ref _recordsGenerated, count);
106 public void AddQueued(
int count) => Interlocked.Add(ref _recordsQueued, count);
117 private long _prevInserted;
118 private Stopwatch _prevTimestamp;
122 _inserter = inserter;
123 _prevTimestamp = Stopwatch.StartNew();
126 public (
long inserted,
long updated,
long pending,
double utilization,
int errors,
double rate) GetSnapshot()
128 var
inserted = _inserter.CountInserted;
129 var updated = _inserter.CountUpdated;
130 var bp = _inserter.GetBackpressureMetrics();
131 var errors = _inserter.ErrorCount;
133 var elapsed = _prevTimestamp.Elapsed.TotalSeconds;
134 _prevTimestamp.Restart();
136 var rate = elapsed > 0 ? (
inserted - _prevInserted) / elapsed : 0;
139 return (
inserted, updated, bp.PendingBatches, bp.UtilizationPercent, errors, rate);
145 #region Full Integration Example 149 private static readonly
string[] Locations = {
150 "warehouse-a",
"warehouse-b",
"factory-1",
"factory-2",
151 "office-hq",
"office-branch",
"datacenter-1",
"datacenter-2" 158 Console.WriteLine(
"+==============================================================================+");
159 Console.WriteLine(
"| BULKINSERTER FULL INTEGRATION EXAMPLE |");
160 Console.WriteLine(
"+==============================================================================+");
161 Console.WriteLine(
"| |");
162 Console.WriteLine(
"| This example demonstrates: |");
163 Console.WriteLine(
"| * Multiple concurrent producers inserting data |");
164 Console.WriteLine(
"| * Real-time monitoring of throughput and queue depth |");
165 Console.WriteLine(
"| * Backpressure control to bound memory usage |");
166 Console.WriteLine(
"| * Graceful shutdown with Ctrl+C |");
167 Console.WriteLine(
"| |");
168 Console.WriteLine(
"+==============================================================================+");
172 var url = Environment.GetEnvironmentVariable(
"KINETICA_URL") ??
"http://localhost:9191";
173 var user = Environment.GetEnvironmentVariable(
"KINETICA_USER") ??
"admin";
174 var password = Environment.GetEnvironmentVariable(
"KINETICA_PASSWORD") ??
"secret";
176 Console.WriteLine(
"Configuration:");
177 Console.WriteLine($
" Kinetica URL: {url}");
178 Console.WriteLine($
" Producers: {config.NumProducers}");
179 Console.WriteLine($
" Records per producer: {config.RecordsPerProducer:N0}");
180 Console.WriteLine($
" Total records: {config.NumProducers * config.RecordsPerProducer:N0}");
181 Console.WriteLine($
" Batch size: {config.BatchSize:N0}");
182 Console.WriteLine($
" Max in-flight batches: {config.MaxInFlight}");
193 var schemaName =
"test_schema";
194 var tableName = $
"{schemaName}.integration_test";
197 try {
kinetica.executeSql($
"CREATE SCHEMA IF NOT EXISTS {schemaName}"); }
catch { }
200 try {
kinetica.executeSql($
"DROP TABLE IF EXISTS {tableName}"); }
catch { }
204 CREATE TABLE {tableName} ( 205 sensor_id INT NOT NULL, 206 location VARCHAR(64), 210 SHARD KEY (sensor_id, location) 214 Console.WriteLine($
"+ Table created: {tableName}");
217 var showTableResponse =
kinetica.showTable(tableName,
null);
218 var typeId = showTableResponse.type_ids[0];
225 MaxInFlightBatches = config.MaxInFlight,
226 FlushIntervalSeconds = 30,
230 kinetica, tableName, ktype, options);
232 Console.WriteLine($
"+ BulkInserter created");
233 Console.WriteLine($
" Workers: {inserter.NumWorkers}");
237 var cts =
new CancellationTokenSource();
240 Console.CancelKeyPress += (s, e) =>
243 Console.WriteLine(
"\n\n! Shutdown signal received (Ctrl+C)");
250 var startTime = Stopwatch.StartNew();
252 var monitorTask = Task.Run(async () =>
254 while (running && !cts.Token.IsCancellationRequested)
256 var (inserted, updated, pending, util, errors, rate) = monitor.GetSnapshot();
258 Console.Write($
"\r[{startTime.Elapsed.TotalSeconds,6:F1}s] " +
259 $
"Inserted: {FormatNumber(inserted),10} | " +
260 $
"Rate: {rate,10:F0}/s | " +
261 $
"Queue: {pending,3} ({util,5:F1}%) | " +
262 $
"Errors: {errors} ");
266 await Task.Delay(config.MonitorIntervalMs, cts.Token);
268 catch (OperationCanceledException)
276 Console.WriteLine($
"Starting {config.NumProducers} producers...");
280 for (
int i = 0; i < config.NumProducers; i++)
283 var producerTasks =
new Task<int>[config.NumProducers];
284 for (
int i = 0; i < config.NumProducers; i++)
287 var stats = producerStats[i];
288 var recordCount = config.RecordsPerProducer;
290 producerTasks[i] = Task.Run(() =>
291 RunProducer(producerId, inserter, recordCount, cts.Token, stats));
295 var producerResults = await Task.WhenAll(producerTasks);
296 var totalProduced = producerResults.Sum();
298 var produceDuration = startTime.Elapsed;
301 Console.WriteLine(
"\n\nProducers complete. Flushing remaining batches...");
305 try { await monitorTask; }
catch { }
307 var flushStart = Stopwatch.StartNew();
308 await inserter.CloseAsync();
309 var flushDuration = flushStart.Elapsed;
310 var totalDuration = startTime.Elapsed;
314 Console.WriteLine(
"+==============================================================================+");
315 Console.WriteLine(
"| FINAL RESULTS |");
316 Console.WriteLine(
"+==============================================================================+");
318 var inserted = inserter.CountInserted;
319 var updated = inserter.CountUpdated;
320 var totalDb = inserted + updated;
322 Console.WriteLine(
"| RECORDS |");
323 Console.WriteLine($
"| Generated: {FormatNumber(totalProduced),12} |");
324 Console.WriteLine($
"| Inserted: {FormatNumber(inserted),12} |");
325 Console.WriteLine($
"| Updated: {FormatNumber(updated),12} |");
326 Console.WriteLine($
"| Total in DB: {FormatNumber(totalDb),12} |");
328 Console.WriteLine(
"+------------------------------------------------------------------------------+");
329 Console.WriteLine(
"| TIMING |");
330 Console.WriteLine($
"| Produce time: {produceDuration.TotalSeconds,10:F2}s |");
331 Console.WriteLine($
"| Flush time: {flushDuration.TotalSeconds,10:F2}s |");
332 Console.WriteLine($
"| Total time: {totalDuration.TotalSeconds,10:F2}s |");
334 var throughput = totalDuration.TotalSeconds > 0 ? totalDb / totalDuration.TotalSeconds : 0;
335 Console.WriteLine(
"+------------------------------------------------------------------------------+");
336 Console.WriteLine(
"| THROUGHPUT |");
337 Console.WriteLine($
"| Overall: {throughput,10:F0} rec/s |");
339 var errors = inserter.DrainErrors();
340 Console.WriteLine(
"+------------------------------------------------------------------------------+");
341 Console.WriteLine(
"| ERRORS |");
342 Console.WriteLine($
"| Errors: {errors.Count,12} |");
344 if (errors.Count > 0)
346 Console.WriteLine(
"| |");
347 Console.WriteLine(
"| Recent errors: |");
348 foreach (var err
in errors.Take(5))
350 var msg = err.Message.Length > 40 ? err.Message[..37] +
"..." : err.Message;
351 Console.WriteLine($
"| - {msg} ({err.RecordCount} records)");
355 Console.WriteLine(
"+==============================================================================+");
358 var recordsLost = totalProduced - totalDb;
362 Console.WriteLine(
"! WARNING: Some records may not have been inserted!");
363 Console.WriteLine($
" Generated: {totalProduced:N0} Inserted: {totalDb:N0} Lost: {recordsLost:N0}");
368 Console.WriteLine($
"+ DATA INTEGRITY CHECK PASSED: All {totalProduced:N0} records inserted successfully.");
372 Console.WriteLine(
"\nCleaning up...");
373 try {
kinetica.executeSql($
"DROP TABLE IF EXISTS {tableName}"); }
catch { }
374 Console.WriteLine(
"+ Cleanup complete");
377 Console.WriteLine(
"Integration test finished!");
380 private static async Task<int> RunProducer(
383 int recordsToGenerate,
384 CancellationToken cancellationToken,
387 const int batchSize = 1000;
389 var sensorIdBase = producerId * 10000;
391 while (totalSent < recordsToGenerate && !cancellationToken.IsCancellationRequested)
393 var recordsInBatch = Math.Min(batchSize, recordsToGenerate - totalSent);
395 var batch =
new List<SensorReading>(recordsInBatch);
396 for (
int i = 0; i < recordsInBatch; i++)
399 batch.Add(
new SensorReading
401 sensor_id = sensorIdBase,
402 location = Locations[i % Locations.Length],
403 temperature = 20.0 + (sensorIdBase % 15),
404 humidity = 40.0 + (sensorIdBase % 30),
405 timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
409 stats.AddGenerated(recordsInBatch);
414 stats.AddQueued(recordsInBatch);
415 stats.IncrementBatches();
416 totalSent += recordsInBatch;
418 catch (OperationCanceledException)
424 Console.Error.WriteLine($
"\nProducer {producerId} error: {ex.Message}");
428 if (totalSent % 10000 == 0)
435 private static string FormatNumber(
long n)
438 return $
"{n / 1_000_000.0:F2}M";
440 return $
"{n / 1_000.0:F2}K";
IntegrationMonitor(BulkInserter< T > inserter)
int BatchSize
Number of records per batch before triggering a flush.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
static IntegrationConfig FromEnvironment()
async ValueTask InsertBatchAsync(IReadOnlyList< T > records, CancellationToken cancellationToken=default)
Inserts multiple records with async backpressure control.
Collection of shard key column names and values.
void AddQueued(int count)
void AddGenerated(int count)
static async Task RunAsync()
Configuration options for the BulkInserter<T>.
Sensor reading from IoT devices - test record type.