Kinetica C# API Version 7.2.3.1
BulkInserterBenchmarks.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Threading.Tasks;
4 using BenchmarkDotNet.Attributes;
5 using BenchmarkDotNet.Configs;
6 using BenchmarkDotNet.Jobs;
7 using kinetica;
8 using kinetica.Records;
9 
10 namespace Kinetica.Benchmarks
11 {
16  [Config(typeof(BenchmarkConfig))]
17  [MemoryDiagnoser]
18  [SimpleJob(RuntimeMoniker.Net80, warmupCount: 1, iterationCount: 3)]
20  {
// Shared per parameter set: the client connection and the pre-generated payload (set in GlobalSetup).
private kinetica.Kinetica _kinetica = default!;
private List<BenchRecord> _records = default!;

// Per-iteration target: fully qualified "schema.table" name and its KineticaType (set in IterationSetup).
private string _tableName = default!;
private KineticaType _ktype = default!;
25 
26  #region Setup
27 
[GlobalSetup]
public void GlobalSetup()
{
    // Resolve a connection setting from the environment, falling back to a local-dev default.
    static string Setting(string variable, string fallback) =>
        Environment.GetEnvironmentVariable(variable) ?? fallback;

    string serverUrl = Setting("KINETICA_URL", "http://localhost:9191");

    // Snappy is explicitly disabled for the benchmark connection.
    _kinetica = new kinetica.Kinetica(serverUrl, new kinetica.Kinetica.Options
    {
        Username = Setting("KINETICA_USER", "admin"),
        Password = Setting("KINETICA_PASSWORD", "secret"),
        UseSnappy = false
    });

    // Pre-build the payload once, sized by the RecordCount parameter, so generation cost
    // is not part of the measured benchmark methods.
    _records = GenerateRecords(RecordCount);
}
47 
/// <summary>
/// Creates a fresh, uniquely named schema plus a "bench_table" table for one benchmark iteration,
/// and loads the matching <c>KineticaType</c> into <c>_ktype</c>.
/// </summary>
public void IterationSetup()
{
    // Each iteration gets its own schema: "bench_" followed by the first 14 hex digits of a new GUID.
    var schemaName = $"bench_{Guid.NewGuid():N}".Substring(0, 20);

    // Publish the target name before touching the server. Cleanup derives the schema to drop from
    // _tableName, so assigning it first makes cleanup target THIS iteration's schema even if a call
    // below throws. Previously a failure left the prior iteration's (already dropped) name in place,
    // and the newly created schema could be left behind.
    _tableName = $"{schemaName}.bench_table";

    // "no_error_if_exists" already tolerates an existing schema, so any exception here is a genuine
    // failure. Let it surface now instead of swallowing it and failing later inside createTable.
    _kinetica.createSchema(schemaName, new Dictionary<string, string> { { "no_error_if_exists", "true" } });

    // Avro-style JSON record schema with the same five columns as BenchRecord.
    var typeDef = @"{""type"":""record"",""name"":""bench_record"",""fields"":[" +
        @"{""name"":""id"",""type"":""int""}," +
        @"{""name"":""thread_id"",""type"":""int""}," +
        @"{""name"":""timestamp"",""type"":""long""}," +
        @"{""name"":""name"",""type"":""string""}," +
        @"{""name"":""score"",""type"":""double""}]}";

    // Column properties: "id" is the primary key; the other columns carry no extra properties.
    var properties = new Dictionary<string, IList<string>>
    {
        { "id", new List<string> { "primary_key" } },
        { "thread_id", new List<string>() },
        { "timestamp", new List<string>() },
        { "name", new List<string>() },
        { "score", new List<string>() }
    };

    var typeResp = _kinetica.createType(typeDef, "bench_type", properties, new Dictionary<string, string>());
    _kinetica.createTable(_tableName, typeResp.type_id, new Dictionary<string, string>());

    // Describe the just-created table client-side for the inserters under test.
    _ktype = KineticaType.fromTable(_kinetica, _tableName);
}
81 
/// <summary>
/// Best-effort removal of the schema created for the current iteration (derived from the
/// schema part of <c>_tableName</c>). Failures are reported but never abort the benchmark run.
/// </summary>
public void IterationCleanup()
{
    // Guard clause: if setup never chose a table name there is nothing to drop.
    // Previously this case raised a NullReferenceException that was silently swallowed.
    if (string.IsNullOrEmpty(_tableName))
    {
        return;
    }

    var schemaName = _tableName.Split('.')[0];
    try
    {
        // schemaName is generated internally in IterationSetup, never user input, so interpolating
        // it into the DDL is safe. NOTE(review): the (0, -9999) offset/limit pair is presumably
        // Kinetica's "no limit" convention; confirm against the executeSql overload.
        _kinetica.executeSql($"DROP SCHEMA IF EXISTS {schemaName} CASCADE", 0, -9999);
    }
    catch (Exception ex)
    {
        // Stay best-effort (a failed drop must not fail the run), but make it visible:
        // a silent failure leaves the schema behind on the server.
        Console.Error.WriteLine($"IterationCleanup: failed to drop schema '{schemaName}': {ex.Message}");
    }
}
92 
// Intentionally a no-op: each iteration's schema is dropped by IterationCleanup.
public void GlobalCleanup() { }
98 
99  #endregion
100 
101  #region Parameters
102 
/// <summary>Number of records generated in GlobalSetup and inserted by each benchmark.</summary>
[Params(10_000, 50_000)]
public int RecordCount { get; set; }

/// <summary>Batch size handed to the inserters under test (e.g. the KineticaIngestor constructor).</summary>
[Params(1_000, 10_000)]
public int BatchSize { get; set; }
108 
109  #endregion
110 
111  #region Record Generation
112 
114  {
115  public int id { get; set; }
116  public int thread_id { get; set; }
117  public long timestamp { get; set; }
118  public string name { get; set; } = string.Empty;
119  public double score { get; set; }
120 
122  {
123  return new ShardKeyValues();
124  }
125  }
126 
/// <summary>
/// Builds <paramref name="count"/> benchmark records with ids 0..count-1, timestamps one
/// millisecond apart starting at the current UTC time, and zero-padded names.
/// </summary>
/// <param name="count">Number of records to produce.</param>
/// <returns>A list pre-sized to <paramref name="count"/>.</returns>
private List<BenchRecord> GenerateRecords(int count)
{
    var result = new List<BenchRecord>(count);
    long startMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    // Generation is single-threaded, so every record carries the generating thread's id,
    // not the id of whichever thread later inserts it.
    int generatorThread = Environment.CurrentManagedThreadId;

    int index = 0;
    while (index < count)
    {
        result.Add(new BenchRecord
        {
            id = index,
            thread_id = generatorThread,
            timestamp = startMillis + index,
            name = $"record_{index:D8}",
            score = index * 1.5
        });
        index++;
    }

    return result;
}
146 
147  #endregion
148 
149  #region Benchmarks
150 
151  [Benchmark(Baseline = true)]
152  public async Task BulkInserter_InsertBatch()
153  {
154  var options = new BulkInserterOptions
155  {
157  };
158 
159  await using var inserter = new BulkInserter<BenchRecord>(_kinetica, _tableName, _ktype, options);
160 
161  inserter.InsertBatch(_records);
162 
163  await inserter.CloseAsync();
164  }
165 
166  [Benchmark]
167  public async Task BulkInserter_InsertBatchAsync()
168  {
169  var options = new BulkInserterOptions
170  {
172  MaxInFlightBatches = 20
173  };
174 
175  await using var inserter = new BulkInserter<BenchRecord>(_kinetica, _tableName, _ktype, options);
176 
177  await inserter.InsertBatchAsync(_records);
178 
179  await inserter.CloseAsync();
180  }
181 
182  [Benchmark]
183  public async Task BulkInserter_SingleInserts()
184  {
185  var options = new BulkInserterOptions
186  {
188  };
189 
190  await using var inserter = new BulkInserter<BenchRecord>(_kinetica, _tableName, _ktype, options);
191 
192  foreach (var record in _records)
193  {
194  inserter.Insert(record);
195  }
196 
197  await inserter.CloseAsync();
198  }
199 
200  [Benchmark]
201  public async Task BulkInserter_ConcurrentInserts()
202  {
203  var options = new BulkInserterOptions
204  {
206  NumStripes = 8,
207  MaxInFlightBatches = 20
208  };
209 
210  await using var inserter = new BulkInserter<BenchRecord>(_kinetica, _tableName, _ktype, options);
211 
212  var chunkSize = _records.Count / 4;
213  var tasks = new Task[4];
214 
215  for (int t = 0; t < 4; t++)
216  {
217  var start = t * chunkSize;
218  var end = (t == 3) ? _records.Count : (t + 1) * chunkSize;
219  var chunk = _records.GetRange(start, end - start);
220 
221  tasks[t] = Task.Run(async () =>
222  {
223  foreach (var record in chunk)
224  {
225  await inserter.InsertAsync(record);
226  }
227  });
228  }
229 
230  await Task.WhenAll(tasks);
231  await inserter.CloseAsync();
232  }
233 
234  [Benchmark]
236  {
237  var ingestor = new KineticaIngestor<BenchRecord>(
238  _kinetica,
239  _tableName,
240  BatchSize,
241  _ktype);
242 
243  foreach (var record in _records)
244  {
245  ingestor.insert(record);
246  }
247 
248  ingestor.flush();
249  }
250 
251  [Benchmark]
253  {
254  var ingestor = new KineticaIngestor<BenchRecord>(
255  _kinetica,
256  _tableName,
257  BatchSize,
258  _ktype);
259 
260  ingestor.insert(_records);
261  ingestor.flush();
262  }
263 
264  #endregion
265  }
266 
267  public class BenchmarkConfig : ManualConfig
268  {
270  {
271  AddJob(Job.Default
272  .WithWarmupCount(1)
273  .WithIterationCount(3));
274  }
275  }
276 }
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
string? Username
Optional: username for Kinetica security.
Definition: Kinetica.cs:157
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
High-performance bulk inserter for Kinetica with support for multi-head ingest,...
Definition: BulkInserter.cs:28
Benchmarks for the BulkInserter comparing different batch sizes, insertion methods,...
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
bool UseSnappy
Whether to use Snappy compression.
Definition: Kinetica.cs:177
CreateSchemaResponse createSchema(CreateSchemaRequest request_)
Creates a SQL-style schema.
Collection of shard key column names and values.
Connection Options
Definition: Kinetica.cs:50
CreateTypeResponse createType(CreateTypeRequest request_)
Creates a new type describing the columns of a table.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
Interface for extracting shard key values from a record.
Configuration options for the BulkInserter<T>.
API to talk to Kinetica Database
Definition: Kinetica.cs:40
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...