/// <summary>
/// Kinetica server URL (e.g., http://localhost:9191).
/// Initialised to "http://localhost:9191"; replaced by the KINETICA_URL
/// environment variable or the --url command-line argument when supplied.
/// </summary>
15 public string Url {
get;
set; } =
"http://localhost:9191";
/// <summary>
/// Username for Kinetica authentication.
/// Initialised to "admin"; replaced by the KINETICA_USER environment variable
/// or the --user command-line argument when supplied.
/// </summary>
22 public string Username {
get;
set; } =
"admin";
/// <summary>
/// Password for Kinetica authentication.
/// Replaced by the KINETICA_PASSWORD environment variable or the --password
/// command-line argument when supplied.
/// </summary>
// NOTE(review): the initial value is a credential hard-coded in source. Confirm it is
// only a local-development placeholder and is always overridden in real deployments.
29 public string Password {
get;
set; } =
"secret";
/// <summary>
/// Target table name.
/// Initialised to "load_generator_test"; replaced by the KINETICA_TABLE
/// environment variable or the --table command-line argument when supplied.
/// </summary>
35 public string TableName {
get;
set; } =
"load_generator_test";
// Overlay environment-variable settings onto config. Every call passes the current
// property value as the fallback, so a setting is left unchanged when its variable
// is not set.
//   - String settings take the variable's value verbatim.
//   - Integer settings also keep the current value when the variable does not parse.
//   - Boolean settings keep the current value only when the variable is unset or
//     empty; any other text yields false unless it is "true"/"yes" (any case) or "1".
// NOTE(review): these statements appear to be the body of FromEnvironment(); the method
// header and the creation/return of config are outside this view — confirm.

