Kinetica   C#   API  Version 7.2.3.1
AvroDecodeBenchmarks.cs
Go to the documentation of this file.
1 
11 using System;
12 using System.Collections.Generic;
13 using System.Diagnostics;
14 using System.Linq;
15 using System.Threading.Tasks;
16 using BenchmarkDotNet.Attributes;
17 using BenchmarkDotNet.Jobs;
18 using kinetica;
19 
21 {
/// <summary>
/// Avro decode benchmark record type.
/// </summary>
/// <remarks>
/// The property names are intentionally lower-case: they mirror the column
/// names (<c>id</c>, <c>category</c>, <c>value</c>, <c>timestamp</c>,
/// <c>payload</c>) of the tables created by the benchmarks in this file.
/// </remarks>
public class AvroDecodeRecord
{
    /// <summary>Value of the <c>id</c> column (the table's primary key).</summary>
    public int id { get; set; }

    /// <summary>Value of the <c>category</c> column; empty until assigned.</summary>
    public string category { get; set; } = string.Empty;

    /// <summary>Value of the <c>value</c> column.</summary>
    public double value { get; set; }

    /// <summary>Value of the <c>timestamp</c> column.</summary>
    public long timestamp { get; set; }

    /// <summary>Value of the <c>payload</c> column; empty until assigned.</summary>
    public string payload { get; set; } = string.Empty;
}
33 
/// <summary>
/// BenchmarkDotNet suite that compares one <c>getRecords</c> call for the whole
/// table against several concurrent <c>getRecords</c> calls over offset/limit
/// slices of the same table.
/// </summary>
/// <remarks>
/// Connection settings are read from the <c>KINETICA_URL</c>,
/// <c>KINETICA_USER</c> and <c>KINETICA_PASSWORD</c> environment variables,
/// falling back to local development defaults when they are not set.
/// </remarks>
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
[RPlotExporter]
public class AvroDecodeBenchmarks
{
    /// <summary>Schema that holds the benchmark table.</summary>
    private const string SchemaName = "bench_schema";

    /// <summary>Number of rows sent per SQL INSERT statement during setup.</summary>
    private const int SqlBatchSize = 1000;

    /// <summary>Smallest slice a single parallel request is allowed to fetch.</summary>
    private const int MinFetchBatchSize = 1000;

    private kinetica.Kinetica? _kinetica;
    private string _tableName = "";
    private int _totalRecords;

    /// <summary>Number of rows loaded into the table and fetched by each benchmark.</summary>
    [Params(1000, 10000, 50000)]
    public int RecordCount { get; set; }

    /// <summary>Length, in characters, of the <c>payload</c> column value of every row.</summary>
    [Params(64, 256)]
    public int PayloadSize { get; set; }

    /// <summary>
    /// Connects to the database, (re)creates the benchmark table and fills it
    /// with <see cref="RecordCount"/> rows.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        var url = Environment.GetEnvironmentVariable("KINETICA_URL") ?? "http://localhost:9191";
        var user = Environment.GetEnvironmentVariable("KINETICA_USER") ?? "admin";
        var password = Environment.GetEnvironmentVariable("KINETICA_PASSWORD") ?? "secret";

        var db = new kinetica.Kinetica(url, new kinetica.Kinetica.Options
        {
            Username = user,
            Password = password
        });
        _kinetica = db;
        _tableName = $"{SchemaName}.avro_decode_test";

        // Best-effort preparation: a failure here is reported but does not
        // abort setup; the CREATE TABLE below fails loudly if the
        // environment is really unusable.
        TryExecuteSql(db, $"CREATE SCHEMA IF NOT EXISTS {SchemaName}");
        TryExecuteSql(db, $"DROP TABLE IF EXISTS {_tableName}");

        db.executeSql($@"
            CREATE TABLE {_tableName} (
                id INT NOT NULL,
                category VARCHAR(64),
                value DOUBLE,
                timestamp LONG,
                payload VARCHAR({PayloadSize}),
                PRIMARY KEY (id),
                SHARD KEY (id)
            )
        ");

        // Rows are inserted through SQL (avoids type registration issues).
        var payload = new string('x', PayloadSize);
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        for (int firstId = 0; firstId < RecordCount; firstId += SqlBatchSize)
        {
            int rowCount = Math.Min(SqlBatchSize, RecordCount - firstId);
            db.executeSql(BuildInsertSql(_tableName, firstId, rowCount, timestamp, payload));
        }

        _totalRecords = RecordCount;
    }

    /// <summary>
    /// Drops the benchmark table. Failures are reported to standard error and
    /// otherwise ignored so that cleanup never fails the benchmark run.
    /// </summary>
    [GlobalCleanup]
    public void Cleanup()
    {
        if (_kinetica is null)
        {
            return;
        }

        TryExecuteSql(_kinetica, $"DROP TABLE IF EXISTS {_tableName}");
    }

    /// <summary>
    /// Baseline: fetches (and thereby decodes) all records with a single
    /// <c>getRecords</c> request.
    /// </summary>
    /// <returns>The fetched records, materialized into a list.</returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="Setup"/> has not established a connection.
    /// </exception>
    [Benchmark(Baseline = true)]
    public List<AvroDecodeRecord> SequentialDecode()
    {
        var db = _kinetica ?? throw new InvalidOperationException("Setup not complete");

        var response = db.getRecords<AvroDecodeRecord>(
            new GetRecordsRequest(_tableName, 0, _totalRecords, null));

        return response.data.ToList();
    }

    /// <summary>
    /// Fetches the same records as <see cref="SequentialDecode"/>, but split
    /// into offset/limit slices that are requested concurrently via
    /// <see cref="Parallel"/>.
    /// </summary>
    /// <returns>
    /// The records of all slices, concatenated in slice (offset) order.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="Setup"/> has not established a connection.
    /// </exception>
    [Benchmark]
    public List<AvroDecodeRecord> ParallelFetchDecode()
    {
        var db = _kinetica ?? throw new InvalidOperationException("Setup not complete");

        // One slice per processor, but never smaller than MinFetchBatchSize so
        // that small tables are not split into many tiny requests.
        var batchSize = Math.Max(MinFetchBatchSize, _totalRecords / Environment.ProcessorCount);
        var batches = new List<(int offset, int limit)>();

        for (int offset = 0; offset < _totalRecords; offset += batchSize)
        {
            batches.Add((offset, Math.Min(batchSize, _totalRecords - offset)));
        }

        // Each worker writes only its own slot, so no locking is needed here.
        var results = new List<AvroDecodeRecord>[batches.Count];

        Parallel.For(0, batches.Count, i =>
        {
            var (offset, limit) = batches[i];
            var response = db.getRecords<AvroDecodeRecord>(
                new GetRecordsRequest(_tableName, offset, limit, null));
            results[i] = response.data.ToList();
        });

        var combined = new List<AvroDecodeRecord>(_totalRecords);
        foreach (var slice in results)
        {
            combined.AddRange(slice);
        }

        return combined;
    }

    /// <summary>
    /// Builds one multi-row INSERT statement for ids
    /// <paramref name="firstId"/> .. <paramref name="firstId"/> + <paramref name="rowCount"/> - 1.
    /// </summary>
    /// <remarks>
    /// The row literals are formatted with the invariant culture: with the
    /// current culture, a locale that uses a decimal comma would render
    /// <c>i * 0.01</c> as e.g. <c>0,01</c> and break the VALUES tuple.
    /// </remarks>
    private static string BuildInsertSql(string tableName, int firstId, int rowCount, long timestamp, string payload)
    {
        var rows = Enumerable.Range(firstId, rowCount)
            .Select(i => FormattableString.Invariant(
                $"({i}, 'cat_{i % 100}', {i * 0.01}, {timestamp}, '{payload}')"));

        return $"INSERT INTO {tableName} (id, category, value, timestamp, payload) VALUES {string.Join(",", rows)}";
    }

    /// <summary>
    /// Executes a best-effort SQL statement: a failure is written to standard
    /// error instead of being silently swallowed, and is not rethrown.
    /// </summary>
    private static void TryExecuteSql(kinetica.Kinetica db, string sql)
    {
        try
        {
            db.executeSql(sql);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"[AvroDecodeBenchmarks] '{sql}' failed: {ex.Message}");
        }
    }
}
160 
/// <summary>
/// Standalone Avro decode benchmark runner (without BenchmarkDotNet).
/// </summary>
/// <remarks>
/// For each test configuration the runner recreates the <c>decode_bench</c>
/// schema, loads a table through SQL, and times one full-table
/// <c>getRecords</c> call against several concurrent offset/limit calls using
/// a <see cref="Stopwatch"/>. Results are printed to the console.
/// </remarks>
public static class AvroDecodeBenchmarkRunner
{
    /// <summary>Schema that is dropped and recreated for every configuration.</summary>
    private const string SchemaName = "decode_bench";

