2 using System.Collections.Generic;
3 using System.Diagnostics;
5 using System.Threading;
6 using System.Threading.Tasks;
8 using Xunit.Abstractions;
// Integration tests for the BulkInserter (the class declaration line itself is
// elided from this extract). The xUnit trait tags every test in the class with
// Category=Integration so test runners can include/exclude them by trait.
20 [Trait(
"Category",
"Integration")]
// xUnit per-test output sink; the tests log diagnostic counters, batch totals and
// timings through _output.WriteLine.
23 private readonly ITestOutputHelper _output;
// Record type for the non-sharded test table. Property names are lower-case,
// presumably so they map 1:1 onto the field names ("id", "name", "value",
// "timestamp") of the "test_record" type definition used by the setup helper.
// NOTE(review): the class header and the `timestamp` property (which
// GenerateTestRecords assigns) are elided from this extract.
30 #region Test Record Type 34 public int id {
get;
set; }
// Initialised to "" rather than null, even though the column is declared nullable.
35 public string name {
get;
set; } =
string.Empty;
36 public double value {
get;
set; }
// Record type for the sharded test table (class header elided from this extract).
// Properties mirror the "sharded_record" type definition: id (int), shard_key
// (string) and value (double, declared nullable in the type definition).
48 public int id {
get;
set; }
// Initialised to "" so a record never carries a null shard key.
49 public string shard_key {
get;
set; } =
string.Empty;
50 public double value {
get;
set; }
// Table-setup helper (signature, branch condition and table creation are elided).
// Tests call it as SetupTestTable(ctx) or SetupTestTable(ctx, withShardKey: true)
// and deconstruct the result into (tableName, ktype). It builds a record type
// definition string plus a map of per-column Kinetica properties, registers the
// type via Kinetica.createType, and returns the table name and ktype
// (presumably a KineticaType — confirm against the elided lines).
// properties: column name -> list of Kinetica column properties for createType.
60 #region Helper Methods 65 Dictionary<string, IList<string>> properties;
// Sharded variant (presumably taken when withShardKey is true): id and shard_key
// together form the primary key, and shard_key is also the shard key; value is a
// nullable double.
70 typeDef =
@"{""type"":""record"",""name"":""sharded_record"",""fields"":[" +
71 @"{""name"":""id"",""type"":""int""}," +
72 @"{""name"":""shard_key"",""type"":""string""}," +
73 @"{""name"":""value"",""type"":[""double"",""null""]}]}";
// NOTE(review): shard_key is listed under primary_key as well, presumably because
// Kinetica requires the shard key to be a subset of the primary key when both are
// defined — confirm against the Kinetica table/shard-key documentation.
75 properties =
new Dictionary<string, IList<string>>
77 {
"id",
new List<string> {
"primary_key" } },
78 {
"shard_key",
new List<string> {
"primary_key",
"shard_key" } },
79 {
"value",
new List<string> {
"nullable" } }
// Non-sharded variant: id is the sole primary key. name, value and timestamp are
// nullable; each is declared as a ["T","null"] union and tagged "nullable" below.
84 typeDef =
@"{""type"":""record"",""name"":""test_record"",""fields"":[" +
85 @"{""name"":""id"",""type"":""int""}," +
86 @"{""name"":""name"",""type"":[""string"",""null""]}," +
87 @"{""name"":""value"",""type"":[""double"",""null""]}," +
88 @"{""name"":""timestamp"",""type"":[""long"",""null""]}]}";
90 properties =
new Dictionary<string, IList<string>>
92 {
"id",
new List<string> {
"primary_key" } },
93 {
"name",
new List<string> {
"nullable" } },
94 {
"value",
new List<string> {
"nullable" } },
95 {
"timestamp",
new List<string> {
"nullable" } }
// Register the type with no extra options. NOTE(review): both variants use the
// same type label "bulk_test_type"; the code that consumes typeResp to create the
// table and derive ktype is elided from this extract.
99 var typeResp = ctx.
Kinetica.
createType(typeDef,
"bulk_test_type", properties,
new Dictionary<string, string>());
104 return (tableName, ktype);
// Builds `count` deterministic TestRecords for sequence numbers
// startId .. startId + count - 1, so callers can request disjoint ranges.
//   name      = "record_" + sequence number zero-padded to 8 digits
//   value     = sequence number * 1.5
//   timestamp = current UTC Unix time in ms (sampled once) + loop index
//               (note: offset by i, not by startId + i)
// NOTE(review): the `id` assignment is elided; presumably id = startId + i.
107 private List<TestRecord> GenerateTestRecords(
int count,
int startId = 0)
// Pre-sized because the final element count is known up front.
109 var records =
new List<TestRecord>(count);
110 var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
112 for (
int i = 0; i < count; i++)
114 records.Add(
new TestRecord
117 name = $
"record_{startId + i:D8}",
118 value = (startId + i) * 1.5,
119 timestamp = baseTimestamp + i
// Builds `count` ShardedRecords for sequence numbers startId .. startId + count - 1.
//   shard_key = "shard_" + (sequence % 100) zero-padded to 3 digits, i.e. it cycles
//               through "shard_000".."shard_099" for non-negative sequence numbers,
//               presumably so the rows spread across shards / workers
//   value     = sequence number * 2.5
// NOTE(review): the `id` assignment is elided; presumably id = startId + i.
126 private List<ShardedRecord> GenerateShardedRecords(
int count,
int startId = 0)
128 var records =
new List<ShardedRecord>(count);
130 for (
int i = 0; i < count; i++)
132 records.Add(
new ShardedRecord
135 shard_key = $
"shard_{(startId + i) % 100:D3}",
136 value = (startId + i) * 2.5
// Single-record path: insert 500 records one at a time with Insert, then
// CloseAsync (after which all 500 are expected to be counted as inserted), then
// query the table via SQL.
145 #region Basic Insert Tests 151 var (tableName, ktype) = SetupTestTable(ctx);
160 var records = GenerateTestRecords(500);
162 foreach (var record
in records)
164 inserter.Insert(record);
167 await inserter.CloseAsync();
169 Assert.Equal(500, inserter.CountInserted);
// Arguments: offset 0, limit -9999 (presumably Kinetica's "return all records"
// sentinel — confirm).
172 var response = ctx.
Kinetica.
executeSql($
"SELECT COUNT(*) AS cnt FROM {tableName}", 0, -9999);
// NOTE(review): this only asserts that COUNT(*) produced exactly one result row;
// it does not verify that the counted value is 500. Reading the "cnt" column
// from the response would make this a real server-side row-count check.
173 Assert.Equal(1, response.total_number_of_records);
// Synchronous batch path: hand 5000 records to InsertBatch in one call, close,
// log the inserted/updated counters and batch count, then check all 5000 landed.
180 var (tableName, ktype) = SetupTestTable(ctx);
189 var records = GenerateTestRecords(5000);
192 inserter.InsertBatch(records);
194 await inserter.CloseAsync();
196 _output.WriteLine($
"Inserted: {inserter.CountInserted}, Updated: {inserter.CountUpdated}");
197 _output.WriteLine($
"Total batches sent: {inserter.TotalBatchesSent}");
199 Assert.Equal(5000, inserter.CountInserted);
// Async single-record path: awaits InsertAsync for each of 1000 records, then closes
// and checks the inserted count.
206 var (tableName, ktype) = SetupTestTable(ctx);
// Inserter option (the surrounding options construction is elided): cap the
// number of batches in flight at 10.
211 MaxInFlightBatches = 10
216 var records = GenerateTestRecords(1000);
219 foreach (var record
in records)
221 await inserter.InsertAsync(record);
224 await inserter.CloseAsync();
226 Assert.Equal(1000, inserter.CountInserted);
// Async batch path: one awaited InsertBatchAsync call with 2500 records, then close
// and check the inserted count. The context is disposed at end of scope.
232 using var ctx =
new TestContext(
"bulk_async_batch");
233 var (tableName, ktype) = SetupTestTable(ctx);
// Inserter option (surrounding construction elided): at most 5 batches in flight.
238 MaxInFlightBatches = 5
243 var records = GenerateTestRecords(2500);
245 await inserter.InsertBatchAsync(records);
247 await inserter.CloseAsync();
249 Assert.Equal(2500, inserter.CountInserted);
// Sharded table: create the (id, shard_key)-keyed table, batch-insert 1000
// ShardedRecords spread over 100 shard-key values, and check the inserted count.
// Worker and batch counts are logged only, not asserted.
254 #region Shard Key Tests 260 var (tableName, ktype) = SetupTestTable(ctx, withShardKey:
true);
269 var records = GenerateShardedRecords(1000);
271 inserter.InsertBatch(records);
273 await inserter.CloseAsync();
275 Assert.Equal(1000, inserter.CountInserted);
278 _output.WriteLine($
"Workers: {inserter.NumWorkers}");
279 _output.WriteLine($
"Batches sent: {inserter.TotalBatchesSent}");
// Batch-listener callbacks: attach a TestBatchListener, insert 350 records, close,
// then verify the listener saw several batches, all 350 records and no failures.
284 #region Batch Listener Tests 290 var (tableName, ktype) = SetupTestTable(ctx);
292 var listener =
new TestBatchListener();
// Inserter option (surrounding construction elided): route batch events to listener.
297 BatchListener = listener
302 var records = GenerateTestRecords(350);
303 inserter.InsertBatch(records);
305 await inserter.CloseAsync();
307 _output.WriteLine($
"Batches completed: {listener.BatchesCompleted}");
308 _output.WriteLine($
"Total inserted via listener: {listener.TotalInserted}");
309 _output.WriteLine($
"Avg encode time: {listener.AverageEncodeTimeMs:F2}ms");
310 _output.WriteLine($
"Avg network time: {listener.AverageNetworkTimeMs:F2}ms");
// NOTE(review): ">= 3 batches" for 350 records only holds if the batch size
// configured on an elided line is <= ~116 (e.g. 100 or 150); confirm the two
// values stay in sync.
312 Assert.True(listener.BatchesCompleted >= 3, $
"Expected at least 3 batches, got {listener.BatchesCompleted}");
313 Assert.Equal(350, listener.TotalInserted);
314 Assert.True(listener.AllSucceeded);
// Test double for the batch-listener interface (class header and callback bodies
// are partially elided). Accumulates per-batch statistics that the listener test
// reads after CloseAsync.
319 private int _batchesCompleted;
320 private long _totalInserted;
321 private double _totalEncodeTime;
322 private double _totalNetworkTime;
// Starts true; only ever cleared, never reset.
323 private bool _allSucceeded =
true;
325 public int BatchesCompleted => _batchesCompleted;
326 public long TotalInserted => _totalInserted;
// Mean per-batch times in ms over all completed batches; 0 when none completed.
327 public double AverageEncodeTimeMs => _batchesCompleted > 0 ? _totalEncodeTime / _batchesCompleted : 0;
328 public double AverageNetworkTimeMs => _batchesCompleted > 0 ? _totalNetworkTime / _batchesCompleted : 0;
329 public bool AllSucceeded => _allSucceeded;
// Atomic increment, presumably because callbacks may arrive from concurrent
// inserter workers.
// NOTE(review): the updates to _totalInserted/_totalEncodeTime/_totalNetworkTime
// are elided. If the double totals use a plain "+=" while callbacks run
// concurrently, updates can be lost. Confirm they are guarded (lock or
// Interlocked.CompareExchange loop).
333 Interlocked.Increment(ref _batchesCompleted);
// Presumably on a failed-batch result. Plain, non-volatile write: benign here
// because the value only ever changes true -> false and is read after CloseAsync.
// Volatile.Write would make the intent explicit.
344 _allSucceeded =
false;
// Backpressure metrics: with at most 5 batches in flight, await InsertAsync for 500
// records while sampling GetBackpressureMetrics() after every insert to track the
// peak UtilizationPercent. The peak and final in-flight count are logged only.
// NOTE(review): the only assertion is the insert count; nothing checks that
// utilization rose above 0 or stayed within bounds.
350 #region Backpressure Tests 355 using var ctx =
new TestContext(
"bulk_backpressure");
356 var (tableName, ktype) = SetupTestTable(ctx);
361 MaxInFlightBatches = 5
366 var records = GenerateTestRecords(500);
369 var maxUtilization = 0.0;
371 foreach (var record
in records)
373 await inserter.InsertAsync(record);
375 var metrics = inserter.GetBackpressureMetrics();
376 if (metrics.UtilizationPercent > maxUtilization)
377 maxUtilization = metrics.UtilizationPercent;
380 await inserter.CloseAsync();
382 _output.WriteLine($
"Max backpressure utilization: {maxUtilization:F1}%");
383 _output.WriteLine($
"Final metrics: {inserter.GetBackpressureMetrics().InFlightBatches} in-flight");
385 Assert.Equal(500, inserter.CountInserted);
// Concurrency: 4 tasks share one inserter and each awaits InsertAsync for 500
// records. Each task uses a disjoint sequence range (threadId * 500 ..), so no two
// tasks should produce the same primary key and every record should count as an
// insert. After all tasks finish, close and expect 4 * 500 inserts.
390 #region Concurrent Insert Tests 395 using var ctx =
new TestContext(
"bulk_concurrent");
396 var (tableName, ktype) = SetupTestTable(ctx);
// Inserter option (remaining options elided): at most 20 batches in flight.
401 MaxInFlightBatches = 20,
407 const int numThreads = 4;
408 const int recordsPerThread = 500;
410 var tasks =
new Task[numThreads];
412 for (
int t = 0; t < numThreads; t++)
// NOTE(review): threadId is declared on an elided line; presumably a per-iteration
// copy of t. A C# `for` variable is shared across iterations, so capturing t
// directly in the lambda would be a bug.
415 tasks[t] = Task.Run(async () =>
417 var records = GenerateTestRecords(recordsPerThread, threadId * recordsPerThread);
418 foreach (var record
in records)
420 await inserter.InsertAsync(record);
425 await Task.WhenAll(tasks);
426 await inserter.CloseAsync();
428 _output.WriteLine($
"Total inserted: {inserter.CountInserted}");
429 _output.WriteLine($
"Total batches: {inserter.TotalBatchesSent}");
431 Assert.Equal(numThreads * recordsPerThread, inserter.CountInserted);
// Manual flush: buffer 250 records with Insert and check nothing has been
// inserted yet. Then FlushAsync, close, and expect all 250 inserted.
442 var (tableName, ktype) = SetupTestTable(ctx);
452 var records = GenerateTestRecords(250);
453 foreach (var record
in records)
455 inserter.Insert(record);
// NOTE(review): "== 0" relies on the elided configuration. The batch size must
// exceed 250, and any time-based auto-flush must not fire before this line;
// otherwise the assertion is timing-dependent.
459 Assert.Equal(0, inserter.CountInserted);
462 await inserter.FlushAsync();
465 await inserter.CloseAsync();
467 Assert.Equal(250, inserter.CountInserted);
// Error queue on the happy path: insert 200 valid records with a bounded error
// queue (MaxErrorQueueSize = 100). After close, DrainErrors must return an empty
// collection and all 200 records must be counted as inserted.
472 #region Error Handling Tests 478 var (tableName, ktype) = SetupTestTable(ctx);
484 MaxErrorQueueSize = 100
490 var records = GenerateTestRecords(200);
491 inserter.InsertBatch(records);
493 await inserter.CloseAsync();
495 var errors = inserter.DrainErrors();
496 _output.WriteLine($
"Errors: {errors.Count}");
499 Assert.Empty(errors);
500 Assert.Equal(200, inserter.CountInserted);
// Metrics surface: after batch-inserting 500 fresh records and closing, every
// counter should reflect a clean run. That means 500 inserted, 0 updated (all
// primary keys are new), 0 failed batches, nothing pending, and no errors.
505 #region Metrics Tests 511 var (tableName, ktype) = SetupTestTable(ctx);
520 var records = GenerateTestRecords(500);
521 inserter.InsertBatch(records);
523 await inserter.CloseAsync();
525 _output.WriteLine($
"Count Inserted: {inserter.CountInserted}");
526 _output.WriteLine($
"Count Updated: {inserter.CountUpdated}");
527 _output.WriteLine($
"Total Batches Sent: {inserter.TotalBatchesSent}");
528 _output.WriteLine($
"Total Batches Failed: {inserter.TotalBatchesFailed}");
529 _output.WriteLine($
"Pending Batches: {inserter.PendingBatches}");
530 _output.WriteLine($
"Error Count: {inserter.ErrorCount}");
532 Assert.Equal(500, inserter.CountInserted);
533 Assert.Equal(0, inserter.CountUpdated);
// NOTE(review): ">= 5" assumes a batch size <= 100 set on an elided line; confirm
// the two values stay in sync.
534 Assert.True(inserter.TotalBatchesSent >= 5);
535 Assert.Equal(0, inserter.TotalBatchesFailed);
536 Assert.Equal(0, inserter.PendingBatches);
537 Assert.Equal(0, inserter.ErrorCount);
// Throughput smoke test: 50,000 records with up to 10 batches in flight. Record
// generation happens before the stopwatch starts, so the timing covers only
// InsertBatch + CloseAsync. Throughput is logged, not asserted; only the insert
// count is checked.
542 #region Large Batch Tests 548 var (tableName, ktype) = SetupTestTable(ctx);
553 MaxInFlightBatches = 10
558 const int totalRecords = 50000;
559 var records = GenerateTestRecords(totalRecords);
561 var sw = Stopwatch.StartNew();
562 inserter.InsertBatch(records);
563 await inserter.CloseAsync();
566 var recordsPerSecond = totalRecords / sw.Elapsed.TotalSeconds;
568 _output.WriteLine($
"Inserted {totalRecords} records in {sw.Elapsed.TotalSeconds:F2}s");
569 _output.WriteLine($
"Throughput: {recordsPerSecond:F0} records/second");
570 _output.WriteLine($
"Total batches: {inserter.TotalBatchesSent}");
572 Assert.Equal(totalRecords, inserter.CountInserted);
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
async Task TestBulkInserterWithShardKey()
async Task TestBulkInserterAsyncInsert()
async Task TestBulkInserterBackpressure()
async Task TestBulkInserterLargeBatch()
async Task TestBulkInserterMetrics()
bool Success
Whether the batch insertion succeeded.
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
async Task TestBulkInserterAsyncBatchInsert()
async Task TestBulkInserterBatchInsert()
async Task TestBulkInserterConcurrentInserts()
long CountInserted
Number of records successfully inserted.
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
High-performance bulk inserter for Kinetica with support for multi-head ingest,
async Task TestBulkInserterBasicInsert()
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
kinetica.Kinetica Kinetica
async Task TestBulkInserterErrorQueue()
double NetworkTimeMs
Time spent on network I/O in milliseconds.
Test context that manages schema and cleanup for integration tests.
Collection of shard key column names and values.
CreateTypeResponse createType(CreateTypeRequest request_)
Creates a new type describing the columns of a table.
Integration tests for the high-performance BulkInserter.
async Task TestBulkInserterManualFlush()
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
BulkInserterTests(ITestOutputHelper output)
double EncodeTimeMs
Time spent encoding records in milliseconds.
static ShardKeyValue String(string value)
Creates a string shard key value.
A typed value for shard key computation.
async Task TestBulkInserterWithListener()
string QualifiedTable(string tableName)
Get a qualified table name (schema.table).
Result of a batch insertion operation.
Configuration options for the BulkInserter<T>.
Listener interface for batch insertion events.