Kinetica C# API Version 7.2.3.1
BulkInserterTests.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Linq;
5 using System.Threading;
6 using System.Threading.Tasks;
7 using Xunit;
8 using Xunit.Abstractions;
9 using Kinetica.Tests.Common;
10 using kinetica;
11 using kinetica.Records;
12 
14 {
20  [Trait("Category", "Integration")]
21  public class BulkInserterTests
22  {
/// <summary>xUnit output sink used by the tests to log counts, timings and metrics.</summary>
private readonly ITestOutputHelper _output;

/// <summary>
/// Creates the test class. xUnit injects <paramref name="output"/> through the constructor.
/// </summary>
/// <param name="output">Per-test output helper supplied by xUnit.</param>
public BulkInserterTests(ITestOutputHelper output) => _output = output;
29 
30  #region Test Record Type
31 
// NOTE(review): the class declaration line (original line 32) is missing from this rendering;
// usages elsewhere in the file (new TestRecord { ... }) show this is TestRecord.
33  {
// Property names mirror the columns of the non-sharded "test_record" schema built in
// SetupTestTable: id (primary key) plus the nullable columns name, value and timestamp.
34  public int id { get; set; }
35  public string name { get; set; } = string.Empty;
36  public double value { get; set; }
37  public long timestamp { get; set; }
38 
// NOTE(review): the declaration line for this method (original line 39) is missing from this
// rendering; the file's tooltips suggest it is `ShardKeyValues GetShardKeyValues()`.
40  {
41  // This record type has no shard key columns, so an empty ShardKeyValues is returned.
// NOTE(review): id is not included in the returned values, so no per-record routing hint is
// supplied here — confirm that is intended.
42  return new ShardKeyValues();
43  }
44  }
45 
// NOTE(review): the class declaration line (original line 46) is missing from this rendering;
// usages elsewhere in the file (new ShardedRecord { ... }) show this is ShardedRecord.
47  {
// Property names mirror the "sharded_record" schema built in SetupTestTable: id and shard_key
// (both primary-key columns; shard_key is also the shard key) plus the nullable value column.
48  public int id { get; set; }
49  public string shard_key { get; set; } = string.Empty;
50  public double value { get; set; }
51 
// NOTE(review): the declaration line for this method (original line 52) is missing from this
// rendering; the file's tooltips suggest it is `ShardKeyValues GetShardKeyValues()`.
53  {
// Report the single "shard_key" column, wrapping its value as a string shard-key value.
54  return new ShardKeyValues(("shard_key", ShardKeyValue.String(shard_key)));
55  }
56  }
57 
58  #endregion
59 
60  #region Helper Methods
61 
/// <summary>
/// Registers a record type, creates the "bulk_test" table in the test context's schema, and
/// reads the table's <see cref="KineticaType"/> back from the server.
/// </summary>
/// <param name="ctx">Test context providing the connection and schema-qualified table names.</param>
/// <param name="withShardKey">
/// <c>true</c> for the "sharded_record" layout (id, shard_key, value);
/// <c>false</c> for the "test_record" layout (id, name, value, timestamp).
/// </param>
/// <returns>The qualified table name and the type read back for that table.</returns>
private (string tableName, KineticaType ktype) SetupTestTable(TestContext ctx, bool withShardKey = false)
{
    var (schemaJson, columnProperties) = withShardKey ? ShardedLayout() : PlainLayout();

    var typeId = ctx.Kinetica
        .createType(schemaJson, "bulk_test_type", columnProperties, new Dictionary<string, string>())
        .type_id;

    var qualifiedName = ctx.QualifiedTable("bulk_test");
    ctx.Kinetica.createTable(qualifiedName, typeId, new Dictionary<string, string>());

    return (qualifiedName, KineticaType.fromTable(ctx.Kinetica, qualifiedName));

    // Avro schema and column properties for the table routed by shard_key.
    static (string, Dictionary<string, IList<string>>) ShardedLayout() =>
        (@"{""type"":""record"",""name"":""sharded_record"",""fields"":[" +
         @"{""name"":""id"",""type"":""int""}," +
         @"{""name"":""shard_key"",""type"":""string""}," +
         @"{""name"":""value"",""type"":[""double"",""null""]}]}",
         new Dictionary<string, IList<string>>
         {
             { "id", new List<string> { "primary_key" } },
             // Shard key columns must be part of the primary key.
             { "shard_key", new List<string> { "primary_key", "shard_key" } },
             { "value", new List<string> { "nullable" } }
         });

    // Avro schema and column properties for the plain (non-sharded) table.
    static (string, Dictionary<string, IList<string>>) PlainLayout() =>
        (@"{""type"":""record"",""name"":""test_record"",""fields"":[" +
         @"{""name"":""id"",""type"":""int""}," +
         @"{""name"":""name"",""type"":[""string"",""null""]}," +
         @"{""name"":""value"",""type"":[""double"",""null""]}," +
         @"{""name"":""timestamp"",""type"":[""long"",""null""]}]}",
         new Dictionary<string, IList<string>>
         {
             { "id", new List<string> { "primary_key" } },
             { "name", new List<string> { "nullable" } },
             { "value", new List<string> { "nullable" } },
             { "timestamp", new List<string> { "nullable" } }
         });
}
106 
/// <summary>
/// Builds <paramref name="count"/> sequential <see cref="TestRecord"/> instances whose ids
/// start at <paramref name="startId"/>.
/// </summary>
/// <remarks>
/// Each record's name is "record_" followed by its id zero-padded to 8 digits, its value is
/// id * 1.5, and its timestamp is the current UTC Unix time in milliseconds plus its offset.
/// </remarks>
/// <param name="count">Number of records to create.</param>
/// <param name="startId">Id assigned to the first record.</param>
/// <returns>A new list holding the generated records in id order.</returns>
private List<TestRecord> GenerateTestRecords(int count, int startId = 0)
{
    long nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    return Enumerable.Range(0, count)
        .Select(offset =>
        {
            int recordId = startId + offset;
            return new TestRecord
            {
                id = recordId,
                name = $"record_{recordId:D8}",
                value = recordId * 1.5,
                timestamp = nowMs + offset
            };
        })
        .ToList();
}
125 
/// <summary>
/// Builds <paramref name="count"/> sequential <see cref="ShardedRecord"/> instances whose ids
/// start at <paramref name="startId"/>.
/// </summary>
/// <remarks>
/// Records cycle through 100 shard keys ("shard_000" .. "shard_099", chosen by id modulo 100);
/// each value is id * 2.5.
/// </remarks>
/// <param name="count">Number of records to create.</param>
/// <param name="startId">Id assigned to the first record.</param>
/// <returns>A new list holding the generated records in id order.</returns>
private List<ShardedRecord> GenerateShardedRecords(int count, int startId = 0)
{
    return Enumerable.Range(0, count)
        .Select(offset => startId + offset)
        .Select(recordId => new ShardedRecord
        {
            id = recordId,
            shard_key = $"shard_{recordId % 100:D3}",
            value = recordId * 2.5
        })
        .ToList();
}
142 
143  #endregion
144 
145  #region Basic Insert Tests
146 
/// <summary>
/// Inserts 500 records one at a time (batch size 100). Verifies both the inserter's own
/// inserted count and the number of records the server reports for the table.
/// </summary>
[Fact]
public async Task TestBulkInserterBasicInsert()
{
    const int recordCount = 500;

    using var ctx = new TestContext("bulk_basic");
    var (tableName, ktype) = SetupTestTable(ctx);

    var options = new BulkInserterOptions
    {
        BatchSize = 100
    };

    await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);

    foreach (var record in GenerateTestRecords(recordCount))
    {
        inserter.Insert(record);
    }

    // Close before asserting. Records short of a full batch stay pending until a flush or
    // close (see TestBulkInserterManualFlush).
    await inserter.CloseAsync();

    Assert.Equal(recordCount, inserter.CountInserted);

    // Verify on the server. A "SELECT COUNT(*)" query always yields exactly one result row, so
    // checking its row count cannot detect missing records. /show/table with get_sizes=true
    // reports how many records the table actually holds.
    var tableInfo = ctx.Kinetica.showTable(
        tableName,
        new Dictionary<string, string> { { "get_sizes", "true" } });
    Assert.Equal(recordCount, tableInfo.total_size);
}
175 
/// <summary>
/// Passes 5,000 records to a single <c>InsertBatch</c> call (batch size 1,000) and checks that
/// every record is reported as inserted.
/// </summary>
[Fact]
public async Task TestBulkInserterBatchInsert()
{
    const int totalRecords = 5000;

    using var ctx = new TestContext("bulk_batch");
    var (tableName, ktype) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype, new BulkInserterOptions { BatchSize = 1000 });

    // Hand the whole list over in one call instead of record by record.
    inserter.InsertBatch(GenerateTestRecords(totalRecords));
    await inserter.CloseAsync();

    _output.WriteLine($"Inserted: {inserter.CountInserted}, Updated: {inserter.CountUpdated}");
    _output.WriteLine($"Total batches sent: {inserter.TotalBatchesSent}");

    Assert.Equal(totalRecords, inserter.CountInserted);
}
201 
/// <summary>
/// Inserts 1,000 records one at a time through <c>InsertAsync</c> (batch size 100, at most
/// 10 batches in flight) and checks the inserted count.
/// </summary>
[Fact]
public async Task TestBulkInserterAsyncInsert()
{
    const int totalRecords = 1000;

    using var ctx = new TestContext("bulk_async");
    var (tableName, ktype) = SetupTestTable(ctx);

    var inserterOptions = new BulkInserterOptions { BatchSize = 100, MaxInFlightBatches = 10 };
    await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, inserterOptions);

    // Await every InsertAsync so the producer is subject to the inserter's backpressure.
    foreach (var item in GenerateTestRecords(totalRecords))
    {
        await inserter.InsertAsync(item);
    }

    await inserter.CloseAsync();

    Assert.Equal(totalRecords, inserter.CountInserted);
}
228 
229  [Fact]
// NOTE(review): the method signature line (original line 230) is missing from this rendering;
// it is presumably a `public async Task ...()` test of InsertBatchAsync — confirm.
231  {
232  using var ctx = new TestContext("bulk_async_batch");
233  var (tableName, ktype) = SetupTestTable(ctx);
234 
// Batches of 500 with at most 5 in flight; the 2,500 records below are five batch sizes' worth.
235  var options = new BulkInserterOptions
236  {
237  BatchSize = 500,
238  MaxInFlightBatches = 5
239  };
240 
241  await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);
242 
243  var records = GenerateTestRecords(2500);
244 
// Hand the whole list over through the awaitable batch API.
245  await inserter.InsertBatchAsync(records);
246 
// Close before asserting; presumably this sends any records still buffered in the inserter.
247  await inserter.CloseAsync();
248 
249  Assert.Equal(2500, inserter.CountInserted);
250  }
251 
252  #endregion
253 
254  #region Shard Key Tests
255 
/// <summary>
/// Inserts 1,000 shard-keyed records into a table whose shard key is the "shard_key" column,
/// then logs worker and batch counts.
/// </summary>
[Fact]
public async Task TestBulkInserterWithShardKey()
{
    const int totalRecords = 1000;

    using var ctx = new TestContext("bulk_shard");
    var (tableName, ktype) = SetupTestTable(ctx, withShardKey: true);

    await using var inserter = new BulkInserter<ShardedRecord>(
        ctx.Kinetica, tableName, ktype, new BulkInserterOptions { BatchSize = 100 });

    inserter.InsertBatch(GenerateShardedRecords(totalRecords));
    await inserter.CloseAsync();

    Assert.Equal(totalRecords, inserter.CountInserted);

    // Informational only: routing details, most meaningful when multi-head ingest is enabled.
    _output.WriteLine($"Workers: {inserter.NumWorkers}");
    _output.WriteLine($"Batches sent: {inserter.TotalBatchesSent}");
}
281 
282  #endregion
283 
284  #region Batch Listener Tests
285 
/// <summary>
/// Registers a batch listener, inserts 350 records with a batch size of 100, and checks the
/// listener saw every record across at least three successful batches.
/// </summary>
[Fact]
public async Task TestBulkInserterWithListener()
{
    using var ctx = new TestContext("bulk_listener");
    var (tableName, ktype) = SetupTestTable(ctx);

    var listener = new TestBatchListener();
    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype,
        new BulkInserterOptions { BatchSize = 100, BatchListener = listener });

    // 350 records at 100 per batch should trigger at least three listener callbacks.
    inserter.InsertBatch(GenerateTestRecords(350));
    await inserter.CloseAsync();

    _output.WriteLine($"Batches completed: {listener.BatchesCompleted}");
    _output.WriteLine($"Total inserted via listener: {listener.TotalInserted}");
    _output.WriteLine($"Avg encode time: {listener.AverageEncodeTimeMs:F2}ms");
    _output.WriteLine($"Avg network time: {listener.AverageNetworkTimeMs:F2}ms");

    Assert.True(listener.BatchesCompleted >= 3, $"Expected at least 3 batches, got {listener.BatchesCompleted}");
    Assert.Equal(350, listener.TotalInserted);
    Assert.True(listener.AllSucceeded);
}
316 
/// <summary>
/// <see cref="IBatchInsertionListener"/> that aggregates per-batch results, so tests can assert
/// on batch counts, inserted totals, overall success and average timings.
/// </summary>
/// <remarks>
/// NOTE: assumes <see cref="OnBatchInserted"/> may be invoked concurrently from inserter worker
/// threads — confirm against BulkInserter. All fields are read and written under a private lock,
/// so the averages are computed from a consistent count/total pair. The lock is on a private
/// object rather than <c>this</c>, which outside code could also lock on.
/// </remarks>
private class TestBatchListener : IBatchInsertionListener
{
    private readonly object _gate = new object();

