2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Threading.Tasks;
6 using Xunit.Abstractions;
18 [Trait(
"Category",
"Integration")]
21 private readonly ITestOutputHelper _output;
28 #region Helper Methods 32 string tableSuffix =
"generic_test",
33 bool withShardKey =
false)
41 createSql = $
@"CREATE TABLE {tableName} ( 43 partition_key VARCHAR(64) NOT NULL SHARD_KEY, 46 PRIMARY KEY (id, partition_key) 51 createSql = $
@"CREATE TABLE {tableName} ( 65 return (tableName, ktype, recordType);
68 private List<GenericRecord> GenerateGenericRecords(
kinetica.Records.Type recordType,
int count,
int startId = 0)
70 var records =
new List<GenericRecord>(count);
71 var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
73 for (
int i = 0; i < count; i++)
75 var record = recordType.NewInstance();
76 record.Put(
"id", startId + i);
77 record.Put(
"name", $
"generic_{startId + i:D8}");
78 record.Put(
"value", (startId + i) * 1.5);
79 record.Put(
"timestamp", baseTimestamp + i);
86 private List<GenericRecord> GenerateShardedGenericRecords(
kinetica.Records.Type recordType,
int count,
int startId = 0)
88 var records =
new List<GenericRecord>(count);
90 for (
int i = 0; i < count; i++)
92 var record = recordType.NewInstance();
93 record.Put(
"id", startId + i);
94 record.Put(
"partition_key", $
"shard_{(startId + i) % 100:D3}");
95 record.Put(
"name", $
"sharded_{startId + i:D8}");
96 record.Put(
"value", (startId + i) * 2.5);
105 #region Basic GenericRecord Tests 111 var (tableName, ktype, recordType) = SetupTestTable(ctx);
113 _output.WriteLine($
"Table: {tableName}");
114 _output.WriteLine($
"Schema: {ktype.getSchemaString()}");
123 var records = GenerateGenericRecords(recordType, 500);
125 foreach (var record
in records)
127 inserter.Insert(record);
130 await inserter.CloseAsync();
132 _output.WriteLine($
"Inserted: {inserter.CountInserted}");
133 _output.WriteLine($
"Batches: {inserter.TotalBatchesSent}");
135 Assert.Equal(500, inserter.CountInserted);
138 var response = ctx.
Kinetica.
executeSql($
"SELECT COUNT(*) AS cnt FROM {tableName}", 0, -9999);
139 Assert.Equal(1, response.total_number_of_records);
146 var (tableName, ktype, recordType) = SetupTestTable(ctx);
155 var records = GenerateGenericRecords(recordType, 5000);
157 inserter.InsertBatch(records);
159 await inserter.CloseAsync();
161 _output.WriteLine($
"Inserted: {inserter.CountInserted}");
162 _output.WriteLine($
"Batches: {inserter.TotalBatchesSent}");
164 Assert.Equal(5000, inserter.CountInserted);
171 var (tableName, ktype, recordType) = SetupTestTable(ctx);
176 MaxInFlightBatches = 10
181 var records = GenerateGenericRecords(recordType, 1000);
183 foreach (var record
in records)
185 await inserter.InsertAsync(record);
188 await inserter.CloseAsync();
190 Assert.Equal(1000, inserter.CountInserted);
195 #region Shard Key Tests 203 #region Performance Tests 209 var (tableName, ktype, recordType) = SetupTestTable(ctx,
"large_test");
214 MaxInFlightBatches = 10
219 const int totalRecords = 50000;
220 var records = GenerateGenericRecords(recordType, totalRecords);
222 var sw = Stopwatch.StartNew();
223 inserter.InsertBatch(records);
224 await inserter.CloseAsync();
227 var recordsPerSecond = totalRecords / sw.Elapsed.TotalSeconds;
229 _output.WriteLine($
"Inserted {totalRecords} GenericRecords in {sw.Elapsed.TotalSeconds:F2}s");
230 _output.WriteLine($
"Throughput: {recordsPerSecond:F0} records/second");
231 _output.WriteLine($
"Total batches: {inserter.TotalBatchesSent}");
233 Assert.Equal(totalRecords, inserter.CountInserted);
238 #region Data Verification Tests 243 using var ctx =
new TestContext(
"generic_integrity");
244 var (tableName, ktype, recordType) = SetupTestTable(ctx,
"integrity_test");
254 for (
int i = 0; i < 10; i++)
256 var record = recordType.NewInstance();
258 record.Put(
"name", $
"test_name_{i}");
259 record.Put(
"value", i * 100.5);
260 record.Put(
"timestamp", 1000000L + i);
261 inserter.Insert(record);
264 await inserter.CloseAsync();
267 var response = ctx.Kinetica.executeSql(
268 $
"SELECT id, name, value, timestamp FROM {tableName} ORDER BY id",
271 Assert.Equal(10, response.total_number_of_records);
274 var idResponse = ctx.Kinetica.executeSql(
275 $
"SELECT name, value FROM {tableName} WHERE id = 5",
277 Assert.Equal(1, idResponse.total_number_of_records);
284 var (tableName, ktype, recordType) = SetupTestTable(ctx,
"null_test");
294 for (
int i = 0; i < 10; i++)
296 var record = recordType.NewInstance();
302 record.PutNull(
"name");
306 record.Put(
"name", $
"not_null_{i}");
309 record.Put(
"value", i * 10.0);
310 record.PutNull(
"timestamp");
312 inserter.Insert(record);
315 await inserter.CloseAsync();
317 _output.WriteLine($
"Inserted: {inserter.CountInserted}");
318 Assert.Equal(10, inserter.CountInserted);
321 var response = ctx.Kinetica.executeSql(
322 $
"SELECT COUNT(*) FROM {tableName} WHERE name IS NULL",
324 Assert.Equal(1, response.total_number_of_records);
329 #region Concurrent Insert Tests 334 using var ctx =
new TestContext(
"generic_concurrent");
335 var (tableName, ktype, recordType) = SetupTestTable(ctx,
"concurrent_test");
340 MaxInFlightBatches = 20,
346 const int numThreads = 4;
347 const int recordsPerThread = 500;
349 var tasks =
new Task[numThreads];
351 for (
int t = 0; t < numThreads; t++)
354 tasks[t] = Task.Run(async () =>
357 for (
int i = 0; i < recordsPerThread; i++)
359 var record = recordType.NewInstance();
360 record.Put(
"id", threadId * recordsPerThread + i);
361 record.Put(
"name", $
"thread_{threadId}_record_{i}");
362 record.Put(
"value", (
double)(threadId * 1000 + i));
363 record.Put(
"timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
365 await inserter.InsertAsync(record);
370 await Task.WhenAll(tasks);
371 await inserter.CloseAsync();
373 _output.WriteLine($
"Total inserted: {inserter.CountInserted}");
374 _output.WriteLine($
"Total batches: {inserter.TotalBatchesSent}");
376 Assert.Equal(numThreads * recordsPerThread, inserter.CountInserted);
381 #region Type Builder Tests 386 using var ctx =
new TestContext(
"generic_builder");
389 var recordType =
kinetica.Records.Type.Builder(
"product_record")
390 .AddIntColumn(
"id").PrimaryKey()
391 .AddStringColumn(
"product_name")
392 .AddDoubleColumn(
"price")
393 .AddIntColumn(
"quantity")
394 .AddTimestampColumn(
"created_at")
398 var tableName = ctx.QualifiedTable(
"product_table");
403 product_name VARCHAR(256), 406 created_at TIMESTAMP, 411 var actualRecordType =
kinetica.Records.Type.FromTable(ctx.Kinetica, tableName);
421 for (
int i = 0; i < 100; i++)
423 var record = actualRecordType.NewInstance();
425 record.Put(
"product_name", $
"Product_{i:D4}");
426 record.Put(
"price", 9.99 + i * 0.5);
427 record.Put(
"quantity", 10 + i);
428 record.Put(
"created_at", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
430 inserter.Insert(record);
433 await inserter.CloseAsync();
435 _output.WriteLine($
"Inserted: {inserter.CountInserted}");
436 Assert.Equal(100, inserter.CountInserted);
439 var response = ctx.Kinetica.executeSql(
440 $
"SELECT SUM(quantity) FROM {tableName}",
442 Assert.Equal(1, response.total_number_of_records);
Integration tests for BulkInserter with GenericRecord.
async Task TestGenericRecordAsyncInsert()
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
async Task TestGenericRecordNullValues()
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
kinetica.Kinetica Kinetica
Test context that manages schema and cleanup for integration tests.
async Task TestGenericRecordBatchInsert()
async Task TestGenericRecordLargeBatch()
async Task TestGenericRecordWithTypeBuilder()
GenericRecordBulkInserterTests(ITestOutputHelper output)
string QualifiedTable(string tableName)
Get a qualified table name (schema.table).
async Task TestGenericRecordConcurrentInserts()
async Task TestGenericRecordDataIntegrity()
Configuration options for the BulkInserter<T>.
API to talk to Kinetica Database
async Task TestGenericRecordBasicInsert()