    /// <summary>Number of rows sent per SQL INSERT statement while loading.</summary>
    private const int SqlBatchSize = 1000;

    /// <summary>Smallest slice a single parallel request is allowed to fetch.</summary>
    private const int MinFetchBatchSize = 1000;

    /// <summary>
    /// Runs every test configuration and prints a timing table for each.
    /// </summary>
    /// <remarks>
    /// Connection settings come from the <c>KINETICA_URL</c>,
    /// <c>KINETICA_USER</c> and <c>KINETICA_PASSWORD</c> environment variables,
    /// with local development defaults. The benchmark schema is dropped even
    /// when a configuration fails; the failure itself still propagates.
    /// </remarks>
    public static void Run()
    {
        Console.WriteLine("====================================================================");
        Console.WriteLine(" AVRO DECODING PERFORMANCE BENCHMARK");
        Console.WriteLine("====================================================================");
        Console.WriteLine();
        Console.WriteLine(" Comparing:");
        Console.WriteLine(" * Sequential fetch + decode");
        Console.WriteLine(" * Parallel fetch + decode (multiple requests)");
        Console.WriteLine();

        var url = Environment.GetEnvironmentVariable("KINETICA_URL") ?? "http://localhost:9191";
        var user = Environment.GetEnvironmentVariable("KINETICA_USER") ?? "admin";
        var password = Environment.GetEnvironmentVariable("KINETICA_PASSWORD") ?? "secret";

        Console.WriteLine($"Connected to: {url}");
        Console.WriteLine();

        // Named "db" rather than "kinetica" so the local does not shadow the
        // kinetica namespace used in the type names on this very line.
        var db = new kinetica.Kinetica(url, new kinetica.Kinetica.Options
        {
            Username = user,
            Password = password
        });

        // Test configurations: (recordCount, payloadSize)
        var testConfigs = new (int records, int payload)[]
        {
            (1000, 64),
            (10000, 64),
            (50000, 64),
            (100000, 64),
            (100000, 256),
        };

        var tableName = $"{SchemaName}.decode_test";

        try
        {
            foreach (var (recordCount, payloadSize) in testConfigs)
            {
                RunConfiguration(db, tableName, recordCount, payloadSize);
            }
        }
        finally
        {
            // Cleanup runs even when a configuration throws, so a failed run
            // does not leave the benchmark schema behind.
            TryExecuteSql(db, $"DROP SCHEMA IF EXISTS {SchemaName} CASCADE");
        }

        Console.WriteLine("====================================================================");
        Console.WriteLine(" BENCHMARK COMPLETE");
        Console.WriteLine("====================================================================");
        Console.WriteLine();
        Console.WriteLine("Summary:");
        Console.WriteLine(" * Parallel fetching can improve throughput for large result sets");
        Console.WriteLine(" * Network latency dominates for small result sets");
        Console.WriteLine(" * Use parallel fetch when results exceed 10K records");
        Console.WriteLine();
    }