    private int _batchesCompleted;
    private long _totalInserted;
    private double _totalEncodeTime;
    private double _totalNetworkTime;
    private bool _allSucceeded = true;

    /// <summary>Number of batch callbacks received so far.</summary>
    public int BatchesCompleted
    {
        get { lock (_gate) { return _batchesCompleted; } }
    }

    /// <summary>Sum of <c>CountInserted</c> over all received batch results.</summary>
    public long TotalInserted
    {
        get { lock (_gate) { return _totalInserted; } }
    }

    /// <summary>Mean encode time per batch in milliseconds, or 0 if no batch was reported.</summary>
    public double AverageEncodeTimeMs
    {
        get { lock (_gate) { return AverageOver(_totalEncodeTime); } }
    }

    /// <summary>Mean network time per batch in milliseconds, or 0 if no batch was reported.</summary>
    public double AverageNetworkTimeMs
    {
        get { lock (_gate) { return AverageOver(_totalNetworkTime); } }
    }

    /// <summary><c>false</c> once any received batch result reported failure.</summary>
    public bool AllSucceeded
    {
        get { lock (_gate) { return _allSucceeded; } }
    }

    /// <summary>Accumulates one batch result into the running totals.</summary>
    /// <param name="result">Outcome and timings of a single batch insertion.</param>
    public void OnBatchInserted(BatchInsertionResult result)
    {
        lock (_gate)
        {
            _batchesCompleted++;
            _totalInserted += result.CountInserted;
            _totalEncodeTime += result.EncodeTimeMs;
            _totalNetworkTime += result.NetworkTimeMs;

            if (!result.Success)
            {
                _allSucceeded = false;
            }
        }
    }

