Kinetica C# API Version 7.2.3.1
BulkInserterFullIntegration.cs
Go to the documentation of this file.
1 
28 using System;
29 using System.Collections.Concurrent;
30 using System.Collections.Generic;
31 using System.Diagnostics;
32 using System.Linq;
33 using System.Threading;
34 using System.Threading.Tasks;
35 using kinetica;
36 using kinetica.Records;
37 using kinetica.Utils;
38 
39 namespace Example
40 {
41  #region Configuration
42 
/// <summary>
/// Tunable settings for the integration example. Every value has a built-in
/// default and can be overridden through an environment variable
/// (see <see cref="FromEnvironment"/>).
/// </summary>
public class IntegrationConfig
{
    /// <summary>Number of records each producer task is asked to generate.</summary>
    public int RecordsPerProducer { get; set; } = 1_000_000;

    /// <summary>Number of concurrent producer tasks.</summary>
    public int NumProducers { get; set; } = 3;

    /// <summary>Value passed to the inserter's <c>BatchSize</c> option.</summary>
    public int BatchSize { get; set; } = 10_000;

    /// <summary>Value passed to the inserter's <c>MaxInFlightBatches</c> option.</summary>
    public int MaxInFlight { get; set; } = 50;

    /// <summary>Delay, in milliseconds, between two monitor snapshots.</summary>
    public int MonitorIntervalMs { get; set; } = 500;

    /// <summary>
    /// Builds a configuration from the RECORDS_PER_PRODUCER, NUM_PRODUCERS,
    /// BATCH_SIZE, MAX_IN_FLIGHT and MONITOR_INTERVAL_MS environment variables.
    /// </summary>
    /// <returns>
    /// A new configuration; any variable that is unset, unparsable or not a
    /// positive integer keeps its default value.
    /// </returns>
    public static IntegrationConfig FromEnvironment()
    {
        return new IntegrationConfig
        {
            RecordsPerProducer = GetEnvInt("RECORDS_PER_PRODUCER", 1_000_000),
            NumProducers = GetEnvInt("NUM_PRODUCERS", 3),
            BatchSize = GetEnvInt("BATCH_SIZE", 10_000),
            MaxInFlight = GetEnvInt("MAX_IN_FLIGHT", 50),
            MonitorIntervalMs = GetEnvInt("MONITOR_INTERVAL_MS", 500),
        };
    }

    /// <summary>
    /// Reads a positive integer from an environment variable.
    /// </summary>
    /// <param name="name">Name of the environment variable.</param>
    /// <param name="defaultValue">Value returned when the variable cannot be used.</param>
    /// <returns>The parsed value, or <paramref name="defaultValue"/>.</returns>
    private static int GetEnvInt(string name, int defaultValue)
    {
        var value = Environment.GetEnvironmentVariable(name);

        // Environment variables are machine-readable: parse them independently
        // of the user's culture.
        var parsed = int.TryParse(
            value,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var result);

        // Every setting is a count or an interval. Zero or negative values
        // break array allocation and Task.Delay downstream, so they are
        // treated like an unparsable value.
        return parsed && result > 0 ? result : defaultValue;
    }
}
69 
70  #endregion
71 
72  #region Sensor Record
73 
78  {
    // Body of the sensor-reading record type used by this example (a test
    // record for IoT-style sensor data). The property names match the columns
    // of the table the example creates: sensor_id, location, temperature,
    // humidity, timestamp.
    // NOTE(review): the type declaration line that precedes this brace is not
    // present in this listing.
79  public int sensor_id { get; set; }
    // Initialised to an empty string.
80  public string location { get; set; } = "";
81  public double temperature { get; set; }
82  public double humidity { get; set; }
    // The producer in this file fills this with Unix time in milliseconds.
83  public long timestamp { get; set; }
84 
    // Member body that returns a new, empty ShardKeyValues instance.
    // NOTE(review): the member's declaration line is missing from this listing
    // (presumably GetShardKeyValues() - confirm). The example table declares
    // SHARD KEY (sensor_id, location), yet no key values are added here -
    // confirm this is intended.
86  {
87  return new ShardKeyValues();
88  }
89  }
90 
91  #endregion
92 
93  #region Producer Statistics
94 
/// <summary>
/// Per-producer counters. All reads and writes go through
/// <see cref="Interlocked"/> operations on 64-bit fields.
/// </summary>
public class ProducerStats
{
    private long _generated;
    private long _queued;
    private long _batches;

    /// <summary>Total passed to <see cref="AddGenerated"/> so far.</summary>
    public long RecordsGenerated
    {
        get { return Interlocked.Read(ref _generated); }
    }

    /// <summary>Total passed to <see cref="AddQueued"/> so far.</summary>
    public long RecordsQueued
    {
        get { return Interlocked.Read(ref _queued); }
    }

    /// <summary>Number of calls made to <see cref="IncrementBatches"/>.</summary>
    public long BatchesSent
    {
        get { return Interlocked.Read(ref _batches); }
    }

    /// <summary>Adds <paramref name="count"/> to the generated-records counter.</summary>
    public void AddGenerated(int count)
    {
        Interlocked.Add(ref _generated, count);
    }

    /// <summary>Adds <paramref name="count"/> to the queued-records counter.</summary>
    public void AddQueued(int count)
    {
        Interlocked.Add(ref _queued, count);
    }

    /// <summary>Adds one to the sent-batches counter.</summary>
    public void IncrementBatches()
    {
        Interlocked.Increment(ref _batches);
    }
}
109 
110  #endregion
111 
112  #region Monitor
113 
/// <summary>
/// Reads counters from a <see cref="BulkInserter{T}"/> and derives the insert
/// rate over the time elapsed since the previous snapshot.
/// </summary>
/// <typeparam name="T">Record type handled by the monitored inserter.</typeparam>
public class IntegrationMonitor<T> where T : IShardKeyExtractor, new()
{
    private readonly BulkInserter<T> _inserter;
    private long _prevInserted;
    private Stopwatch _prevTimestamp;

    /// <summary>
    /// Creates a monitor for <paramref name="inserter"/> and starts timing the
    /// first measurement interval.
    /// </summary>
    public IntegrationMonitor(BulkInserter<T> inserter)
    {
        _prevTimestamp = Stopwatch.StartNew();
        _inserter = inserter;
    }

    /// <summary>
    /// Takes one sample of the inserter's counters.
    /// </summary>
    /// <returns>
    /// The inserted and updated counts, pending batches and utilization from the
    /// backpressure metrics, the error count, and the number of records inserted
    /// per second since the previous call (0 when no time has elapsed).
    /// </returns>
    public (long inserted, long updated, long pending, double utilization, int errors, double rate) GetSnapshot()
    {
        var insertedNow = _inserter.CountInserted;
        var updatedNow = _inserter.CountUpdated;
        var backpressure = _inserter.GetBackpressureMetrics();
        var errorCount = _inserter.ErrorCount;

        // Close the current interval and immediately open the next one.
        var seconds = _prevTimestamp.Elapsed.TotalSeconds;
        _prevTimestamp.Restart();

        var insertedDelta = insertedNow - _prevInserted;
        _prevInserted = insertedNow;

        double perSecond = 0;
        if (seconds > 0)
        {
            perSecond = insertedDelta / seconds;
        }

        return (insertedNow, updatedNow, backpressure.PendingBatches, backpressure.UtilizationPercent, errorCount, perSecond);
    }
}
142 
143  #endregion
144 
145  #region Full Integration Example
146 
148  {
// Fixed pool of location labels. RunProducer picks one per generated record
// by indexing with (i % Locations.Length).
149  private static readonly string[] Locations = {
150  "warehouse-a", "warehouse-b", "factory-1", "factory-2",
151  "office-hq", "office-branch", "datacenter-1", "datacenter-2"
152  };
153 
/// <summary>
/// Runs the full integration example: connects to Kinetica, (re)creates the
/// test table, starts a console monitor and the configured number of producer
/// tasks, flushes the inserter, prints a summary and drops the table.
/// </summary>
/// <remarks>
/// Connection settings come from KINETICA_URL, KINETICA_USER and
/// KINETICA_PASSWORD; workload settings come from
/// <see cref="IntegrationConfig.FromEnvironment"/>. Ctrl+C requests a graceful
/// shutdown while the run is in progress; the handler is removed again before
/// this method completes.
/// </remarks>
/// <returns>A task that completes when the run and its cleanup have finished.</returns>
public static async Task RunAsync()
{
    var config = IntegrationConfig.FromEnvironment();

    Console.WriteLine("+==============================================================================+");
    Console.WriteLine("| BULKINSERTER FULL INTEGRATION EXAMPLE |");
    Console.WriteLine("+==============================================================================+");
    Console.WriteLine("| |");
    Console.WriteLine("| This example demonstrates: |");
    Console.WriteLine("| * Multiple concurrent producers inserting data |");
    Console.WriteLine("| * Real-time monitoring of throughput and queue depth |");
    Console.WriteLine("| * Backpressure control to bound memory usage |");
    Console.WriteLine("| * Graceful shutdown with Ctrl+C |");
    Console.WriteLine("| |");
    Console.WriteLine("+==============================================================================+");
    Console.WriteLine();

    // Get connection settings
    var url = Environment.GetEnvironmentVariable("KINETICA_URL") ?? "http://localhost:9191";
    var user = Environment.GetEnvironmentVariable("KINETICA_USER") ?? "admin";
    var password = Environment.GetEnvironmentVariable("KINETICA_PASSWORD") ?? "secret";

    // Widen before multiplying: both factors are configurable and their
    // product can exceed int.MaxValue.
    long plannedTotal = (long)config.NumProducers * config.RecordsPerProducer;

    Console.WriteLine("Configuration:");
    Console.WriteLine($" Kinetica URL: {url}");
    Console.WriteLine($" Producers: {config.NumProducers}");
    Console.WriteLine($" Records per producer: {config.RecordsPerProducer:N0}");
    Console.WriteLine($" Total records: {plannedTotal:N0}");
    Console.WriteLine($" Batch size: {config.BatchSize:N0}");
    Console.WriteLine($" Max in-flight batches: {config.MaxInFlight}");
    Console.WriteLine();

    // Create Kinetica connection
    var kinetica = new Kinetica(url, new Kinetica.Options
    {
        Username = user,
        Password = password
    });

    // Best-effort statements must not abort the run, but a failure is
    // reported instead of being silently discarded.
    void TryExecuteSql(string sql, string description)
    {
        try
        {
            kinetica.executeSql(sql);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"! {description} failed (continuing): {ex.Message}");
        }
    }

    // Setup table
    var schemaName = "test_schema";
    var tableName = $"{schemaName}.integration_test";

    // Create schema
    TryExecuteSql($"CREATE SCHEMA IF NOT EXISTS {schemaName}", "Create schema");

    // Drop existing table
    TryExecuteSql($"DROP TABLE IF EXISTS {tableName}", "Drop existing table");

    // Create table
    kinetica.executeSql($@"
        CREATE TABLE {tableName} (
            sensor_id INT NOT NULL,
            location VARCHAR(64),
            temperature DOUBLE,
            humidity DOUBLE,
            timestamp LONG,
            SHARD KEY (sensor_id, location)
        )
    ");

    Console.WriteLine($"+ Table created: {tableName}");

    // Get KineticaType
    var showTableResponse = kinetica.showTable(tableName, null);
    var typeId = showTableResponse.type_ids[0];
    var ktype = KineticaType.fromTypeID(kinetica, typeId);

    // Create BulkInserter
    var options = new BulkInserterOptions
    {
        BatchSize = config.BatchSize,
        MaxInFlightBatches = config.MaxInFlight,
        FlushIntervalSeconds = 30,
    };

    var inserter = new BulkInserter<SensorReading>(
        kinetica, tableName, ktype, options);

    Console.WriteLine("+ BulkInserter created");
    Console.WriteLine($" Workers: {inserter.NumWorkers}");
    Console.WriteLine();

    // Setup shutdown handling. The token is the single stop signal shared by
    // the monitor and the producers.
    using var cts = new CancellationTokenSource();
    var token = cts.Token;

    ConsoleCancelEventHandler onCancelKeyPress = (s, e) =>
    {
        e.Cancel = true;
        Console.WriteLine("\n\n! Shutdown signal received (Ctrl+C)");
        try
        {
            cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The run finished while the handler was executing; nothing left to stop.
        }
    };
    Console.CancelKeyPress += onCancelKeyPress;

    try
    {
        // Start monitoring task
        var monitor = new IntegrationMonitor<SensorReading>(inserter);
        var startTime = Stopwatch.StartNew();

        var monitorTask = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                var (liveInserted, _, pending, util, liveErrors, rate) = monitor.GetSnapshot();

                Console.Write($"\r[{startTime.Elapsed.TotalSeconds,6:F1}s] " +
                              $"Inserted: {FormatNumber(liveInserted),10} | " +
                              $"Rate: {rate,10:F0}/s | " +
                              $"Queue: {pending,3} ({util,5:F1}%) | " +
                              $"Errors: {liveErrors} ");

                try
                {
                    await Task.Delay(config.MonitorIntervalMs, token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        });

        // Start producer tasks
        Console.WriteLine($"Starting {config.NumProducers} producers...");
        Console.WriteLine();

        var producerStats = new ProducerStats[config.NumProducers];
        for (int i = 0; i < config.NumProducers; i++)
        {
            producerStats[i] = new ProducerStats();
        }

        var producerTasks = new Task<int>[config.NumProducers];
        for (int i = 0; i < config.NumProducers; i++)
        {
            var producerId = i;
            var stats = producerStats[i];
            var recordCount = config.RecordsPerProducer;

            producerTasks[i] = Task.Run(() =>
                RunProducer(producerId, inserter, recordCount, token, stats));
        }

        // Wait for producers to complete
        var producerResults = await Task.WhenAll(producerTasks);

        // Sum as long: the int overload of Sum throws on overflow.
        long totalProduced = producerResults.Sum(count => (long)count);

        var produceDuration = startTime.Elapsed;

        // Flush remaining batches
        Console.WriteLine("\n\nProducers complete. Flushing remaining batches...");

        cts.Cancel();
        try
        {
            await monitorTask;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"! Monitor stopped with an error: {ex.Message}");
        }

        var flushStart = Stopwatch.StartNew();
        await inserter.CloseAsync();
        var flushDuration = flushStart.Elapsed;
        var totalDuration = startTime.Elapsed;

        // Print results
        Console.WriteLine();
        Console.WriteLine("+==============================================================================+");
        Console.WriteLine("| FINAL RESULTS |");
        Console.WriteLine("+==============================================================================+");

        var inserted = inserter.CountInserted;
        var updated = inserter.CountUpdated;
        var totalDb = inserted + updated;

        Console.WriteLine("| RECORDS |");
        Console.WriteLine($"| Generated: {FormatNumber(totalProduced),12} |");
        Console.WriteLine($"| Inserted: {FormatNumber(inserted),12} |");
        Console.WriteLine($"| Updated: {FormatNumber(updated),12} |");
        Console.WriteLine($"| Total in DB: {FormatNumber(totalDb),12} |");

        Console.WriteLine("+------------------------------------------------------------------------------+");
        Console.WriteLine("| TIMING |");
        Console.WriteLine($"| Produce time: {produceDuration.TotalSeconds,10:F2}s |");
        Console.WriteLine($"| Flush time: {flushDuration.TotalSeconds,10:F2}s |");
        Console.WriteLine($"| Total time: {totalDuration.TotalSeconds,10:F2}s |");

        var throughput = totalDuration.TotalSeconds > 0 ? totalDb / totalDuration.TotalSeconds : 0;
        Console.WriteLine("+------------------------------------------------------------------------------+");
        Console.WriteLine("| THROUGHPUT |");
        Console.WriteLine($"| Overall: {throughput,10:F0} rec/s |");

        var errors = inserter.DrainErrors();
        Console.WriteLine("+------------------------------------------------------------------------------+");
        Console.WriteLine("| ERRORS |");
        Console.WriteLine($"| Errors: {errors.Count,12} |");

        if (errors.Count > 0)
        {
            Console.WriteLine("| |");
            Console.WriteLine("| Recent errors: |");
            foreach (var err in errors.Take(5))
            {
                // Keep each error on one short line.
                var msg = err.Message.Length > 40 ? err.Message[..37] + "..." : err.Message;
                Console.WriteLine($"| - {msg} ({err.RecordCount} records)");
            }
        }

        Console.WriteLine("+==============================================================================+");

        // Data integrity check
        var recordsLost = totalProduced - totalDb;
        if (recordsLost > 0)
        {
            Console.WriteLine();
            Console.WriteLine("! WARNING: Some records may not have been inserted!");
            Console.WriteLine($" Generated: {totalProduced:N0} Inserted: {totalDb:N0} Lost: {recordsLost:N0}");
        }
        else
        {
            Console.WriteLine();
            Console.WriteLine($"+ DATA INTEGRITY CHECK PASSED: All {totalProduced:N0} records inserted successfully.");
        }

        // Cleanup
        Console.WriteLine("\nCleaning up...");
        TryExecuteSql($"DROP TABLE IF EXISTS {tableName}", "Drop table");
        Console.WriteLine("+ Cleanup complete");

        Console.WriteLine();
        Console.WriteLine("Integration test finished!");
    }
    finally
    {
        // Stop any background work still observing the token (relevant when an
        // exception short-circuits the run), then give Ctrl+C back to the
        // process; otherwise the handler would keep suppressing it.
        cts.Cancel();
        Console.CancelKeyPress -= onCancelKeyPress;
    }
}
379 
/// <summary>
/// Generates synthetic sensor readings in chunks of up to 1,000 records and
/// hands each chunk to the inserter until the requested number of records has
/// been sent, cancellation is requested, or inserts keep failing.
/// </summary>
/// <param name="producerId">Zero-based producer index; also seeds the sensor id sequence.</param>
/// <param name="inserter">Inserter that receives the generated chunks.</param>
/// <param name="recordsToGenerate">Number of records this producer should send.</param>
/// <param name="cancellationToken">Token that stops the producer early.</param>
/// <param name="stats">Counters updated as records are generated and handed over.</param>
/// <returns>The number of records successfully handed to the inserter.</returns>
private static async Task<int> RunProducer(
    int producerId,
    BulkInserter<SensorReading> inserter,
    int recordsToGenerate,
    CancellationToken cancellationToken,
    ProducerStats stats)
{
    const int batchSize = 1000;

    // A failed insert does not advance totalSent, so without a bound a
    // persistent failure would keep this loop spinning forever.
    const int maxConsecutiveFailures = 5;

    var totalSent = 0;
    var consecutiveFailures = 0;

    // Producers start 10,000 ids apart; the id is incremented once per record.
    var sensorIdBase = producerId * 10000;

    while (totalSent < recordsToGenerate && !cancellationToken.IsCancellationRequested)
    {
        var recordsInBatch = Math.Min(batchSize, recordsToGenerate - totalSent);

        var batch = new List<SensorReading>(recordsInBatch);
        for (int i = 0; i < recordsInBatch; i++)
        {
            sensorIdBase++;
            batch.Add(new SensorReading
            {
                sensor_id = sensorIdBase,
                location = Locations[i % Locations.Length],
                temperature = 20.0 + (sensorIdBase % 15),
                humidity = 40.0 + (sensorIdBase % 30),
                timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            });
        }

        stats.AddGenerated(recordsInBatch);

        try
        {
            await inserter.InsertBatchAsync(batch, cancellationToken);
            stats.AddQueued(recordsInBatch);
            stats.IncrementBatches();
            totalSent += recordsInBatch;
            consecutiveFailures = 0;
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"\nProducer {producerId} error: {ex.Message}");

            consecutiveFailures++;
            if (consecutiveFailures >= maxConsecutiveFailures)
            {
                Console.Error.WriteLine(
                    $"Producer {producerId} stopping after {maxConsecutiveFailures} consecutive failures.");
                break;
            }
        }

        // Yield periodically
        if (totalSent % 10000 == 0)
        {
            await Task.Yield();
        }
    }

    return totalSent;
}
434 
/// <summary>
/// Formats a count for compact display: values of 1,000 and above are scaled
/// to thousands ("K") or millions ("M") with two decimals; smaller values are
/// printed as-is.
/// </summary>
/// <param name="n">The count to format.</param>
/// <returns>The display string, e.g. "999", "1.50K" or "2.25M".</returns>
private static string FormatNumber(long n)
{
    // Values from 999,995 up would round to "1000.00K" at two decimals, so
    // they are shown on the M scale instead.
    if (n >= 999_995)
    {
        return $"{n / 1_000_000.0:F2}M";
    }

    if (n >= 1_000)
    {
        return $"{n / 1_000.0:F2}K";
    }

    return n.ToString();
}
443  }
444 
445  #endregion
446 }
IntegrationMonitor(BulkInserter< T > inserter)
int BatchSize
Number of records per batch before triggering a flush.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
static IntegrationConfig FromEnvironment()
async ValueTask InsertBatchAsync(IReadOnlyList< T > records, CancellationToken cancellationToken=default)
Inserts multiple records with async backpressure control.
Collection of shard key column names and values.
void AddQueued(int count)
void AddGenerated(int count)
Interface for extracting shard key values from a record.
Configuration options for the BulkInserter<T>.
Sensor reading from IoT devices - test record type.