    /// <summary>
    /// Loads one table for the given configuration, times the sequential and
    /// the parallel fetch, and prints the comparison.
    /// </summary>
    private static void RunConfiguration(kinetica.Kinetica db, string tableName, int recordCount, int payloadSize)
    {
        Console.WriteLine("--------------------------------------------------------------------");
        Console.WriteLine($" Test: {recordCount:N0} records, {payloadSize} byte payload");
        Console.WriteLine("--------------------------------------------------------------------");

        // Setup: start every configuration from an empty schema.
        TryExecuteSql(db, $"DROP SCHEMA IF EXISTS {SchemaName} CASCADE");
        TryExecuteSql(db, $"CREATE SCHEMA {SchemaName}");

        db.executeSql($@"
            CREATE TABLE {tableName} (
                id INT NOT NULL,
                category VARCHAR(64),
                value DOUBLE,
                timestamp LONG,
                payload VARCHAR({payloadSize}),
                PRIMARY KEY (id)
            )
        ");

        // Insert records using SQL for simplicity (avoids type registration issues)
        Console.WriteLine($" Inserting {recordCount:N0} records...");
        var payload = new string('x', payloadSize);
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        for (int firstId = 0; firstId < recordCount; firstId += SqlBatchSize)
        {
            int rowCount = Math.Min(SqlBatchSize, recordCount - firstId);
            db.executeSql(BuildInsertSql(tableName, firstId, rowCount, timestamp, payload));
        }

        // Warm up with a small fetch before anything is timed.
        Console.WriteLine(" Warming up...");
        db.getRecords<AvroDecodeRecord>(new GetRecordsRequest(tableName, 0, 100, null));

        // Benchmark: Sequential
        Console.WriteLine(" Running sequential benchmark...");
        var sw = Stopwatch.StartNew();
        _ = db.getRecords<AvroDecodeRecord>(
            new GetRecordsRequest(tableName, 0, recordCount, null));
        var seqTime = sw.Elapsed;
        var seqThroughput = recordCount / seqTime.TotalSeconds;

        // Benchmark: Parallel (slice planning is part of the timed region)
        Console.WriteLine(" Running parallel benchmark...");
        sw.Restart();
        var batchSize = Math.Max(MinFetchBatchSize, recordCount / Environment.ProcessorCount);
        var batches = new List<(int offset, int limit)>();

        for (int offset = 0; offset < recordCount; offset += batchSize)
        {
            batches.Add((offset, Math.Min(batchSize, recordCount - offset)));
        }

        // Each worker writes only its own slot, so no locking is needed here.
        var parResults = new IList<AvroDecodeRecord>[batches.Count];
        Parallel.For(0, batches.Count, i =>
        {
            var (offset, limit) = batches[i];
            var response = db.getRecords<AvroDecodeRecord>(
                new GetRecordsRequest(tableName, offset, limit, null));
            parResults[i] = response.data;
        });
        var parTime = sw.Elapsed;
        var parThroughput = recordCount / parTime.TotalSeconds;

        // Results
        Console.WriteLine();
        Console.WriteLine($" {"Method",-45} | {"Records",-8} | {"Time",-12} | {"Throughput",-15}");
        Console.WriteLine($" {new string('-', 85)}");
        Console.WriteLine($" {"Sequential",-45} | {recordCount,-8:N0} | {seqTime.TotalMilliseconds,-8:F2} ms | {seqThroughput,-12:N0} rec/s");
        Console.WriteLine($" {"Parallel (multi-request)",-45} | {recordCount,-8:N0} | {parTime.TotalMilliseconds,-8:F2} ms | {parThroughput,-12:N0} rec/s");
        Console.WriteLine();
        Console.WriteLine($" Speedup: {seqTime.TotalMilliseconds / parTime.TotalMilliseconds:F2}x");
        Console.WriteLine();
    }

    /// <summary>
    /// Builds one multi-row INSERT statement for ids
    /// <paramref name="firstId"/> .. <paramref name="firstId"/> + <paramref name="rowCount"/> - 1.
    /// </summary>
    /// <remarks>
    /// The row literals are formatted with the invariant culture: with the
    /// current culture, a locale that uses a decimal comma would render
    /// <c>i * 0.01</c> as e.g. <c>0,01</c> and break the VALUES tuple.
    /// </remarks>
    private static string BuildInsertSql(string tableName, int firstId, int rowCount, long timestamp, string payload)
    {
        var rows = Enumerable.Range(firstId, rowCount)
            .Select(i => FormattableString.Invariant(
                $"({i}, 'cat_{i % 100}', {i * 0.01}, {timestamp}, '{payload}')"));

        return $"INSERT INTO {tableName} (id, category, value, timestamp, payload) VALUES {string.Join(",", rows)}";
    }

    /// <summary>
    /// Executes a best-effort SQL statement: a failure is written to standard
    /// error instead of being silently swallowed, and is not rethrown.
    /// </summary>
    private static void TryExecuteSql(kinetica.Kinetica db, string sql)
    {
        try
        {
            db.executeSql(sql);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"[AvroDecodeBenchmarkRunner] '{sql}' failed: {ex.Message}");
        }
    }
}
303 }
Failover to clusters in sequential order
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
Time in HH:MM:SS.mmm format
Standalone Avro decode benchmark runner (without BenchmarkDotNet).
Avro decode benchmark record type.
kinetica.Records Records
Definition: BulkInserter.cs:10
A set of parameters for Kinetica.getRecords.
Definition: GetRecords.cs:24
Connection Options
Definition: Kinetica.cs:50
API to talk to Kinetica Database
Definition: Kinetica.cs:40
List< AvroDecodeRecord > ParallelFetchDecode()