Kinetica C# API Version 7.2.3.1
GenericRecordBulkInserterTests.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Diagnostics;
4 using System.Threading.Tasks;
5 using Xunit;
6 using Xunit.Abstractions;
7 using Kinetica.Tests.Common;
8 using kinetica;
9 using kinetica.Records;
10 
12 {
18  [Trait("Category", "Integration")]
20  {
/// <summary>Output helper used by the tests to log diagnostic lines.</summary>
private readonly ITestOutputHelper _output;

/// <summary>
/// Creates the test class and keeps the supplied output helper for logging.
/// </summary>
/// <param name="output">xUnit helper that receives the tests' diagnostic output.</param>
public GenericRecordBulkInserterTests(ITestOutputHelper output) => _output = output;
27 
28  #region Helper Methods
29 
/// <summary>
/// Creates a test table in the context's schema, then derives both a <see cref="KineticaType"/>
/// and a generic-record type descriptor from the created table.
/// </summary>
/// <param name="ctx">Test context that supplies the connection and qualifies table names.</param>
/// <param name="tableSuffix">Unqualified table name passed to <c>ctx.QualifiedTable</c>.</param>
/// <param name="withShardKey">
/// When true, the table has columns (id, partition_key, name, value). The primary key is
/// (id, partition_key) and the shard key is declared on partition_key. There is no timestamp
/// column in this variant. When false, the table has columns (id, name, value, timestamp)
/// with primary key id.
/// </param>
/// <returns>The qualified table name and the two type descriptors read back from it.</returns>
private (string tableName, KineticaType ktype, kinetica.Records.Type recordType) SetupTestTable(
    TestContext ctx,
    string tableSuffix = "generic_test",
    bool withShardKey = false)
{
    var qualifiedName = ctx.QualifiedTable(tableSuffix);

    // Kinetica syntax: shard_key must be specified inline, not as separate clause
    var ddl = withShardKey
        ? $@"CREATE TABLE {qualifiedName} (
    id INT NOT NULL,
    partition_key VARCHAR(64) NOT NULL SHARD_KEY,
    name VARCHAR(128),
    value DOUBLE,
    PRIMARY KEY (id, partition_key)
)"
        : $@"CREATE TABLE {qualifiedName} (
    id INT NOT NULL,
    name VARCHAR(128),
    value DOUBLE,
    timestamp LONG,
    PRIMARY KEY (id)
)";

    ctx.Kinetica.executeSql(ddl);

    // Tuple elements are evaluated left to right, so the KineticaType is read before the record type.
    return (
        qualifiedName,
        KineticaType.fromTable(ctx.Kinetica, qualifiedName),
        kinetica.Records.Type.FromTable(ctx.Kinetica, qualifiedName));
}
67 
/// <summary>
/// Builds <paramref name="count"/> generic records for the (id, name, value, timestamp) table.
/// </summary>
/// <param name="recordType">Type descriptor used to create each record.</param>
/// <param name="count">Number of records to create.</param>
/// <param name="startId">Id of the first record; later records get consecutive ids.</param>
/// <returns>
/// Records whose name is <c>generic_</c> followed by the id zero-padded to 8 digits. Each value
/// is <c>id * 1.5</c>, and each timestamp is the current Unix time in milliseconds (captured
/// once) plus the record's offset.
/// </returns>
private List<GenericRecord> GenerateGenericRecords(kinetica.Records.Type recordType, int count, int startId = 0)
{
    var result = new List<GenericRecord>(count);
    long epochMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

    for (int offset = 0; offset < count; offset++)
    {
        int id = startId + offset;

        var rec = recordType.NewInstance();
        rec.Put("id", id);
        rec.Put("name", $"generic_{id:D8}");
        rec.Put("value", id * 1.5);
        rec.Put("timestamp", epochMs + offset);

        result.Add(rec);
    }

    return result;
}
85 
/// <summary>
/// Builds <paramref name="count"/> generic records for the sharded (id, partition_key, name, value) table.
/// </summary>
/// <param name="recordType">Type descriptor used to create each record.</param>
/// <param name="count">Number of records to create.</param>
/// <param name="startId">Id of the first record; later records get consecutive ids.</param>
/// <returns>
/// Records whose partition_key is <c>shard_</c> followed by <c>id % 100</c> zero-padded to
/// 3 digits. Each name is <c>sharded_</c> followed by the id zero-padded to 8 digits, and
/// each value is <c>id * 2.5</c>.
/// </returns>
private List<GenericRecord> GenerateShardedGenericRecords(kinetica.Records.Type recordType, int count, int startId = 0)
{
    var result = new List<GenericRecord>(count);

    for (int offset = 0; offset < count; offset++)
    {
        int id = startId + offset;

        var rec = recordType.NewInstance();
        rec.Put("id", id);
        // Spread the records over 100 distinct partition keys.
        rec.Put("partition_key", $"shard_{id % 100:D3}");
        rec.Put("name", $"sharded_{id:D8}");
        rec.Put("value", id * 2.5);

        result.Add(rec);
    }

    return result;
}
102 
103  #endregion
104 
105  #region Basic GenericRecord Tests
106 
/// <summary>
/// Inserts 500 generic records one at a time (batch size 100). Checks that both the
/// inserter's reported count and the number of rows stored in the table equal 500.
/// </summary>
[Fact]
public async Task TestGenericRecordBasicInsert()
{
    const int recordCount = 500;

    using var ctx = new TestContext("generic_basic");
    var (tableName, ktype, recordType) = SetupTestTable(ctx);

    _output.WriteLine($"Table: {tableName}");
    _output.WriteLine($"Schema: {ktype.getSchemaString()}");

    var options = new BulkInserterOptions
    {
        BatchSize = 100
    };

    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    var records = GenerateGenericRecords(recordType, recordCount);

    foreach (var record in records)
    {
        inserter.Insert(record);
    }

    await inserter.CloseAsync();

    _output.WriteLine($"Inserted: {inserter.CountInserted}");
    _output.WriteLine($"Batches: {inserter.TotalBatchesSent}");

    Assert.Equal(recordCount, inserter.CountInserted);

    // Verify in database. SELECT COUNT(*) always returns exactly one row, so asserting on
    // that query's total_number_of_records could never fail. Selecting the key column instead
    // makes total_number_of_records equal to the number of rows actually stored.
    var response = ctx.Kinetica.executeSql($"SELECT id FROM {tableName}", 0, -9999);
    Assert.Equal(recordCount, response.total_number_of_records);
}
141 
/// <summary>
/// Passes 5000 generic records to the inserter in a single <c>InsertBatch</c> call
/// (batch size 1000) and checks the inserter's reported count.
/// </summary>
[Fact]
public async Task TestGenericRecordBatchInsert()
{
    const int total = 5000;

    using var ctx = new TestContext("generic_batch");
    var (tableName, ktype, recordType) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<GenericRecord>(
        ctx.Kinetica,
        tableName,
        ktype,
        new BulkInserterOptions { BatchSize = 1000 });

    inserter.InsertBatch(GenerateGenericRecords(recordType, total));
    await inserter.CloseAsync();

    _output.WriteLine($"Inserted: {inserter.CountInserted}");
    _output.WriteLine($"Batches: {inserter.TotalBatchesSent}");

    Assert.Equal(total, inserter.CountInserted);
}
166 
/// <summary>
/// Submits 1000 generic records one at a time through <c>InsertAsync</c>, with batch size 100
/// and at most 10 in-flight batches. Checks the inserter's reported count.
/// </summary>
[Fact]
public async Task TestGenericRecordAsyncInsert()
{
    const int total = 1000;

    using var ctx = new TestContext("generic_async");
    var (tableName, ktype, recordType) = SetupTestTable(ctx);

    await using var inserter = new BulkInserter<GenericRecord>(
        ctx.Kinetica,
        tableName,
        ktype,
        new BulkInserterOptions { BatchSize = 100, MaxInFlightBatches = 10 });

    var pending = GenerateGenericRecords(recordType, total);
    for (int idx = 0; idx < pending.Count; idx++)
    {
        await inserter.InsertAsync(pending[idx]);
    }

    await inserter.CloseAsync();

    Assert.Equal(total, inserter.CountInserted);
}
192 
193  #endregion
194 
195  #region Shard Key Tests
196 
// Note: Shard key routing is already covered for POCOs by BulkInserterTests.
// GenericRecord routing goes through the same RecordKeyBuilder path once GetShardKeyValues() returns values.
// Because Type.FromTable populates the shard key indices, routing is expected to behave the same way here (not exercised in this class).
200 
201  #endregion
202 
203  #region Performance Tests
204 
/// <summary>
/// Throughput check: sends 50,000 generic records in one <c>InsertBatch</c> call (batch size
/// 10,000, up to 10 in-flight batches). Logs the elapsed time and records per second, then
/// checks the inserter's reported count.
/// </summary>
[Fact]
public async Task TestGenericRecordLargeBatch()
{
    const int totalRecords = 50000;

    using var ctx = new TestContext("generic_large");
    var (tableName, ktype, recordType) = SetupTestTable(ctx, "large_test");

    var options = new BulkInserterOptions { BatchSize = 10000, MaxInFlightBatches = 10 };
    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    var records = GenerateGenericRecords(recordType, totalRecords);

    // The timed region covers both InsertBatch and CloseAsync; record generation is excluded.
    var stopwatch = Stopwatch.StartNew();
    inserter.InsertBatch(records);
    await inserter.CloseAsync();
    stopwatch.Stop();

    double elapsedSeconds = stopwatch.Elapsed.TotalSeconds;
    var recordsPerSecond = totalRecords / elapsedSeconds;

    _output.WriteLine($"Inserted {totalRecords} GenericRecords in {elapsedSeconds:F2}s");
    _output.WriteLine($"Throughput: {recordsPerSecond:F0} records/second");
    _output.WriteLine($"Total batches: {inserter.TotalBatchesSent}");

    Assert.Equal(totalRecords, inserter.CountInserted);
}
235 
236  #endregion
237 
238  #region Data Verification Tests
239 
/// <summary>
/// Inserts 10 records with known contents (batch size 10). For each id i, name is
/// <c>test_name_i</c>, value is <c>i * 100.5</c> and timestamp is <c>1000000 + i</c>.
/// Reads the table back to check both the row count and the stored name and value of one row.
/// </summary>
[Fact]
public async Task TestGenericRecordDataIntegrity()
{
    const int recordCount = 10;

    using var ctx = new TestContext("generic_integrity");
    var (tableName, ktype, recordType) = SetupTestTable(ctx, "integrity_test");

    var options = new BulkInserterOptions
    {
        BatchSize = 10
    };

    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    // Insert specific test data
    for (int i = 0; i < recordCount; i++)
    {
        var record = recordType.NewInstance();
        record.Put("id", i);
        record.Put("name", $"test_name_{i}");
        record.Put("value", i * 100.5);
        record.Put("timestamp", 1000000L + i);
        inserter.Insert(record);
    }

    await inserter.CloseAsync();

    Assert.Equal(recordCount, inserter.CountInserted);

    // Verify data integrity by reading back
    var response = ctx.Kinetica.executeSql(
        $"SELECT id, name, value, timestamp FROM {tableName} ORDER BY id",
        0, -9999);

    Assert.Equal(recordCount, response.total_number_of_records);

    // Check specific values. Id 5 was written with name 'test_name_5' and value
    // 5 * 100.5 = 502.5, which is exactly representable as a double. Matching on those
    // columns (not only on id) means a mangled name or value yields zero rows. The number
    // is written as a literal, not interpolated, so the SQL does not depend on the current
    // culture's decimal separator.
    var idResponse = ctx.Kinetica.executeSql(
        $"SELECT name, value FROM {tableName} WHERE id = 5 AND name = 'test_name_5' AND value = 502.5",
        0, -9999);
    Assert.Equal(1, idResponse.total_number_of_records);
}
279 
/// <summary>
/// Inserts 10 records (batch size 10). Every even id gets a null name and every row gets a
/// null timestamp. Checks the inserter's reported count and that exactly the even-id rows
/// are stored with a null name.
/// </summary>
[Fact]
public async Task TestGenericRecordNullValues()
{
    const int recordCount = 10;

    using var ctx = new TestContext("generic_null");
    var (tableName, ktype, recordType) = SetupTestTable(ctx, "null_test");

    var options = new BulkInserterOptions
    {
        BatchSize = 10
    };

    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    // Insert records with null values
    for (int i = 0; i < recordCount; i++)
    {
        var record = recordType.NewInstance();
        record.Put("id", i);

        // Set some values to null
        if (i % 2 == 0)
        {
            record.PutNull("name");
        }
        else
        {
            record.Put("name", $"not_null_{i}");
        }

        record.Put("value", i * 10.0);
        record.PutNull("timestamp");

        inserter.Insert(record);
    }

    await inserter.CloseAsync();

    _output.WriteLine($"Inserted: {inserter.CountInserted}");
    Assert.Equal(recordCount, inserter.CountInserted);

    // Verify null values. Names were nulled for the even ids 0, 2, 4, 6, 8. A COUNT(*)
    // query always returns a single row, so asserting on its row count proved nothing.
    // Select the matching ids and count those rows instead.
    const int expectedNullNames = (recordCount + 1) / 2;
    var response = ctx.Kinetica.executeSql(
        $"SELECT id FROM {tableName} WHERE name IS NULL",
        0, -9999);
    Assert.Equal(expectedNullNames, response.total_number_of_records);
}
326 
327  #endregion
328 
329  #region Concurrent Insert Tests
330 
331  [Fact]
{
    // Concurrent producers: 4 tasks each submit 500 records with disjoint ids through one
    // shared inserter (batch size 100, up to 20 in-flight batches, 8 stripes).
    using var ctx = new TestContext("generic_concurrent");
    var (tableName, ktype, recordType) = SetupTestTable(ctx, "concurrent_test");

    var options = new BulkInserterOptions { BatchSize = 100, MaxInFlightBatches = 20, NumStripes = 8 };

    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    const int numThreads = 4;
    const int recordsPerThread = 500;

    // One producer: builds its own records in the id range
    // [producer * recordsPerThread, (producer + 1) * recordsPerThread) and awaits each insert.
    async Task ProduceAsync(int producer)
    {
        for (int n = 0; n < recordsPerThread; n++)
        {
            var rec = recordType.NewInstance();
            rec.Put("id", producer * recordsPerThread + n);
            rec.Put("name", $"thread_{producer}_record_{n}");
            rec.Put("value", (double)(producer * 1000 + n));
            rec.Put("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            await inserter.InsertAsync(rec);
        }
    }

    var producers = new Task[numThreads];
    for (int slot = 0; slot < numThreads; slot++)
    {
        int producerId = slot; // copy so each closure sees its own index
        producers[slot] = Task.Run(() => ProduceAsync(producerId));
    }

    await Task.WhenAll(producers);
    await inserter.CloseAsync();

    _output.WriteLine($"Total inserted: {inserter.CountInserted}");
    _output.WriteLine($"Total batches: {inserter.TotalBatchesSent}");

    Assert.Equal(numThreads * recordsPerThread, inserter.CountInserted);
}
378 
379  #endregion
380 
381  #region Type Builder Tests
382 
383  [Fact]
{
    // Builds a record type with the fluent builder, then inserts 100 product rows into a
    // matching SQL-created table using the table-derived type. Verifies the stored rows.
    const int productCount = 100;

    using var ctx = new TestContext("generic_builder");

    // Build a type using TypeBuilder (Rust-style API). The inserter below works from the
    // table-derived type. Here the test only checks that the builder chain produces a type.
    var builtType = kinetica.Records.Type.Builder("product_record")
        .AddIntColumn("id").PrimaryKey()
        .AddStringColumn("product_name")
        .AddDoubleColumn("price")
        .AddIntColumn("quantity")
        .AddTimestampColumn("created_at")
        .Build();
    Assert.NotNull(builtType);

    // Create the table using the type
    var tableName = ctx.QualifiedTable("product_table");

    // Create table via SQL based on type definition
    ctx.Kinetica.executeSql($@"CREATE TABLE {tableName} (
    id INT NOT NULL,
    product_name VARCHAR(256),
    price DOUBLE,
    quantity INT,
    created_at TIMESTAMP,
    PRIMARY KEY (id)
)");

    var ktype = KineticaType.fromTable(ctx.Kinetica, tableName);
    var actualRecordType = kinetica.Records.Type.FromTable(ctx.Kinetica, tableName);

    var options = new BulkInserterOptions
    {
        BatchSize = 50
    };

    await using var inserter = new BulkInserter<GenericRecord>(ctx.Kinetica, tableName, ktype, options);

    // Insert product records
    for (int i = 0; i < productCount; i++)
    {
        var record = actualRecordType.NewInstance();
        record.Put("id", i);
        record.Put("product_name", $"Product_{i:D4}");
        record.Put("price", 9.99 + i * 0.5);
        record.Put("quantity", 10 + i);
        record.Put("created_at", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

        inserter.Insert(record);
    }

    await inserter.CloseAsync();

    _output.WriteLine($"Inserted: {inserter.CountInserted}");
    Assert.Equal(productCount, inserter.CountInserted);

    // Verify. quantity was written as 10 + i for i in [0, 100), so it ranges over 10..109.
    // SELECT SUM(quantity) always yields exactly one row, so asserting on its row count could
    // never fail. Instead, count the rows whose quantity falls in the expected range.
    var response = ctx.Kinetica.executeSql(
        $"SELECT id FROM {tableName} WHERE quantity BETWEEN 10 AND 109",
        0, -9999);
    Assert.Equal(productCount, response.total_number_of_records);
}
444 
445  #endregion
446  }
447 }
Integration tests for BulkInserter with GenericRecord.
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
Test context that manages schema and cleanup for integration tests.
Definition: TestContext.cs:11
string QualifiedTable(string tableName)
Get a qualified table name (schema.table).
Definition: TestContext.cs:74
Configuration options for the BulkInserter<T>.
API for talking to the Kinetica database.
Definition: Kinetica.cs:40