Kinetica C# API Version 7.2.3.1
Program.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Threading;
5 using System.Threading.Tasks;
6 using kinetica;
7 using kinetica.Records;
8 
10 {
16  public class Program
17  {
18  // Progress tracking batch size to reduce atomic operation contention
19  private const int ProgressBatch = 10;
20 
/// <summary>
/// Program entry point: prints the banner, parses and echoes the configuration,
/// then runs the load generator for the record schema selected by
/// <c>config.BetterSchema</c>. Any exception from the run is reported on the
/// console and the process exits with code 1.
/// </summary>
/// <param name="args">Command-line arguments handed to <c>LoadGeneratorConfig.Parse</c>.</param>
public static async Task Main(string[] args)
{
    Console.WriteLine("======================================");
    Console.WriteLine("= Kinetica Load Generator for C# =");
    Console.WriteLine("======================================\n");

    // Resolve the effective settings and show them before doing any work.
    var config = LoadGeneratorConfig.Parse(args);
    config.Print();

    try
    {
        // Pick the generic instantiation that matches the requested schema.
        Task run = config.BetterSchema
            ? RunLoadGenerator<IngestRecord2>(config)
            : RunLoadGenerator<IngestRecord>(config);

        await run;
    }
    catch (Exception failure)
    {
        Console.WriteLine($"\nERROR: {failure.Message}");
        Console.WriteLine(failure.StackTrace);
        Environment.Exit(1);
    }
}
49 
/// <summary>
/// Runs one complete load-generation pass for record type <typeparamref name="T"/>:
/// connects to Kinetica, makes sure the target table exists, feeds a
/// <c>BulkInserter</c> from <c>config.NumThreads</c> producers, closes the inserter,
/// then prints a summary, an optional CSV line and any errors drained from the inserter.
/// </summary>
/// <typeparam name="T">Record type to ingest (IngestRecord or IngestRecord2 in this program).</typeparam>
/// <param name="config">Parsed load-generator settings.</param>
/// <exception cref="ArgumentOutOfRangeException"><c>config.NumThreads</c> is not positive.</exception>
private static async Task RunLoadGenerator<T>(LoadGeneratorConfig config) where T : class, IShardKeyExtractor, new()
{
    // Fail fast with a clear message; otherwise the per-thread split below
    // would surface as a bare DivideByZeroException.
    if (config.NumThreads <= 0)
    {
        throw new ArgumentOutOfRangeException(
            nameof(config), config.NumThreads, "NumThreads must be greater than zero.");
    }

    // Connect to Kinetica
    Console.WriteLine($"Connecting to Kinetica at {config.Url}...");
    var options = new kinetica.Kinetica.Options
    {
        Username = config.Username,
        Password = config.Password,
        UseSnappy = config.UseSnappy
    };

    var kinetica = new kinetica.Kinetica(config.Url, options);
    Console.WriteLine("Connected successfully.\n");

    // Ensure table exists
    await EnsureTableExists<T>(kinetica, config);

    // Create BulkInserter
    Console.WriteLine("Creating BulkInserter...");
    var ktype = KineticaType.fromClass(typeof(T), GetTypeProperties<T>());
    ktype.create(kinetica);

    var bulkInserterOptions = new BulkInserterOptions
    {
        BatchSize = config.BatchSize,
        MaxFlushWorkers = config.NumWorkers,
        MaxInFlightBatches = config.MaxInFlightBatches,
        MaxRetries = 3
    };

    using var inserter = new BulkInserter<T>(kinetica, config.TableName, ktype, bulkInserterOptions);
    Console.WriteLine($"BulkInserter created. Multi-head: {inserter.NumWorkers > 1}, Workers: {inserter.NumWorkers}\n");

    // Set up progress tracking
    var globalCounter = new Counter();
    var startTime = Stopwatch.StartNew();
    var startTimestamp = DateTime.UtcNow;

    // Start progress reporter task (disabled when the interval is not positive)
    using var progressCts = new CancellationTokenSource();
    Task? progressTask = null;

    if (config.ProgressIntervalMs > 0)
    {
        progressTask = StartProgressReporter(globalCounter, startTime, config.ProgressIntervalMs, config.TotalRecords, progressCts.Token);
    }

    try
    {
        // Calculate work distribution per thread; the first `remainder`
        // producers take one extra record so the totals add up exactly.
        var basePerThread = config.TotalRecords / config.NumThreads;
        var remainder = config.TotalRecords % config.NumThreads;

        Console.WriteLine($"Starting {config.NumThreads} producer threads...\n");

        // Use a barrier for synchronized start
        using var barrier = new Barrier(config.NumThreads);

        // Launch producer threads. Producers block on the barrier and run a
        // synchronous insert loop, so they are started as LongRunning tasks:
        // ordinary pool threads are injected gradually, which would delay the
        // synchronized start whenever NumThreads exceeds the pool's ready threads.
        var producerTasks = new List<Task>();
        for (int t = 0; t < config.NumThreads; t++)
        {
            var threadId = t;
            var threadRecords = basePerThread + (threadId < remainder ? 1 : 0);

            var task = Task.Factory.StartNew(
                () => ProducerThread<T>(inserter, config, threadId, threadRecords, barrier, globalCounter),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
            producerTasks.Add(task);
        }

        // Wait for all producers to complete
        await Task.WhenAll(producerTasks);

        Console.WriteLine("\nAll producer threads completed. Closing BulkInserter (flush + wait)...");

        // Close the inserter - this flushes all batches and waits for completion
        await inserter.CloseAsync();
    }
    finally
    {
        // Always stop the progress reporter, including when a producer or the
        // close step throws, so it does not keep printing after a failure.
        progressCts.Cancel();
        if (progressTask != null)
        {
            try { await progressTask; } catch (OperationCanceledException) { }
        }
    }

    // Final metrics
    startTime.Stop();
    var endTimestamp = DateTime.UtcNow;
    var elapsedSec = startTime.Elapsed.TotalSeconds;
    var finalTotal = inserter.CountInserted;
    // Same lower bound the progress reporter uses, to avoid dividing by zero.
    var avgRate = finalTotal / Math.Max(elapsedSec, 0.001);

    // Print results
    Console.WriteLine("\n======================================");
    Console.WriteLine("= RESULTS =");
    Console.WriteLine("======================================");
    Console.WriteLine($" Start Time: {startTimestamp:yyyy-MM-dd HH:mm:ss}");
    Console.WriteLine($" End Time: {endTimestamp:yyyy-MM-dd HH:mm:ss}");
    Console.WriteLine($" Elapsed: {elapsedSec:F2} seconds");
    Console.WriteLine($" Records: {finalTotal:N0}");
    Console.WriteLine($" Throughput: {avgRate:N2} records/second");
    Console.WriteLine($" Updated: {inserter.CountUpdated:N0}");
    Console.WriteLine($" Errors: {inserter.ErrorCount}");
    Console.WriteLine("======================================\n");

    // CSV output
    if (config.CsvOutput)
    {
        var hostname = Environment.MachineName;

        // The CSV line is machine-readable, so format it with the invariant
        // culture: a culture with a decimal comma would otherwise split the
        // numeric fields into extra columns.
        var csvLine =
            FormattableString.Invariant($"{startTimestamp:yyyy-MM-dd HH:mm:ss},{hostname},{config.Url},{config.TableName},CSharpAPI,") +
            FormattableString.Invariant($"{config.TotalRecords},{config.MessageSize},{config.NumThreads},{config.BatchSize},") +
            FormattableString.Invariant($"{elapsedSec:F2},{avgRate:F2}");

        Console.WriteLine("CSV Output:");
        Console.WriteLine("start_ts,hostname,url,table,api,total_records,message_size,num_threads,batch_size,elapsed_sec,avg_rate");
        Console.WriteLine(csvLine);
    }

    // Check for errors; only the first ten are listed individually
    var errors = inserter.DrainErrors();
    if (errors.Count > 0)
    {
        Console.WriteLine($"\nEncountered {errors.Count} errors:");
        foreach (var error in errors.Take(10))
        {
            Console.WriteLine($" - {error.Message} (records: {error.RecordCount})");
        }
        if (errors.Count > 10)
        {
            Console.WriteLine($" ... and {errors.Count - 10} more errors");
        }
    }
}
190 
/// <summary>
/// Body of one producer: builds a payload, waits on the shared barrier so all
/// producers start together, then inserts <paramref name="threadRecords"/> records
/// through <paramref name="inserter"/>, publishing progress to
/// <paramref name="globalCounter"/> in batches of <c>ProgressBatch</c>.
/// </summary>
/// <typeparam name="T">Record type; IngestRecord is built when T is IngestRecord, otherwise IngestRecord2.</typeparam>
/// <param name="inserter">Shared bulk inserter that receives every record.</param>
/// <param name="config">Load-generator settings (message size, payload randomization, logging interval).</param>
/// <param name="threadId">Zero-based producer index; also used to seed the payload generator.</param>
/// <param name="threadRecords">Number of records this producer submits.</param>
/// <param name="barrier">Start barrier shared by all producers.</param>
/// <param name="globalCounter">Shared progress counter.</param>
private static void ProducerThread<T>(
    BulkInserter<T> inserter,
    LoadGeneratorConfig config,
    int threadId,
    long threadRecords,
    Barrier barrier,
    Counter globalCounter) where T : class, IShardKeyExtractor, new()
{
    const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    Random rng;
    char[] payloadChars;
    string payload;

    try
    {
        // Pre-generate payload as string
        rng = new Random(threadId * 12345 + Environment.TickCount);
        payloadChars = new char[config.MessageSize];
        FillWithRandomChars(payloadChars, chars, rng);
        payload = new string(payloadChars);
    }
    catch
    {
        // If setup fails (for example an invalid MessageSize), leave the barrier
        // first: the other producers would otherwise wait forever for this one.
        barrier.RemoveParticipant();
        throw;
    }

    long localCount = 0;
    long lastReported = 0;

    // Synchronized start
    barrier.SignalAndWait();

    for (long j = 0; j < threadRecords; j++)
    {
        var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var nano = TimestampToNanoseconds(Stopwatch.GetTimestamp(), Stopwatch.Frequency);

        // Optionally randomize payload
        if (config.PayloadRandom)
        {
            FillWithRandomChars(payloadChars, chars, rng);
            payload = new string(payloadChars);
        }

        // Build record based on schema type
        T record;
        if (typeof(T) == typeof(IngestRecord))
        {
            var r = new IngestRecord
            {
                TimestampMillis = now,
                Source = config.MessageSize,
                Payload = payload
            };
            record = (T)(object)r;
        }
        else
        {
            var r = new IngestRecord2
            {
                TimestampMillis = now,
                Source = config.MessageSize,
                ThreadId = threadId,
                LocalCount = localCount,
                Nanosecond = nano,
                Payload = payload
            };
            record = (T)(object)r;
        }

        try
        {
            inserter.Insert(record);
        }
        catch (Exception ex)
        {
            // Best effort: report the failure and keep producing.
            Console.WriteLine($"Thread {threadId} insert error: {ex.Message}");
        }

        // Counts submitted records, including ones whose Insert call threw.
        localCount++;

        // Periodically update global counter in batches
        if (localCount - lastReported >= ProgressBatch)
        {
            globalCounter.Add(localCount - lastReported);
            lastReported = localCount;
        }

        // Optional detailed logging
        if (config.LogEveryN > 0 && localCount % config.LogEveryN == 0)
        {
            Console.WriteLine($"[Thread {threadId}] Inserted {localCount:N0} records");
        }
    }

    // Final progress update
    var leftover = localCount - lastReported;
    if (leftover > 0)
    {
        globalCounter.Add(leftover);
    }
}

/// <summary>Overwrites every element of <paramref name="buffer"/> with a random character from <paramref name="alphabet"/>.</summary>
private static void FillWithRandomChars(char[] buffer, string alphabet, Random rng)
{
    for (int i = 0; i < buffer.Length; i++)
    {
        buffer[i] = alphabet[rng.Next(alphabet.Length)];
    }
}

/// <summary>
/// Converts a <see cref="Stopwatch"/> tick count to nanoseconds without overflowing.
/// Multiplying the raw tick count by 1,000,000,000 first exceeds <see cref="long.MaxValue"/>
/// once the count passes roughly 9.2e9 ticks, so whole seconds and the sub-second
/// remainder are scaled separately.
/// </summary>
private static long TimestampToNanoseconds(long ticks, long frequency)
{
    const long NanosPerSecond = 1_000_000_000L;

    var wholeSeconds = ticks / frequency;
    var remainderTicks = ticks % frequency;

    return wholeSeconds * NanosPerSecond + remainderTicks * NanosPerSecond / frequency;
}
288 
/// <summary>
/// Prints a <c>[PROGRESS]</c> line roughly every <paramref name="intervalMs"/> milliseconds
/// until <paramref name="cancellationToken"/> is cancelled. Each line shows the running
/// total, the rate over the last reporting window, the overall rate and percent complete.
/// Cancellation is absorbed here, so the returned task completes normally when cancelled.
/// </summary>
/// <param name="globalCounter">Shared counter that producers add to.</param>
/// <param name="startTime">Stopwatch started when the run began.</param>
/// <param name="intervalMs">Delay between reports, in milliseconds.</param>
/// <param name="totalRecords">Planned record count, used for the percentage.</param>
/// <param name="cancellationToken">Token that stops the reporter.</param>
private static async Task StartProgressReporter(
    Counter globalCounter,
    Stopwatch startTime,
    int intervalMs,
    long totalRecords,
    CancellationToken cancellationToken)
{
    long lastCount = 0;
    var lastElapsedSec = startTime.Elapsed.TotalSeconds;

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(intervalMs, cancellationToken);

            var total = globalCounter.Value;
            var delta = total - lastCount;
            lastCount = total;

            // Task.Delay can overshoot the requested interval, so divide by the
            // time that actually passed rather than the nominal interval.
            var elapsedSec = startTime.Elapsed.TotalSeconds;
            var windowSec = Math.Max(elapsedSec - lastElapsedSec, 0.001);
            lastElapsedSec = elapsedSec;

            var rate = delta / windowSec;
            var overallRate = total / Math.Max(elapsedSec, 0.001);

            // With no planned records the ratio would be NaN or Infinity; report 0 instead.
            var percentComplete = totalRecords > 0 ? (double)total / totalRecords * 100 : 0.0;

            Console.WriteLine($"[PROGRESS] {DateTime.UtcNow:HH:mm:ss} elapsed={elapsedSec:F1}s total={total:N0} " +
                $"rate={rate:N0} rec/s overall={overallRate:N0} rec/s ({percentComplete:F1}%)");
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when cancellation is requested
    }
}
322 
/// <summary>
/// Makes sure the table named by <c>config.TableName</c> exists with the schema of
/// <typeparamref name="T"/>. A missing table is created (together with its schema when
/// the name is schema-qualified). An existing table is left alone unless
/// <c>config.TruncateTable</c> is set, in which case it is cleared and recreated.
/// All calls are synchronous; a completed task is returned so callers can await it.
/// </summary>
/// <typeparam name="T">Record type whose properties define the table's type.</typeparam>
/// <param name="kinetica">Connected Kinetica client.</param>
/// <param name="config">Load-generator settings (table name, truncate flag).</param>
private static Task EnsureTableExists<T>(kinetica.Kinetica kinetica, LoadGeneratorConfig config)
{
    Console.WriteLine($"Checking if table '{config.TableName}' exists...");

    // Check if table exists
    var tableExists = kinetica.hasTable(config.TableName).table_exists;

    if (tableExists)
    {
        Console.WriteLine($"Table '{config.TableName}' exists.");

        if (config.TruncateTable)
        {
            Console.WriteLine($"Truncating table '{config.TableName}'...");
            kinetica.clearTable(config.TableName, "", new Dictionary<string, string>
            {
                [ClearTableRequest.Options.NO_ERROR_IF_NOT_EXISTS] = ClearTableRequest.Options.TRUE
            });
            Console.WriteLine("Table truncated.");

            // NOTE(review): Kinetica's clear-table call is understood to drop the
            // table rather than only empty it - confirm against the API docs.
            // Recreate it so the inserts that follow have a target.
            CreateTableForType<T>(kinetica, config);
        }
    }
    else
    {
        Console.WriteLine($"Table '{config.TableName}' does not exist. Creating...");

        CreateSchemaIfQualified(kinetica, config.TableName);
        CreateTableForType<T>(kinetica, config);
    }

    return Task.CompletedTask;
}

/// <summary>
/// Creates the schema part of a "schema.table" name. Does nothing for an unqualified
/// name. A failure is reported as a warning and does not stop the run.
/// </summary>
private static void CreateSchemaIfQualified(kinetica.Kinetica kinetica, string tableName)
{
    var tableParts = tableName.Split('.');
    if (tableParts.Length <= 1)
    {
        return;
    }

    var schemaName = tableParts[0];
    try
    {
        // Ask the server to tolerate an existing schema, so the message below is accurate.
        kinetica.createSchema(schemaName, new Dictionary<string, string>
        {
            [CreateSchemaRequest.Options.NO_ERROR_IF_EXISTS] = CreateSchemaRequest.Options.TRUE
        });
        Console.WriteLine($"Schema '{schemaName}' created or already exists.");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Warning: Could not create schema '{schemaName}': {ex.Message}");
    }
}

/// <summary>
/// Registers the Kinetica type for <typeparamref name="T"/> and creates the configured
/// table from it.
/// </summary>
private static void CreateTableForType<T>(kinetica.Kinetica kinetica, LoadGeneratorConfig config)
{
    // Create the type
    var typeProperties = GetTypeProperties<T>();
    var ktype = KineticaType.fromClass(typeof(T), typeProperties);
    var typeId = ktype.create(kinetica);
    Console.WriteLine($"Type created with ID: {typeId}");

    // Create the table. NO_ERROR_IF_EXISTS keeps this call harmless if the table
    // is still present, e.g. created concurrently by another process.
    kinetica.createTable(config.TableName, typeId, new Dictionary<string, string>
    {
        [CreateTableRequest.Options.NO_ERROR_IF_EXISTS] = CreateTableRequest.Options.TRUE
    });
    Console.WriteLine($"Table '{config.TableName}' created.\n");
}
381 
/// <summary>
/// Returns the Kinetica column-property map for record type <typeparamref name="T"/>.
/// IngestRecord and IngestRecord2 supply their own maps; any other type gets an
/// empty map.
/// </summary>
/// <typeparam name="T">Record type being looked up.</typeparam>
/// <returns>Column properties for <typeparamref name="T"/>.</returns>
private static Dictionary<string, IList<string>> GetTypeProperties<T>()
{
    var recordType = typeof(T);

    if (recordType == typeof(IngestRecord))
    {
        return IngestRecord.GetTypeProperties();
    }

    if (recordType == typeof(IngestRecord2))
    {
        return IngestRecord2.GetTypeProperties();
    }

    // Unknown record type: no column properties.
    return new Dictionary<string, IList<string>>();
}
397 
/// <summary>
/// Running total that several threads can add to and read, implemented with
/// <see cref="Interlocked"/> operations on a single 64-bit field.
/// </summary>
private class Counter
{
    private long _total;

    /// <summary>Current total, read with <see cref="Interlocked.Read(ref long)"/>.</summary>
    public long Value
    {
        get { return Interlocked.Read(ref _total); }
    }

    /// <summary>Adds <paramref name="amount"/> to the total.</summary>
    /// <param name="amount">Quantity to add.</param>
    public void Add(long amount)
    {
        Interlocked.Add(ref _total, amount);
    }
}
409  }
410 }
Configuration for the load generator, matching Rust load_generator options.
int BatchSize
Number of records per batch before triggering a flush.
static LoadGeneratorConfig Parse(string[] args)
Parses command-line arguments and merges with environment variables.
const string NO_ERROR_IF_EXISTS
If TRUE, prevents an error from occurring if the schema already exists.
Definition: CreateSchema.cs:38
string? Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:157
High-performance bulk inserter for Kinetica with support for multi-head ingest.
Definition: BulkInserter.cs:28
static async Task Main(string[] args)
Definition: Program.cs:21
High-performance load generator for Kinetica bulk insert benchmarking.
Definition: Program.cs:16
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:177
Connection Options
Definition: Kinetica.cs:50
static KineticaType fromClass(Type recordClass, IDictionary< string, IList< string >> properties=null)
Create a KineticaType object from properties of a record class and Kinetica column properties.
string create(Kinetica kinetica)
Given a handle to the server, creates a type in the database based on this data type.
Failover to clusters in a random order (default)
Interface for extracting shard key values from a record.
A set of parameters for Kinetica.createSchema.
Definition: CreateSchema.cs:18
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
A set of string constants for the parameter options.
Definition: CreateSchema.cs:23
Configuration options for the BulkInserter<T>.
API to talk to Kinetica Database
Definition: Kinetica.cs:40