    // Divides a running total by the batch count; the caller must already hold _gate.
    private double AverageOver(double total) =>
        _batchesCompleted > 0 ? total / _batchesCompleted : 0;
}
347 
348  #endregion
349 
350  #region Backpressure Tests
351 
/// <summary>
/// Inserts 500 records through <c>InsertAsync</c> with small batches (50) and a tight in-flight
/// limit (5). Logs the peak backpressure utilization observed during the run.
/// </summary>
[Fact]
public async Task TestBulkInserterBackpressure()
{
    using var ctx = new TestContext("bulk_backpressure");
    var (tableName, ktype) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype,
        new BulkInserterOptions { BatchSize = 50, MaxInFlightBatches = 5 });

    var records = GenerateTestRecords(500);

    // Sample the backpressure metrics after every insert and keep the highest utilization.
    double peakUtilization = 0.0;
    foreach (var rec in records)
    {
        await inserter.InsertAsync(rec);

        var current = inserter.GetBackpressureMetrics().UtilizationPercent;
        if (current > peakUtilization)
        {
            peakUtilization = current;
        }
    }

    await inserter.CloseAsync();

    _output.WriteLine($"Max backpressure utilization: {peakUtilization:F1}%");
    _output.WriteLine($"Final metrics: {inserter.GetBackpressureMetrics().InFlightBatches} in-flight");

    Assert.Equal(500, inserter.CountInserted);
}
387 
388  #endregion
389 
390  #region Concurrent Insert Tests
391 
392  [Fact]
// NOTE(review): the method signature line (original line 393) is missing from this rendering;
// it is presumably a `public async Task ...()` test of concurrent InsertAsync calls — confirm.
394  {
395  using var ctx = new TestContext("bulk_concurrent");
396  var (tableName, ktype) = SetupTestTable(ctx);
397 
398  var options = new BulkInserterOptions
399  {
400  BatchSize = 100,
401  MaxInFlightBatches = 20,
402  NumStripes = 8
403  };
404 
// A single inserter instance is shared by all producer tasks below.
405  await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);
406 
407  const int numThreads = 4;
408  const int recordsPerThread = 500;
409 
410  var tasks = new Task[numThreads];
411 
412  for (int t = 0; t < numThreads; t++)
413  {
// Copy the loop variable: a C# `for` variable is shared across iterations, so each lambda
// must capture its own per-iteration copy.
414  int threadId = t;
// Each task generates the disjoint id range
// [threadId * recordsPerThread, (threadId + 1) * recordsPerThread), so primary keys never
// collide between tasks.
415  tasks[t] = Task.Run(async () =>
416  {
417  var records = GenerateTestRecords(recordsPerThread, threadId * recordsPerThread);
418  foreach (var record in records)
419  {
420  await inserter.InsertAsync(record);
421  }
422  });
423  }
424 
// Close only after every producer has finished handing records to the inserter.
425  await Task.WhenAll(tasks);
426  await inserter.CloseAsync();
427 
428  _output.WriteLine($"Total inserted: {inserter.CountInserted}");
429  _output.WriteLine($"Total batches: {inserter.TotalBatchesSent}");
430 
431  Assert.Equal(numThreads * recordsPerThread, inserter.CountInserted);
432  }
433 
434  #endregion
435 
436  #region Flush Tests
437 
/// <summary>
/// Buffers fewer records than one batch, checks nothing has been inserted yet, then flushes
/// and closes the inserter and checks every record was inserted.
/// </summary>
[Fact]
public async Task TestBulkInserterManualFlush()
{
    const int pendingCount = 250;

    using var ctx = new TestContext("bulk_flush");
    var (tableName, ktype) = SetupTestTable(ctx);

    // A batch size well above the record count keeps the inserter from auto-flushing.
    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype, new BulkInserterOptions { BatchSize = 1000 });

    foreach (var rec in GenerateTestRecords(pendingCount))
    {
        inserter.Insert(rec);
    }

    // Records are still pending: none have been reported as inserted.
    Assert.Equal(0, inserter.CountInserted);

    await inserter.FlushAsync();
    // NOTE(review): nothing is asserted between FlushAsync and CloseAsync, and CloseAsync
    // presumably flushes as well, so this test would still pass if FlushAsync were a no-op.
    await inserter.CloseAsync();

    Assert.Equal(pendingCount, inserter.CountInserted);
}
469 
470  #endregion
471 
472  #region Error Handling Tests
473 
/// <summary>
/// Inserts 200 valid records with retries disabled and a bounded error queue. Checks that
/// draining the error queue yields nothing and that all records were inserted.
/// </summary>
/// <remarks>
/// NOTE(review): only well-formed data is sent, so no failure path is actually exercised here.
/// </remarks>
[Fact]
public async Task TestBulkInserterErrorQueue()
{
    using var ctx = new TestContext("bulk_errors");
    var (tableName, ktype) = SetupTestTable(ctx);

    var options = new BulkInserterOptions
    {
        BatchSize = 100,
        MaxRetries = 0, // fail fast: no retries for this test
        MaxErrorQueueSize = 100
    };
    await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);

    inserter.InsertBatch(GenerateTestRecords(200));
    await inserter.CloseAsync();

    var drained = inserter.DrainErrors();
    _output.WriteLine($"Errors: {drained.Count}");

    // Valid data should leave the error queue empty.
    Assert.Empty(drained);
    Assert.Equal(200, inserter.CountInserted);
}
502 
503  #endregion
504 
505  #region Metrics Tests
506 
/// <summary>
/// Inserts 500 fresh records (batch size 100) and checks every inserter counter after close:
/// all inserted, none updated, at least five batches sent, and nothing failed or pending.
/// </summary>
[Fact]
public async Task TestBulkInserterMetrics()
{
    using var ctx = new TestContext("bulk_metrics");
    var (tableName, ktype) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype, new BulkInserterOptions { BatchSize = 100 });

    inserter.InsertBatch(GenerateTestRecords(500));
    await inserter.CloseAsync();

    // Log every counter before asserting so a failure is easy to diagnose from the output.
    _output.WriteLine($"Count Inserted: {inserter.CountInserted}");
    _output.WriteLine($"Count Updated: {inserter.CountUpdated}");
    _output.WriteLine($"Total Batches Sent: {inserter.TotalBatchesSent}");
    _output.WriteLine($"Total Batches Failed: {inserter.TotalBatchesFailed}");
    _output.WriteLine($"Pending Batches: {inserter.PendingBatches}");
    _output.WriteLine($"Error Count: {inserter.ErrorCount}");

    Assert.Equal(500, inserter.CountInserted);
    Assert.Equal(0, inserter.CountUpdated);
    Assert.True(inserter.TotalBatchesSent >= 5);
    Assert.Equal(0, inserter.TotalBatchesFailed);
    Assert.Equal(0, inserter.PendingBatches);
    Assert.Equal(0, inserter.ErrorCount);
}
539 
540  #endregion
541 
542  #region Large Batch Tests
543 
/// <summary>
/// Inserts 50,000 records in one <c>InsertBatch</c> call (batch size 10,000, up to 10 in flight).
/// Logs elapsed time and throughput, then checks the inserted count.
/// </summary>
[Fact]
public async Task TestBulkInserterLargeBatch()
{
    const int totalRecords = 50000;

    using var ctx = new TestContext("bulk_large");
    var (tableName, ktype) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<TestRecord>(
        ctx.Kinetica, tableName, ktype,
        new BulkInserterOptions { BatchSize = 10000, MaxInFlightBatches = 10 });

    var records = GenerateTestRecords(totalRecords);

    // Time only the insert and close, not record generation.
    var stopwatch = Stopwatch.StartNew();
    inserter.InsertBatch(records);
    await inserter.CloseAsync();
    stopwatch.Stop();

    double seconds = stopwatch.Elapsed.TotalSeconds;
    _output.WriteLine($"Inserted {totalRecords} records in {seconds:F2}s");
    _output.WriteLine($"Throughput: {totalRecords / seconds:F0} records/second");
    _output.WriteLine($"Total batches: {inserter.TotalBatchesSent}");

    Assert.Equal(totalRecords, inserter.CountInserted);
}
574 
575  #endregion
576  }
577 }
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
bool Success
Whether the batch insertion succeeded.
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
long CountInserted
Number of records successfully inserted.
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
double NetworkTimeMs
Time spent on network I/O in milliseconds.
Test context that manages schema and cleanup for integration tests.
Definition: TestContext.cs:11
Collection of shard key column names and values.
CreateTypeResponse createType(CreateTypeRequest request_)
Creates a new type describing the columns of a table.
Integration tests for the high-performance BulkInserter.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
double EncodeTimeMs
Time spent encoding records in milliseconds.
static ShardKeyValue String(string value)
Creates a string shard key value.
A typed value for shard key computation.
Interface for extracting shard key values from a record.
string QualifiedTable(string tableName)
Get a qualified table name (schema.table).
Definition: TestContext.cs:74
Result of a batch insertion operation.
Configuration options for the BulkInserter<T>.
Listener interface for batch insertion events.