12 using System.Collections.Generic;
13 using System.Diagnostics;
15 using System.Threading.Tasks;
16 using BenchmarkDotNet.Attributes;
17 using BenchmarkDotNet.Jobs;
27 public int id {
get;
set; }
29 public double value {
get;
set; }
34 [SimpleJob(RuntimeMoniker.Net80)]
40 private string _tableName =
"";
41 private int _totalRecords;
43 [Params(1000, 10000, 50000)]
52 var url = Environment.GetEnvironmentVariable(
"KINETICA_URL") ??
"http://localhost:9191";
53 var user = Environment.GetEnvironmentVariable(
"KINETICA_USER") ??
"admin";
54 var password = Environment.GetEnvironmentVariable(
"KINETICA_PASSWORD") ??
"secret";
63 var schemaName =
"bench_schema";
64 _tableName = $
"{schemaName}.avro_decode_test";
66 try { _kinetica.
executeSql($
"CREATE SCHEMA IF NOT EXISTS {schemaName}"); }
catch { }
67 try { _kinetica.
executeSql($
"DROP TABLE IF EXISTS {_tableName}"); }
catch { }
70 CREATE TABLE {_tableName} ( 75 payload VARCHAR({PayloadSize}), 83 var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
86 const int sqlBatchSize = 1000;
87 for (
int batch = 0; batch <
RecordCount; batch += sqlBatchSize)
89 var batchEnd = Math.Min(batch + sqlBatchSize,
RecordCount);
90 var values =
new List<string>();
91 for (
int i = batch; i < batchEnd; i++)
93 values.Add($
"({i}, 'cat_{i % 100}', {i * 0.01}, {timestamp}, '{payloadTemplate}')");
95 _kinetica.
executeSql($
"INSERT INTO {_tableName} (id, category, value, timestamp, payload) VALUES {string.Join(",
", values)}");
106 _kinetica?.
executeSql($
"DROP TABLE IF EXISTS {_tableName}");
111 [Benchmark(Baseline =
true)]
114 if (_kinetica ==
null)
115 throw new InvalidOperationException(
"Setup not complete");
121 return response.data.ToList();
127 if (_kinetica ==
null)
128 throw new InvalidOperationException(
"Setup not complete");
131 var batchSize = Math.Max(1000, _totalRecords / Environment.ProcessorCount);
132 var batches =
new List<(int offset, int limit)>();
134 for (
int offset = 0; offset < _totalRecords; offset += batchSize)
136 var limit = Math.Min(batchSize, _totalRecords - offset);
137 batches.Add((offset, limit));
140 var results =
new List<AvroDecodeRecord>[batches.Count];
142 Parallel.For(0, batches.Count, i =>
144 var (offset, limit) = batches[i];
145 var response = _kinetica!.getRecords<AvroDecodeRecord>(
146 new GetRecordsRequest(_tableName, offset, limit, null));
147 results[i] = response.data.ToList();
151 var combined =
new List<AvroDecodeRecord>(_totalRecords);
152 foreach (var batch
in results)
154 combined.AddRange(batch);
169 Console.WriteLine(
"====================================================================");
170 Console.WriteLine(
" AVRO DECODING PERFORMANCE BENCHMARK");
171 Console.WriteLine(
"====================================================================");
173 Console.WriteLine(
" Comparing:");
174 Console.WriteLine(
" * Sequential fetch + decode");
175 Console.WriteLine(
" * Parallel fetch + decode (multiple requests)");
178 var url = Environment.GetEnvironmentVariable(
"KINETICA_URL") ??
"http://localhost:9191";
179 var user = Environment.GetEnvironmentVariable(
"KINETICA_USER") ??
"admin";
180 var password = Environment.GetEnvironmentVariable(
"KINETICA_PASSWORD") ??
"secret";
182 Console.WriteLine($
"Connected to: {url}");
192 var testConfigs =
new (
int records,
int payload)[]
201 var schemaName =
"decode_bench";
202 var tableName = $
"{schemaName}.decode_test";
204 foreach (var (recordCount, payloadSize) in testConfigs)
206 Console.WriteLine(
"--------------------------------------------------------------------");
207 Console.WriteLine($
" Test: {recordCount:N0} records, {payloadSize} byte payload");
208 Console.WriteLine(
"--------------------------------------------------------------------");
211 try {
kinetica.executeSql($
"DROP SCHEMA IF EXISTS {schemaName} CASCADE"); }
catch { }
212 try {
kinetica.executeSql($
"CREATE SCHEMA {schemaName}"); }
catch { }
215 CREATE TABLE {tableName} ( 217 category VARCHAR(64), 220 payload VARCHAR({payloadSize}), 226 Console.WriteLine($
" Inserting {recordCount:N0} records...");
227 var payloadTemplate =
new string(
'x', payloadSize);
228 var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
231 const int sqlBatchSize = 1000;
232 for (
int batch = 0; batch < recordCount; batch += sqlBatchSize)
234 var batchEnd = Math.Min(batch + sqlBatchSize, recordCount);
235 var values =
new List<string>();
236 for (
int i = batch; i < batchEnd; i++)
238 values.Add($
"({i}, 'cat_{i % 100}', {i * 0.01}, {timestamp}, '{payloadTemplate}')");
240 kinetica.executeSql($
"INSERT INTO {tableName} (id, category, value, timestamp, payload) VALUES {string.Join(",
", values)}");
244 Console.WriteLine(
" Warming up...");
248 Console.WriteLine(
" Running sequential benchmark...");
249 var sw = Stopwatch.StartNew();
252 var seqTime = sw.Elapsed;
253 var seqThroughput = recordCount / seqTime.TotalSeconds;
256 Console.WriteLine(
" Running parallel benchmark...");
258 var batchSize = Math.Max(1000, recordCount / Environment.ProcessorCount);
259 var batches =
new List<(int offset, int limit)>();
261 for (
int offset = 0; offset < recordCount; offset += batchSize)
263 var limit = Math.Min(batchSize, recordCount - offset);
264 batches.Add((offset, limit));
267 var parResults =
new IList<AvroDecodeRecord>[batches.Count];
268 Parallel.For(0, batches.Count, i =>
270 var (offset, limit) = batches[i];
271 var response = kinetica.getRecords<AvroDecodeRecord>(
272 new GetRecordsRequest(tableName, offset, limit, null));
273 parResults[i] = response.data;
275 var parTime = sw.Elapsed;
276 var parThroughput = recordCount / parTime.TotalSeconds;
280 Console.WriteLine($
" {"Method
",-45} | {"Records",-8} | {"Time",-12} | {"Throughput
",-15}");
281 Console.WriteLine($
" {new string('-', 85)}");
282 Console.WriteLine($
" {"Sequential",-45} | {recordCount,-8:N0} | {seqTime.TotalMilliseconds,-8:F2} ms | {seqThroughput,-12:N0} rec/s");
283 Console.WriteLine($
" {"Parallel (multi-request)
",-45} | {recordCount,-8:N0} | {parTime.TotalMilliseconds,-8:F2} ms | {parThroughput,-12:N0} rec/s");
285 Console.WriteLine($
" Speedup: {seqTime.TotalMilliseconds / parTime.TotalMilliseconds:F2}x");
290 try {
kinetica.executeSql($
"DROP SCHEMA IF EXISTS {schemaName} CASCADE"); }
catch { }
292 Console.WriteLine(
"====================================================================");
293 Console.WriteLine(
" BENCHMARK COMPLETE");
294 Console.WriteLine(
"====================================================================");
296 Console.WriteLine(
"Summary:");
297 Console.WriteLine(
" * Parallel fetching can improve throughput for large result sets");
298 Console.WriteLine(
" * Network latency dominates for small result sets");
299 Console.WriteLine(
" * Use parallel fetch when results exceed 10K records");
Failover to clusters in sequential order
List< AvroDecodeRecord > SequentialDecode()
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
Time in HH:MM:SS.mmm format
Standalone Avro decode benchmark runner (without BenchmarkDotNet).
Avro decode benchmark record type.
A set of parameters for Kinetica.getRecords.
API to talk to Kinetica Database
List< AvroDecodeRecord > ParallelFetchDecode()