Kinetica C# API Version 7.2.3.1
BulkInserterDebugTest.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Threading.Tasks;
5 using Xunit;
6 using Xunit.Abstractions;
7 using Kinetica.Tests.Common;
8 using kinetica;
9 using kinetica.Records;
10 
12 {
13  [Trait("Category", "Integration")]
14  public class BulkInserterDebugTest
15  {
16  private readonly ITestOutputHelper _output;
17 
/// <summary>
/// Stores the xUnit output sink that the tests write their diagnostic lines to.
/// </summary>
/// <param name="output">Per-test output helper injected by xUnit.</param>
public BulkInserterDebugTest(ITestOutputHelper output) => _output = output;
22 
24  {
// Row type for the debug tables: the property names match the columns created by the
// CREATE TABLE statements in the tests (id, name, value, timestamp).
25  public int id { get; set; }
26  public string name { get; set; } = string.Empty;
27  public double value { get; set; }
28  public long timestamp { get; set; }
29 
31  {
// Always returns an empty ShardKeyValues, so no shard key column/value is reported.
// NOTE(review): the test tables declare PRIMARY KEY (id); if BulkInserter uses these
// values for multi-head routing, reporting none may bypass key-based routing — confirm.
32  return new ShardKeyValues();
33  }
34  }
35 
/// <summary>
/// Diagnostic integration test. It bulk-inserts <c>recordCount</c> rows through
/// <see cref="BulkInserter{T}"/> after a 100-row warm-up and logs elapsed time, throughput,
/// drained errors and the inserter's counters. It then pushes a few extra rows through the
/// legacy <see cref="KineticaIngestor{T}"/> for comparison.
/// </summary>
/// <remarks>
/// This test only writes diagnostics to the xUnit output; it asserts nothing.
/// Legacy-ingestor failures are caught and logged so they never fail the test.
/// </remarks>
[Fact]
public async Task DebugBulkInserterErrors()
{
    using var ctx = new TestContext("bulk_debug");

    // Create table using SQL for simpler setup
    var tableName = ctx.QualifiedTable("debug_table");
    ctx.Kinetica.executeSql($"CREATE TABLE {tableName} (id INT NOT NULL, name VARCHAR(64), value DOUBLE, timestamp LONG, PRIMARY KEY (id))");

    var ktype = KineticaType.fromTable(ctx.Kinetica, tableName);
    _output.WriteLine($"KineticaType schema: {ktype.getSchema()}");

    // Performance test: 50,000 records with batch size 10,000
    const int recordCount = 50_000;
    const int batchSize = 10_000;

    // Legacy ingestor comparison. The ids start past the bulk-inserted range (0..recordCount-1).
    // Otherwise every row would collide with a PRIMARY KEY (id) value already in the table,
    // and the comparison would measure duplicate-key handling instead of inserts.
    const int legacyRecordCount = 15;
    const int legacyBatchSize = 10;
    const int legacyFirstId = recordCount + 100;

    var options = new BulkInserterOptions
    {
        BatchSize = batchSize,
        MaxRetries = 0,
        MaxInFlightBatches = 20
    };

    // Generate records first so that generation cost is excluded from the timing.
    var records = new List<TestRecord>(recordCount);
    var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    for (int i = 0; i < recordCount; i++)
    {
        records.Add(new TestRecord
        {
            id = i,
            name = $"test_{i}",
            value = i * 1.5,
            timestamp = baseTimestamp + i
        });
    }

    // Warm up
    await using (var warmup = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options))
    {
        warmup.InsertBatch(records.Take(100).ToList());
        await warmup.CloseAsync();
    }

    // Clear the table so the timed run starts from empty.
    ctx.Kinetica.executeSql($"DELETE FROM {tableName}");

    // Time the insertion
    var sw = System.Diagnostics.Stopwatch.StartNew();

    await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);

    // Use batch insert for maximum throughput
    inserter.InsertBatch(records);

    await inserter.CloseAsync();
    sw.Stop();

    var elapsedMs = sw.Elapsed.TotalMilliseconds;
    var throughput = recordCount / (elapsedMs / 1000.0);

    _output.WriteLine($"Records: {recordCount}");
    _output.WriteLine($"Batch size: {batchSize}");
    _output.WriteLine($"Elapsed: {elapsedMs:F2} ms");
    _output.WriteLine($"Throughput: {throughput:F0} records/sec");

    // Check errors
    var errors = inserter.DrainErrors();
    _output.WriteLine($"Total errors: {errors.Count}");
    foreach (var error in errors)
    {
        _output.WriteLine($"Error: {error.Message}");
        if (error.Exception != null)
        {
            _output.WriteLine($"Exception: {error.Exception}");
        }
    }

    _output.WriteLine($"Count inserted: {inserter.CountInserted}");
    _output.WriteLine($"Count updated: {inserter.CountUpdated}");
    _output.WriteLine($"Batches sent: {inserter.TotalBatchesSent}");
    _output.WriteLine($"Batches failed: {inserter.TotalBatchesFailed}");

    // Also try the legacy ingestor to compare
    _output.WriteLine("\n--- Testing Legacy Ingestor ---");
    var legacyIngestor = new KineticaIngestor<TestRecord>(
        ctx.Kinetica,
        tableName,
        legacyBatchSize,
        ktype);

    try
    {
        for (int i = legacyFirstId; i < legacyFirstId + legacyRecordCount; i++)
        {
            legacyIngestor.insert(new TestRecord
            {
                id = i,
                name = $"test_{i}",
                value = i * 1.5,
                timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            });
        }
        // Push any rows still queued below the legacy batch size.
        legacyIngestor.flush();
        _output.WriteLine($"Legacy inserted: {legacyIngestor.getCountInserted()}");
    }
    catch (Exception ex)
    {
        // Best-effort comparison: log the failure instead of failing the test.
        _output.WriteLine($"Legacy error: {ex.Message}");
        _output.WriteLine($"Legacy exception: {ex}");
    }
}
148 
/// <summary>
/// Throughput test. It inserts 50,000 rows in 10,000-row batches through
/// <see cref="BulkInserter{T}"/> after a 100-row warm-up and an emptying <c>DELETE</c>.
/// It logs elapsed time and throughput, then the inserter's drained errors and counters,
/// so a run with failed batches is visible rather than silently reported as fast.
/// </summary>
/// <remarks>
/// NOTE(review): despite its name, this test currently exercises only BulkInserter;
/// no second HTTP implementation is compared. It writes diagnostics only and asserts nothing.
/// </remarks>
[Fact]
public async Task CompareHttpImplementations()
{
    using var ctx = new TestContext("http_compare");

    // Create table using SQL for simpler setup
    var tableName = ctx.QualifiedTable("http_compare_table");
    ctx.Kinetica.executeSql($"CREATE TABLE {tableName} (id INT NOT NULL, name VARCHAR(64), value DOUBLE, timestamp LONG, PRIMARY KEY (id))");

    var ktype = KineticaType.fromTable(ctx.Kinetica, tableName);

    const int recordCount = 50_000;
    const int batchSize = 10_000;

    // Generate records up front so that generation cost is excluded from the timing.
    var records = new List<TestRecord>(recordCount);
    var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    for (int i = 0; i < recordCount; i++)
    {
        records.Add(new TestRecord
        {
            id = i,
            name = $"test_{i}",
            value = i * 1.5,
            timestamp = baseTimestamp + i
        });
    }

    _output.WriteLine("=== BulkInserter Performance Test ===");
    _output.WriteLine($"Records: {recordCount}, Batch size: {batchSize}");
    _output.WriteLine("");

    var options = new BulkInserterOptions
    {
        BatchSize = batchSize,
        MaxRetries = 0,
        MaxInFlightBatches = 20
    };

    // Warmup
    await using (var warmup = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options))
    {
        warmup.InsertBatch(records.Take(100).ToList());
        await warmup.CloseAsync();
    }
    ctx.Kinetica.executeSql($"DELETE FROM {tableName}");

    var sw = System.Diagnostics.Stopwatch.StartNew();
    // Declared with 'await using' (disposed at method end) so that its counters and
    // errors can still be read after the timed section.
    await using var inserter = new BulkInserter<TestRecord>(ctx.Kinetica, tableName, ktype, options);
    inserter.InsertBatch(records);
    await inserter.CloseAsync();
    sw.Stop();

    var elapsedMs = sw.Elapsed.TotalMilliseconds;
    var throughput = recordCount / (elapsedMs / 1000.0);
    _output.WriteLine($"Elapsed: {elapsedMs:F2} ms");
    _output.WriteLine($"Throughput: {throughput:F0} records/sec");

    // MaxRetries = 0 presumably means failed batches are not retried. Surface any failures
    // so the throughput figure above is not trusted for rows that never landed.
    var errors = inserter.DrainErrors();
    _output.WriteLine($"Total errors: {errors.Count}");
    foreach (var error in errors)
    {
        _output.WriteLine($"Error: {error.Message}");
        if (error.Exception != null)
        {
            _output.WriteLine($"Exception: {error.Exception}");
        }
    }

    _output.WriteLine($"Count inserted: {inserter.CountInserted}");
    _output.WriteLine($"Batches sent: {inserter.TotalBatchesSent}");
    _output.WriteLine($"Batches failed: {inserter.TotalBatchesFailed}");
}
208  }
209 }
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
Test context that manages schema and cleanup for integration tests.
Definition: TestContext.cs:11
Collection of shard key column names and values.
Interface for extracting shard key values from a record.
Configuration options for the BulkInserter<T>.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...