// Connection and target table.
144 config.Url = GetEnvString(
"KINETICA_URL", config.Url);
145 config.Username = GetEnvString(
"KINETICA_USER", config.Username);
146 config.Password = GetEnvString(
"KINETICA_PASSWORD", config.Password);
147 config.TableName = GetEnvString(
"KINETICA_TABLE", config.TableName);
// Workload size and parallelism.
148 config.TotalRecords = GetEnvLong(
"TOTAL_RECORDS", config.TotalRecords);
149 config.BatchSize = GetEnvInt(
"KINETICA_BATCH_SIZE", config.BatchSize);
150 config.NumThreads = GetEnvInt(
"NUM_THREADS", config.NumThreads);
151 config.NumWorkers = GetEnvInt(
"NUM_WORKERS", config.NumWorkers);
152 config.MessageSize = GetEnvInt(
"MESSAGE_SIZE", config.MessageSize);
// Payload, schema and ingest mode switches.
153 config.PayloadRandom = GetEnvBool(
"PAYLOAD_RANDOM", config.PayloadRandom);
154 config.BetterSchema = GetEnvBool(
"BETTER_SCHEMA", config.BetterSchema);
155 config.UseMultihead = GetEnvBool(
"KINETICA_MULTIHEAD", config.UseMultihead);
// Reporting and output.
156 config.ProgressIntervalMs = GetEnvInt(
"PROGRESS_INTERVAL_MS", config.ProgressIntervalMs);
157 config.LogEveryN = GetEnvLong(
"LOG_EVERY_N", config.LogEveryN);
158 config.CsvOutput = GetEnvBool(
"CSV_OUTPUT", config.CsvOutput);
159 config.TruncateTable = GetEnvBool(
"TRUNCATE_TABLE", config.TruncateTable);
// Backpressure and transport compression.
160 config.MaxInFlightBatches = GetEnvInt(
"MAX_IN_FLIGHT_BATCHES", config.MaxInFlightBatches);
161 config.UseSnappy = GetEnvBool(
"SNAPPY", config.UseSnappy);
// Walk the argument list once and apply each recognised option to config.
// Only the option token is lower-cased for matching; option values are read verbatim
// from args. An option that takes a value consumes the following element via
// args[++i], so that element is not examined as an option itself.
// NOTE(review): a value-taking option given as the last argument fails its
// "i + 1 < args.Length" guard and falls through every branch visible here without
// any diagnostic — confirm whether a trailing else (not visible) reports it.
// NOTE(review): numeric values go through int.Parse/long.Parse, which throw on
// non-numeric or out-of-range text instead of producing a usage error.
176 for (
int i = 0; i < args.Length; i++)
178 var arg = args[i].ToLowerInvariant();
// Options that take a string value.
180 if (arg ==
"--url" && i + 1 < args.Length)
181 config.Url = args[++i];
182 else if (arg ==
"--user" && i + 1 < args.Length)
183 config.Username = args[++i];
184 else if (arg ==
"--password" && i + 1 < args.Length)
185 config.Password = args[++i];
186 else if (arg ==
"--table" && i + 1 < args.Length)
187 config.TableName = args[++i];
// Options that take a numeric value.
188 else if (arg ==
"--records" && i + 1 < args.Length)
189 config.TotalRecords =
long.Parse(args[++i]);
190 else if (arg ==
"--batch-size" && i + 1 < args.Length)
191 config.BatchSize =
int.Parse(args[++i]);
192 else if (arg ==
"--threads" && i + 1 < args.Length)
193 config.NumThreads =
int.Parse(args[++i]);
194 else if (arg ==
"--workers" && i + 1 < args.Length)
195 config.NumWorkers =
int.Parse(args[++i]);
196 else if (arg ==
"--message-size" && i + 1 < args.Length)
197 config.MessageSize =
int.Parse(args[++i]);
// Value-less switches: each one forces a single fixed value, so a setting enabled
// through the environment cannot be turned back off from the command line
// (and vice versa for the --no-* switches).
198 else if (arg ==
"--random-payload")
199 config.PayloadRandom =
true;
200 else if (arg ==
"--better-schema")
201 config.BetterSchema =
true;
202 else if (arg ==
"--no-multihead")
203 config.UseMultihead =
false;
204 else if (arg ==
"--progress-interval" && i + 1 < args.Length)
205 config.ProgressIntervalMs =
int.Parse(args[++i]);
206 else if (arg ==
"--no-csv")
207 config.CsvOutput =
false;
208 else if (arg ==
"--truncate")
209 config.TruncateTable =
true;
210 else if (arg ==
"--max-in-flight" && i + 1 < args.Length)
211 config.MaxInFlightBatches =
int.Parse(args[++i]);
212 else if (arg ==
"--snappy")
213 config.UseSnappy =
true;
// Help request; the body of this branch is outside this view.
214 else if (arg ==
"--help" || arg ==
"-h")
// PrintUsage: command-line help text listing every option and the environment
// variables that are consulted. The statements that follow the help text write a
// human-readable summary of the current configuration to the console.
// NOTE(review): the header of the method that owns the summary statements is not
// visible in this view — confirm its name and accessibility before documenting it.
// NOTE(review): the defaults quoted in the help text for --records, --batch-size,
// --threads, --workers, --message-size, --progress-interval and --max-in-flight
// cannot be checked here because those property initialisers are not visible.
224 private static void PrintUsage()
227 Kinetica Load Generator - High-performance bulk insert benchmark tool 229 Usage: Kinetica.LoadGenerator [options] 232 --url <url> Kinetica server URL (default: http://localhost:9191) 233 --user <username> Username (default: admin) 234 --password <password> Password (default: secret) 235 --table <name> Target table name (default: load_generator_test) 236 --records <count> Total records to insert (default: 1,000,000) 237 --batch-size <size> Records per batch (default: 10,000) 238 --threads <count> Number of producer threads (default: 1) 239 --workers <count> Number of flush workers (default: 16) 240 --message-size <bytes> Payload size in bytes (default: 1024) 241 --random-payload Randomize payload for each record 242 --better-schema Use extended schema with thread tracking 243 --no-multihead Disable multi-head ingest 244 --progress-interval <ms> Progress report interval in ms (default: 1000) 245 --no-csv Disable CSV output 246 --truncate Truncate table before inserting 247 --max-in-flight <count> Max in-flight batches (default: 100) 248 --snappy Enable Snappy compression for HTTP requests 249 --help, -h Show this help message 251 Environment Variables: 252 KINETICA_URL, KINETICA_USER, KINETICA_PASSWORD, KINETICA_TABLE, 253 TOTAL_RECORDS, KINETICA_BATCH_SIZE, NUM_THREADS, NUM_WORKERS, 254 MESSAGE_SIZE, PAYLOAD_RANDOM, BETTER_SCHEMA, KINETICA_MULTIHEAD, 255 PROGRESS_INTERVAL_MS, LOG_EVERY_N, CSV_OUTPUT, TRUNCATE_TABLE, 256 MAX_IN_FLIGHT_BATCHES, SNAPPY 258 Command-line arguments take precedence over environment variables. 264 Console.WriteLine(
"=== Load Generator Configuration ===");
// One line per setting. Password and LogEveryN are not written by any of the
// statements below.
265 Console.WriteLine($
" URL: {Url}");
266 Console.WriteLine($
" User: {Username}");
267 Console.WriteLine($
" Table: {TableName}");
268 Console.WriteLine($
" Total Records: {TotalRecords:N0}");
269 Console.WriteLine($
" Batch Size: {BatchSize:N0}");
270 Console.WriteLine($
" Threads: {NumThreads}");
271 Console.WriteLine($
" Workers: {NumWorkers}");
272 Console.WriteLine($
" Message Size: {MessageSize} bytes");
273 Console.WriteLine($
" Random Payload: {PayloadRandom}");
274 Console.WriteLine($
" Better Schema: {BetterSchema}");
275 Console.WriteLine($
" Multi-head: {UseMultihead}");
276 Console.WriteLine($
" Max In-Flight: {MaxInFlightBatches}");
277 Console.WriteLine($
" Snappy: {UseSnappy}");
278 Console.WriteLine($
" Progress Interval:{ProgressIntervalMs}ms");
279 Console.WriteLine($
" CSV Output: {CsvOutput}");
280 Console.WriteLine($
" Truncate Table: {TruncateTable}");
281 Console.WriteLine(
"====================================\n");
/// <summary>
/// Reads a string setting from the environment.
/// </summary>
/// <param name="name">Name of the environment variable to read.</param>
/// <param name="defaultValue">Value returned when the variable is not set.</param>
/// <returns>
/// The variable's value exactly as stored (an empty value is returned as-is),
/// or <paramref name="defaultValue"/> when the lookup yields null.
/// </returns>
private static string GetEnvString(string name, string defaultValue)
{
    var raw = Environment.GetEnvironmentVariable(name);
    if (raw != null)
    {
        return raw;
    }

    return defaultValue;
}
/// <summary>
/// Reads a 32-bit integer setting from the environment.
/// </summary>
/// <param name="name">Name of the environment variable to read.</param>
/// <param name="defaultValue">
/// Value returned when the variable is not set or does not hold a valid
/// 32-bit integer.
/// </param>
/// <returns>The parsed value, or <paramref name="defaultValue"/>.</returns>
private static int GetEnvInt(string name, int defaultValue)
{
    var value = Environment.GetEnvironmentVariable(name);

    // Environment variables are machine-readable configuration, so parse them with
    // the invariant culture. The culture-less overload uses the current thread
    // culture, under which a value such as "-5" can be rejected when that culture
    // defines a different negative sign.
    // An unparsable value deliberately falls back to the default instead of throwing.
    return int.TryParse(
        value,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out var result)
        ? result
        : defaultValue;
}
/// <summary>
/// Reads a 64-bit integer setting from the environment.
/// </summary>
/// <param name="name">Name of the environment variable to read.</param>
/// <param name="defaultValue">
/// Value returned when the variable is not set or does not hold a valid
/// 64-bit integer.
/// </param>
/// <returns>The parsed value, or <paramref name="defaultValue"/>.</returns>
private static long GetEnvLong(string name, long defaultValue)
{
    var value = Environment.GetEnvironmentVariable(name);

    // Parse with the invariant culture: the culture-less overload follows the current
    // thread culture, which can reject a plain "-5" when that culture defines a
    // different negative sign.
    // An unparsable value deliberately falls back to the default instead of throwing.
    return long.TryParse(
        value,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out var result)
        ? result
        : defaultValue;
}
/// <summary>
/// Reads a boolean setting from the environment.
/// </summary>
/// <param name="name">Name of the environment variable to read.</param>
/// <param name="defaultValue">Value returned when the variable is unset or empty.</param>
/// <returns>
/// <paramref name="defaultValue"/> when the variable is unset or empty; otherwise
/// true when the text is "1", or "true"/"yes" in any letter case, and false for
/// every other value.
/// </returns>
private static bool GetEnvBool(string name, bool defaultValue)
{
    var raw = Environment.GetEnvironmentVariable(name);
    if (raw == null || raw.Length == 0)
    {
        return defaultValue;
    }

    // "1" is matched exactly; the word forms ignore letter case.
    if (string.Equals(raw, "1", StringComparison.Ordinal))
    {
        return true;
    }

    return string.Equals(raw, "true", StringComparison.OrdinalIgnoreCase)
        || string.Equals(raw, "yes", StringComparison.OrdinalIgnoreCase);
}
Configuration for the load generator, matching Rust load_generator options.
long LogEveryN
Log every N records (0 to disable detailed logging).
int BatchSize
Number of records per batch for BulkInserter.
bool UseSnappy
Enable Snappy compression for HTTP requests.
static LoadGeneratorConfig Parse(string[] args)
Parses command-line arguments and merges with environment variables.
long TotalRecords
Total number of records to insert.
int ProgressIntervalMs
Interval in milliseconds for progress reporting.
string Url
Kinetica server URL (e.g., http://localhost:9191).
string Password
Password for Kinetica authentication.
int NumThreads
Number of producer threads.
bool TruncateTable
Truncate the table before inserting if it exists.
bool BetterSchema
Use the extended schema (IngestRecord2) with thread tracking.
int MaxInFlightBatches
Maximum number of in-flight batches for backpressure control.
bool PayloadRandom
Whether to randomize payload for each record.
string Username
Username for Kinetica authentication.
string TableName
Target table name.
bool UseMultihead
Enable multi-head ingest for improved performance.
int MessageSize
Size of the payload field in bytes.
int NumWorkers
Number of flush workers for BulkInserter.
bool CsvOutput
Output results in CSV format for aggregation.
static LoadGeneratorConfig FromEnvironment()
Creates a configuration from environment variables.