2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Threading;
5 using System.Threading.Tasks;
19 private const int ProgressBatch = 10;
21 public static async Task
Main(
string[] args)
23 Console.WriteLine(
"======================================");
24 Console.WriteLine(
"= Kinetica Load Generator for C# =");
25 Console.WriteLine(
"======================================\n");
33 if (config.BetterSchema)
35 await RunLoadGenerator<IngestRecord2>(config);
39 await RunLoadGenerator<IngestRecord>(config);
44 Console.WriteLine($
"\nERROR: {ex.Message}");
45 Console.WriteLine(ex.StackTrace);
53 Console.WriteLine($
"Connecting to Kinetica at {config.Url}...");
57 Password = config.Password,
62 Console.WriteLine(
"Connected successfully.\n");
65 await EnsureTableExists<T>(
kinetica, config);
68 Console.WriteLine(
"Creating BulkInserter...");
75 MaxFlushWorkers = config.NumWorkers,
76 MaxInFlightBatches = config.MaxInFlightBatches,
81 Console.WriteLine($
"BulkInserter created. Multi-head: {inserter.NumWorkers > 1}, Workers: {inserter.NumWorkers}\n");
84 var globalCounter =
new Counter();
85 var startTime = Stopwatch.StartNew();
86 var startTimestamp =
DateTime.UtcNow;
89 using var progressCts =
new CancellationTokenSource();
90 Task? progressTask =
null;
92 if (config.ProgressIntervalMs > 0)
94 progressTask = StartProgressReporter(globalCounter, startTime, config.ProgressIntervalMs, config.TotalRecords, progressCts.Token);
98 var basePerThread = config.TotalRecords / config.NumThreads;
99 var remainder = config.TotalRecords % config.NumThreads;
101 Console.WriteLine($
"Starting {config.NumThreads} producer threads...\n");
104 using var barrier =
new Barrier(config.NumThreads);
107 var producerTasks =
new List<Task>();
108 for (
int t = 0; t < config.NumThreads; t++)
111 var threadRecords = basePerThread + (threadId < remainder ? 1 : 0);
113 var task = Task.Run(() => ProducerThread<T>(
114 inserter, config, threadId, threadRecords, barrier, globalCounter));
115 producerTasks.Add(task);
119 await Task.WhenAll(producerTasks);
121 Console.WriteLine(
"\nAll producer threads completed. Closing BulkInserter (flush + wait)...");
124 await inserter.CloseAsync();
127 progressCts.Cancel();
128 if (progressTask !=
null)
130 try { await progressTask; }
catch (OperationCanceledException) { }
136 var elapsedMs = startTime.Elapsed.TotalMilliseconds;
137 var elapsedSec = startTime.Elapsed.TotalSeconds;
138 var finalTotal = inserter.CountInserted;
139 var avgRate = finalTotal / elapsedSec;
142 Console.WriteLine(
"\n======================================");
143 Console.WriteLine(
"= RESULTS =");
144 Console.WriteLine(
"======================================");
145 Console.WriteLine($
" Start Time: {startTimestamp:yyyy-MM-dd HH:mm:ss}");
146 Console.WriteLine($
" End Time: {endTimestamp:yyyy-MM-dd HH:mm:ss}");
147 Console.WriteLine($
" Elapsed: {elapsedSec:F2} seconds");
148 Console.WriteLine($
" Records: {finalTotal:N0}");
149 Console.WriteLine($
" Throughput: {avgRate:N2} records/second");
150 Console.WriteLine($
" Updated: {inserter.CountUpdated:N0}");
151 Console.WriteLine($
" Errors: {inserter.ErrorCount}");
152 Console.WriteLine(
"======================================\n");
155 if (config.CsvOutput)
157 var hostname = Environment.MachineName;
158 var csvLine = $
"{startTimestamp:yyyy-MM-dd HH:mm:ss}," +
161 $
"{config.TableName}," +
163 $
"{config.TotalRecords}," +
164 $
"{config.MessageSize}," +
165 $
"{config.NumThreads}," +
166 $
"{config.BatchSize}," +
167 $
"{elapsedSec:F2}," +
170 Console.WriteLine(
"CSV Output:");
171 Console.WriteLine(
"start_ts,hostname,url,table,api,total_records,message_size,num_threads,batch_size,elapsed_sec,avg_rate");
172 Console.WriteLine(csvLine);
176 var errors = inserter.DrainErrors();
177 if (errors.Count > 0)
179 Console.WriteLine($
"\nEncountered {errors.Count} errors:");
180 foreach (var error
in errors.Take(10))
182 Console.WriteLine($
" - {error.Message} (records: {error.RecordCount})");
184 if (errors.Count > 10)
186 Console.WriteLine($
" ... and {errors.Count - 10} more errors");
191 private static void ProducerThread<T>(
193 LoadGeneratorConfig config,
200 var rng =
new Random(threadId * 12345 + Environment.TickCount);
201 var payloadChars =
new char[config.MessageSize];
202 const string chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
203 for (
int i = 0; i < config.MessageSize; i++)
205 payloadChars[i] = chars[rng.Next(chars.Length)];
207 var payload =
new string(payloadChars);
210 long lastReported = 0;
213 barrier.SignalAndWait();
215 for (
long j = 0; j < threadRecords; j++)
217 var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
218 var nano = Stopwatch.GetTimestamp() * 1_000_000_000L / Stopwatch.Frequency;
221 if (config.PayloadRandom)
223 for (
int i = 0; i < config.MessageSize; i++)
225 payloadChars[i] = chars[rng.Next(chars.Length)];
227 payload =
new string(payloadChars);
232 if (typeof(T) == typeof(IngestRecord))
234 var r =
new IngestRecord
236 TimestampMillis = now,
237 Source = config.MessageSize,
240 record = (T)(
object)r;
244 var r =
new IngestRecord2
246 TimestampMillis = now,
247 Source = config.MessageSize,
249 LocalCount = localCount,
253 record = (T)(
object)r;
258 inserter.Insert(record);
262 Console.WriteLine($
"Thread {threadId} insert error: {ex.Message}");
268 if (localCount - lastReported >= ProgressBatch)
270 globalCounter.Add(localCount - lastReported);
271 lastReported = localCount;
275 if (config.LogEveryN > 0 && localCount % config.LogEveryN == 0)
277 Console.WriteLine($
"[Thread {threadId}] Inserted {localCount:N0} records");
282 var leftover = localCount - lastReported;
285 globalCounter.Add(leftover);
289 private static async Task StartProgressReporter(
290 Counter globalCounter,
294 CancellationToken cancellationToken)
300 while (!cancellationToken.IsCancellationRequested)
302 await Task.Delay(intervalMs, cancellationToken);
304 var total = globalCounter.Value;
305 var delta = total - lastCount;
308 var rate = delta / (intervalMs / 1000.0);
309 var elapsedSec = startTime.Elapsed.TotalSeconds;
310 var overallRate = total / Math.Max(elapsedSec, 0.001);
311 var percentComplete = (double)total / totalRecords * 100;
313 Console.WriteLine($
"[PROGRESS] {DateTime.UtcNow:HH:mm:ss} elapsed={elapsedSec:F1}s total={total:N0} " +
314 $
"rate={rate:N0} rec/s overall={overallRate:N0} rec/s ({percentComplete:F1}%)");
317 catch (OperationCanceledException)
325 Console.WriteLine($
"Checking if table '{config.TableName}' exists...");
328 var hasTableResponse =
kinetica.hasTable(config.TableName);
329 var tableExists = hasTableResponse.table_exists;
333 Console.WriteLine($
"Table '{config.TableName}' exists.");
335 if (config.TruncateTable)
337 Console.WriteLine($
"Truncating table '{config.TableName}'...");
338 kinetica.clearTable(config.TableName,
"",
new Dictionary<string, string>
340 [ClearTableRequest.Options.NO_ERROR_IF_NOT_EXISTS] = ClearTableRequest.Options.TRUE
342 Console.WriteLine(
"Table truncated.");
347 Console.WriteLine($
"Table '{config.TableName}' does not exist. Creating...");
350 var tableParts = config.TableName.Split(
'.');
351 if (tableParts.Length > 1)
353 var schemaName = tableParts[0];
356 kinetica.createSchema(schemaName,
new Dictionary<string, string>
360 Console.WriteLine($
"Schema '{schemaName}' created or already exists.");
364 Console.WriteLine($
"Warning: Could not create schema '{schemaName}': {ex.Message}");
369 var typeProperties = GetTypeProperties<T>();
372 Console.WriteLine($
"Type created with ID: {typeId}");
375 kinetica.createTable(config.TableName, typeId,
new Dictionary<string, string>());
376 Console.WriteLine($
"Table '{config.TableName}' created.\n");
379 return Task.CompletedTask;
382 private static Dictionary<string, IList<string>> GetTypeProperties<T>()
384 if (typeof(T) == typeof(IngestRecord))
386 return IngestRecord.GetTypeProperties();
388 else if (typeof(T) == typeof(IngestRecord2))
390 return IngestRecord2.GetTypeProperties();
394 return new Dictionary<string, IList<string>>();
401 private class Counter
405 public long Value => Interlocked.Read(ref _value);
407 public void Add(
long amount) => Interlocked.Add(ref _value, amount);
Configuration for the load generator, matching Rust load_generator options.
int BatchSize
Number of records per batch before triggering a flush.
static LoadGeneratorConfig Parse(string[] args)
Parses command-line arguments and merges with environment variables.
const string NO_ERROR_IF_EXISTS
If TRUE, prevents an error from occurring if the schema already exists.
string? Username
Optional: User Name for Kinetica security
High-performance bulk inserter for Kinetica with support for multi-head ingest,
static async Task Main(string[] args)
High-performance load generator for Kinetica bulk insert benchmarking.
static KineticaType fromClass(Type recordClass, IDictionary< string, IList< string >> properties=null)
Create a KineticaType object from properties of a record class and Kinetica column properties.
string create(Kinetica kinetica)
Given a handle to the server, creates a type in the database based on this data type.
Failover to clusters in a random order (default)
A set of parameters for Kinetica.createSchema.
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
A set of string constants for the parameter options.
Configuration options for the BulkInserter<T>.
API to talk to Kinetica Database