2 using System.Collections.Generic;
4 using System.Threading.Tasks;
6 using Xunit.Abstractions;
13 [Trait(
"Category",
"Integration")]
16 private readonly ITestOutputHelper _output;
25 public int id {
get;
set; }
26 public string name {
get;
set; } =
string.Empty;
27 public double value {
get;
set; }
42 var tableName = ctx.QualifiedTable(
"debug_table");
43 ctx.Kinetica.executeSql($
"CREATE TABLE {tableName} (id INT NOT NULL, name VARCHAR(64), value DOUBLE, timestamp LONG, PRIMARY KEY (id))");
46 _output.WriteLine($
"KineticaType schema: {ktype.getSchema()}");
49 const int recordCount = 50_000;
50 const int batchSize = 10_000;
54 BatchSize = batchSize,
56 MaxInFlightBatches = 20
60 var records =
new List<TestRecord>(recordCount);
61 var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
62 for (
int i = 0; i < recordCount; i++)
69 timestamp = baseTimestamp + i
76 warmup.InsertBatch(records.Take(100).ToList());
77 await warmup.CloseAsync();
81 ctx.Kinetica.executeSql($
"DELETE FROM {tableName}");
84 var sw = System.Diagnostics.Stopwatch.StartNew();
89 inserter.InsertBatch(records);
91 await inserter.CloseAsync();
94 var elapsedMs = sw.Elapsed.TotalMilliseconds;
95 var throughput = recordCount / (elapsedMs / 1000.0);
97 _output.WriteLine($
"Records: {recordCount}");
98 _output.WriteLine($
"Batch size: {batchSize}");
99 _output.WriteLine($
"Elapsed: {elapsedMs:F2} ms");
100 _output.WriteLine($
"Throughput: {throughput:F0} records/sec");
103 var errors = inserter.DrainErrors();
104 _output.WriteLine($
"Total errors: {errors.Count}");
105 foreach (var error
in errors)
107 _output.WriteLine($
"Error: {error.Message}");
108 if (error.Exception !=
null)
110 _output.WriteLine($
"Exception: {error.Exception}");
114 _output.WriteLine($
"Count inserted: {inserter.CountInserted}");
115 _output.WriteLine($
"Count updated: {inserter.CountUpdated}");
116 _output.WriteLine($
"Batches sent: {inserter.TotalBatchesSent}");
117 _output.WriteLine($
"Batches failed: {inserter.TotalBatchesFailed}");
120 _output.WriteLine(
"\n--- Testing Legacy Ingestor ---");
129 for (
int i = 100; i < 115; i++)
136 timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
139 legacyIngestor.flush();
140 _output.WriteLine($
"Legacy inserted: {legacyIngestor.getCountInserted()}");
144 _output.WriteLine($
"Legacy error: {ex.Message}");
145 _output.WriteLine($
"Legacy exception: {ex}");
155 var tableName = ctx.QualifiedTable(
"http_compare_table");
156 ctx.Kinetica.executeSql($
"CREATE TABLE {tableName} (id INT NOT NULL, name VARCHAR(64), value DOUBLE, timestamp LONG, PRIMARY KEY (id))");
160 const int recordCount = 50_000;
161 const int batchSize = 10_000;
164 var records =
new List<TestRecord>(recordCount);
165 var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
166 for (
int i = 0; i < recordCount; i++)
173 timestamp = baseTimestamp + i
177 _output.WriteLine(
"=== BulkInserter Performance Test ===");
178 _output.WriteLine($
"Records: {recordCount}, Batch size: {batchSize}");
179 _output.WriteLine(
"");
183 BatchSize = batchSize,
185 MaxInFlightBatches = 20
191 warmup.InsertBatch(records.Take(100).ToList());
192 await warmup.CloseAsync();
194 ctx.Kinetica.executeSql($
"DELETE FROM {tableName}");
196 var sw = System.Diagnostics.Stopwatch.StartNew();
199 inserter.InsertBatch(records);
200 await inserter.CloseAsync();
203 var elapsedMs = sw.Elapsed.TotalMilliseconds;
204 var throughput = recordCount / (elapsedMs / 1000.0);
205 _output.WriteLine($
"Elapsed: {elapsedMs:F2} ms");
206 _output.WriteLine($
"Throughput: {throughput:F0} records/sec");
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
async Task DebugBulkInserterErrors()
async Task CompareHttpImplementations()
High-performance bulk inserter for Kinetica with support for multi-head ingest,
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
BulkInserterDebugTest(ITestOutputHelper output)
Test context that manages schema and cleanup for integration tests.
Collection of shard key column names and values.
Configuration options for the BulkInserter<T>.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...