Kinetica C# API Version 7.2.3.1
AdoBatchInsertExample.cs
Go to the documentation of this file.
1 using System;
2 using System.Diagnostics;
3 using System.Threading.Tasks;
4 using KineticaAdo;
5 
6 namespace Example
7 {
16  public static class AdoBatchInsertExample
17  {
18  private const string TestSchema = "ado_batch_example";
19 
/// <summary>
/// Runs every batch-insert example in sequence: setup, examples 1-6, then cleanup.
/// </summary>
/// <param name="serverUrl">Value placed in the <c>Server</c> key of the connection string.</param>
/// <param name="username">Value placed in the <c>Username</c> key of the connection string.</param>
/// <param name="password">Value placed in the <c>Password</c> key of the connection string.</param>
/// <returns>A task that completes once the examples and the cleanup step have finished.</returns>
/// <remarks>
/// Exceptions are reported on the console and not rethrown. Cleanup is attempted even when
/// setup or an example fails, so the test schema is not left behind on the server.
/// </remarks>
public static async Task RunAsync(string serverUrl, string username, string password)
{
    Console.WriteLine("==========================================");
    Console.WriteLine("= ADO.NET Batch Insert Example - Running =");
    Console.WriteLine("==========================================");
    Console.WriteLine();

    var connectionString = $"Server={serverUrl};Username={username};Password={password}";

    // Shared reporter so the example failure path and the cleanup failure path print the same way.
    void Report(Exception error)
    {
        Console.WriteLine($"Error: {error.Message}");
        Console.WriteLine(error.StackTrace);
    }

    var examplesCompleted = false;

    try
    {
        // Setup: Create schema and tables
        await SetupAsync(connectionString);

        // Example 1: Basic batch insert with connection properties
        await BasicBatchInsertExample(connectionString);

        // Example 2: Batch insert with connection string configuration
        await ConnectionStringBatchInsertExample(serverUrl, username, password);

        // Example 3: Performance comparison
        await PerformanceComparisonExample(connectionString);

        // Example 4: Explicit flush with FlushBatchAsync
        await ExplicitFlushExample(connectionString);

        // Example 5: Batch insert with various data types
        await VariousDataTypesExample(connectionString);

        // Example 6: Auto-flush on batch size
        await AutoFlushExample(connectionString);

        examplesCompleted = true;
    }
    catch (Exception ex)
    {
        Report(ex);
    }

    // Cleanup runs outside the try above so a failing example cannot skip it.
    try
    {
        await CleanupAsync(connectionString);
    }
    catch (Exception ex)
    {
        Report(ex);
        return;
    }

    // The "Done" banner is only printed when every step, including cleanup, succeeded.
    if (examplesCompleted)
    {
        Console.WriteLine();
        Console.WriteLine("==========================================");
        Console.WriteLine("= ADO.NET Batch Insert Example - Done =");
        Console.WriteLine("==========================================");
    }
}
66 
/// <summary>
/// Drops and recreates the test schema, then creates the five tables used by the examples.
/// </summary>
/// <param name="connectionString">Connection string used to open the setup connection.</param>
/// <returns>A task that completes when all DDL statements have been executed.</returns>
private static async Task SetupAsync(string connectionString)
{
    Console.WriteLine("Setting up test schema and tables...");

    using var connection = new KineticaConnection(connectionString);
    await connection.OpenAsync();

    // Executed in order: drop and recreate the schema for a clean state, then create the
    // tables (the schema is fresh, so no IF NOT EXISTS is needed).
    var statements = new[]
    {
        $"DROP SCHEMA IF EXISTS {TestSchema} CASCADE",
        $"CREATE SCHEMA {TestSchema}",
        $@"CREATE TABLE {TestSchema}.batch_basic (
            id INT NOT NULL,
            name VARCHAR(64),
            value DOUBLE,
            PRIMARY KEY (id)
        )",
        $@"CREATE TABLE {TestSchema}.batch_perf (
            id INT NOT NULL,
            data VARCHAR(128),
            PRIMARY KEY (id)
        )",
        $@"CREATE TABLE {TestSchema}.batch_txn (
            id INT NOT NULL,
            status VARCHAR(32),
            amount DOUBLE,
            PRIMARY KEY (id)
        )",
        $@"CREATE TABLE {TestSchema}.batch_types (
            id INT NOT NULL,
            int_col INT,
            long_col LONG,
            float_col FLOAT,
            double_col DOUBLE,
            string_col VARCHAR(256),
            bool_col TINYINT,
            PRIMARY KEY (id)
        )",
        $@"CREATE TABLE {TestSchema}.batch_autoflush (
            id INT NOT NULL,
            data VARCHAR(64),
            PRIMARY KEY (id)
        )"
    };

    foreach (var sql in statements)
    {
        using var cmd = new KineticaCommand(sql, connection);

        // Awaited instead of the blocking ExecuteNonQuery so this async method never blocks a thread on I/O.
        await cmd.ExecuteNonQueryAsync();
    }

    Console.WriteLine("Setup complete.\n");
}
129 
/// <summary>
/// Example 1: enables batch mode through connection properties, queues 500 inserts,
/// flushes them explicitly and prints the resulting row count.
/// </summary>
/// <param name="connectionString">Connection string used to open the example connection.</param>
/// <returns>A task that completes when the example has finished.</returns>
private static async Task BasicBatchInsertExample(string connectionString)
{
    Console.WriteLine("Example 1: Basic Batch Insert");
    Console.WriteLine("-----------------------------");

    const int recordCount = 500;

    using var connection = new KineticaConnection(connectionString);

    // Enable batch mode via properties
    connection.BatchInsertMode = true;
    connection.BatchSize = 1000;

    await connection.OpenAsync();

    Console.WriteLine($" Batch mode enabled: {connection.BatchInsertMode}");
    Console.WriteLine($" Batch size: {connection.BatchSize}");

    var tableName = $"{TestSchema}.batch_basic";

    // Clear existing data
    using (var clearCmd = new KineticaCommand($"DELETE FROM {tableName}", connection))
    {
        clearCmd.ExecuteNonQuery();
    }

    // Insert the records
    var sw = Stopwatch.StartNew();
    for (int i = 0; i < recordCount; i++)
    {
        // Invariant formatting: with the current culture a comma-decimal locale would render
        // i * 1.5 as e.g. "1,5", which adds an extra value to the VALUES list and breaks the SQL.
        var insertSql = FormattableString.Invariant(
            $"INSERT INTO {tableName} (id, name, value) VALUES ({i}, 'Item_{i}', {i * 1.5})");

        using var cmd = new KineticaCommand(insertSql, connection);
        await cmd.ExecuteNonQueryAsync();
    }

    Console.WriteLine($" Records queued: {recordCount}");
    Console.WriteLine($" Pending count: {connection.PendingBatchCount}");

    // Flush the batch; the flush is part of the timed region
    var flushed = await connection.FlushBatchAsync();
    sw.Stop();

    Console.WriteLine($" Records flushed: {flushed}");
    Console.WriteLine($" Time: {sw.ElapsedMilliseconds}ms");

    // Verify
    using (var verifyCmd = new KineticaCommand($"SELECT COUNT(*) FROM {tableName}", connection))
    {
        using var reader = verifyCmd.ExecuteReader();
        if (reader.Read())
        {
            Console.WriteLine($" Verified records: {reader.GetValue(0)}");
        }
    }

    Console.WriteLine();
}
189 
/// <summary>
/// Example 2: configures batch mode purely through connection-string keys, then inserts
/// 100 rows (ids 1000-1099) into the basic table and flushes them.
/// </summary>
/// <param name="serverUrl">Value for the <c>Server</c> connection-string key.</param>
/// <param name="username">Value for the <c>Username</c> connection-string key.</param>
/// <param name="password">Value for the <c>Password</c> connection-string key.</param>
/// <returns>A task that completes when the example has finished.</returns>
private static async Task ConnectionStringBatchInsertExample(string serverUrl, string username, string password)
{
    Console.WriteLine("Example 2: Connection String Configuration");
    Console.WriteLine("------------------------------------------");

    // Batch settings travel in the connection string alongside the credentials
    var settings = new[]
    {
        $"Server={serverUrl}",
        $"Username={username}",
        $"Password={password}",
        "Batch Insert Mode=true",
        "Batch Size=500",
        "Batch Update On Existing Pk=false"
    };
    var connString = string.Join(";", settings);

    using var conn = new KineticaConnection(connString);
    await conn.OpenAsync();

    Console.WriteLine($" Connection string batch mode: {conn.BatchInsertMode}");
    Console.WriteLine($" Connection string batch size: {conn.BatchSize}");

    var targetTable = $"{TestSchema}.batch_basic";

    // Ids start at 1000 and run for 100 rows
    const int firstId = 1000;
    const int rowCount = 100;

    for (var offset = 0; offset < rowCount; offset++)
    {
        var id = firstId + offset;
        var insertSql = $"INSERT INTO {targetTable} (id, name, value) VALUES ({id}, 'ConnStr_{id}', {id * 2.0})";

        using var insertCmd = new KineticaCommand(insertSql, conn);
        await insertCmd.ExecuteNonQueryAsync();
    }

    await conn.FlushBatchAsync();
    Console.WriteLine(" Inserted 100 records via connection string batch mode\n");
}
222 
/// <summary>
/// Example 3: times the same 1000 inserts twice, first on a regular connection and then
/// on a connection with batch mode enabled, and prints elapsed time and rate for each.
/// </summary>
/// <param name="connectionString">Connection string used to open both connections.</param>
/// <returns>A task that completes when both timed runs have finished.</returns>
private static async Task PerformanceComparisonExample(string connectionString)
{
    Console.WriteLine("Example 3: Performance Comparison");
    Console.WriteLine("---------------------------------");

    const int recordCount = 1000;
    var tableName = $"{TestSchema}.batch_perf";

    // Regular inserts (no batch mode)
    using (var connection = new KineticaConnection(connectionString))
    {
        await connection.OpenAsync();
        await TimeInsertsAsync(connection, tableName, recordCount, "Regular", flushBatch: false);
    }

    // Batch inserts
    using (var connection = new KineticaConnection(connectionString))
    {
        connection.BatchInsertMode = true;
        connection.BatchSize = 5000;
        await connection.OpenAsync();
        await TimeInsertsAsync(connection, tableName, recordCount, "Batch", flushBatch: true);
    }

    Console.WriteLine();
}

/// <summary>
/// Clears the table, times <paramref name="recordCount"/> single-row inserts on the given
/// open connection and prints the elapsed time and insert rate.
/// </summary>
/// <param name="connection">An already opened connection to run the inserts on.</param>
/// <param name="tableName">Schema-qualified table that receives the rows.</param>
/// <param name="recordCount">Number of rows to insert.</param>
/// <param name="label">Run label; used in the console output and as the row data prefix.</param>
/// <param name="flushBatch">When true, <c>FlushBatchAsync</c> is awaited inside the timed region.</param>
/// <returns>A task that completes when the run has been timed and reported.</returns>
private static async Task TimeInsertsAsync(
    KineticaConnection connection,
    string tableName,
    int recordCount,
    string label,
    bool flushBatch)
{
    // Clear table
    using (var clearCmd = new KineticaCommand($"DELETE FROM {tableName}", connection))
    {
        clearCmd.ExecuteNonQuery();
    }

    var sw = Stopwatch.StartNew();
    for (int i = 0; i < recordCount; i++)
    {
        using var cmd = new KineticaCommand(
            $"INSERT INTO {tableName} (id, data) VALUES ({i}, '{label}_Insert_{i}')",
            connection);
        await cmd.ExecuteNonQueryAsync();
    }

    if (flushBatch)
    {
        await connection.FlushBatchAsync();
    }
    sw.Stop();

    Console.WriteLine($" {label} inserts ({recordCount} records): {sw.ElapsedMilliseconds}ms");

    // Use the full-resolution elapsed time: ElapsedMilliseconds is truncated to whole
    // milliseconds, so dividing by it can divide by zero and print Infinity.
    var elapsedSeconds = sw.Elapsed.TotalSeconds;
    var rate = elapsedSeconds > 0
        ? (recordCount / elapsedSeconds).ToString("F0")
        : "n/a";
    Console.WriteLine($" Rate: {rate} records/sec");
}
289 
/// <summary>
/// Example 4: queues 50 inserts on a batch-mode connection, prints the pending count
/// before and after an explicit <c>FlushBatchAsync</c> call, then prints the table row count.
/// </summary>
/// <param name="connectionString">Connection string used to open the example connection.</param>
/// <returns>A task that completes when the example has finished.</returns>
private static async Task ExplicitFlushExample(string connectionString)
{
    Console.WriteLine("Example 4: Explicit Flush Control");
    Console.WriteLine("----------------------------------");

    const int rowCount = 50;
    var targetTable = $"{TestSchema}.batch_txn";

    using var conn = new KineticaConnection(connectionString);

    // Batch size is far larger than the row count so the rows stay buffered until the explicit flush
    conn.BatchInsertMode = true;
    conn.BatchSize = 1000;
    await conn.OpenAsync();

    // Start from an empty table
    var deleteSql = $"DELETE FROM {targetTable}";
    using (var deleteCmd = new KineticaCommand(deleteSql, conn))
    {
        deleteCmd.ExecuteNonQuery();
    }

    // Queue the rows
    var id = 0;
    while (id < rowCount)
    {
        var insertSql = $"INSERT INTO {targetTable} (id, status, amount) VALUES ({id}, 'Flushed', {id * 10.0})";
        using (var insertCmd = new KineticaCommand(insertSql, conn))
        {
            await insertCmd.ExecuteNonQueryAsync();
        }
        id++;
    }

    Console.WriteLine($" Before flush - Pending: {conn.PendingBatchCount}");

    // Send everything that is pending to the database
    var sentCount = await conn.FlushBatchAsync();

    Console.WriteLine($" After flush - Pending: {conn.PendingBatchCount}");
    Console.WriteLine($" Records flushed: {sentCount}");

    // Read back the row count
    using (var countCmd = new KineticaCommand($"SELECT COUNT(*) FROM {targetTable}", conn))
    using (var countReader = countCmd.ExecuteReader())
    {
        if (countReader.Read())
        {
            Console.WriteLine($" Records in table: {countReader.GetValue(0)}");
        }
    }

    Console.WriteLine();
}
341 
/// <summary>
/// Example 5: batch-inserts 100 rows covering int, long, float, double, string and
/// tinyint columns plus 10 rows of NULLs, flushes them, and prints the first five rows.
/// </summary>
/// <param name="connectionString">Connection string used to open the example connection.</param>
/// <returns>A task that completes when the example has finished.</returns>
private static async Task VariousDataTypesExample(string connectionString)
{
    Console.WriteLine("Example 5: Various Data Types");
    Console.WriteLine("-----------------------------");

    var tableName = $"{TestSchema}.batch_types";

    using var connection = new KineticaConnection(connectionString);
    connection.BatchInsertMode = true;
    connection.BatchSize = 1000;
    await connection.OpenAsync();

    // Clear table
    using (var clearCmd = new KineticaCommand($"DELETE FROM {tableName}", connection))
    {
        clearCmd.ExecuteNonQuery();
    }

    // Insert records with various data types
    for (int i = 0; i < 100; i++)
    {
        var boolVal = i % 2 == 0 ? 1 : 0;

        // Invariant formatting: the float and double values must use '.' as the decimal
        // separator. With the current culture a comma-decimal locale would emit e.g. "1,5"
        // and corrupt the VALUES list.
        var sql = FormattableString.Invariant($@"INSERT INTO {tableName}
            (id, int_col, long_col, float_col, double_col, string_col, bool_col)
            VALUES ({i}, {i * 10}, {i * 100000L}, {i * 1.5f}, {i * 2.5}, 'String_{i}', {boolVal})");

        using var cmd = new KineticaCommand(sql, connection);
        await cmd.ExecuteNonQueryAsync();
    }

    // Test NULL values
    for (int i = 100; i < 110; i++)
    {
        var sql = FormattableString.Invariant($@"INSERT INTO {tableName}
            (id, int_col, long_col, float_col, double_col, string_col, bool_col)
            VALUES ({i}, NULL, NULL, NULL, NULL, NULL, NULL)");

        using var cmd = new KineticaCommand(sql, connection);
        await cmd.ExecuteNonQueryAsync();
    }

    var flushed = await connection.FlushBatchAsync();
    Console.WriteLine($" Inserted {flushed} records with various data types");

    // Verify
    using (var verifyCmd = new KineticaCommand(
        $"SELECT id, int_col, string_col, bool_col FROM {tableName} WHERE id < 5 ORDER BY id", connection))
    {
        using var reader = verifyCmd.ExecuteReader();
        Console.WriteLine(" Sample data:");
        while (reader.Read())
        {
            Console.WriteLine($" id={reader["id"]}, int_col={reader["int_col"]}, " +
                $"string_col={reader["string_col"]}, bool_col={reader["bool_col"]}");
        }
    }

    Console.WriteLine();
}
404 
/// <summary>
/// Example 6: sets a small batch size, inserts more rows than fit in one batch, and
/// prints the pending count at each multiple of the batch size, after the loop, and
/// after a final explicit flush.
/// </summary>
/// <param name="connectionString">Connection string used to open the example connection.</param>
/// <returns>A task that completes when the example has finished.</returns>
private static async Task AutoFlushExample(string connectionString)
{
    Console.WriteLine("Example 6: Auto-Flush on Batch Size");
    Console.WriteLine("-----------------------------------");

    // Single source of truth for both the connection setting and the logging interval below
    const int batchSize = 50;   // Small batch size to trigger auto-flush
    const int recordCount = 120; // recordCount / batchSize = 2 full batches expected to auto-flush

    var tableName = $"{TestSchema}.batch_autoflush";

    using var connection = new KineticaConnection(connectionString);
    connection.BatchInsertMode = true;
    connection.BatchSize = batchSize;
    await connection.OpenAsync();

    // Clear table
    using (var clearCmd = new KineticaCommand($"DELETE FROM {tableName}", connection))
    {
        clearCmd.ExecuteNonQuery();
    }

    Console.WriteLine($" Batch size set to: {connection.BatchSize}");

    for (int i = 0; i < recordCount; i++)
    {
        using var cmd = new KineticaCommand(
            $"INSERT INTO {tableName} (id, data) VALUES ({i}, 'AutoFlush_{i}')",
            connection);
        await cmd.ExecuteNonQueryAsync();

        // Report the pending count each time a full batch worth of rows has been queued,
        // which is where an auto-flush is expected to have just happened
        if ((i + 1) % batchSize == 0)
        {
            Console.WriteLine($" After {i + 1} inserts - Pending: {connection.PendingBatchCount}");
        }
    }

    Console.WriteLine($" Final pending count: {connection.PendingBatchCount}");

    // Flush remaining
    var remaining = await connection.FlushBatchAsync();
    Console.WriteLine($" Flushed remaining: {remaining}");

    // Verify total
    using (var verifyCmd = new KineticaCommand($"SELECT COUNT(*) FROM {tableName}", connection))
    {
        using var reader = verifyCmd.ExecuteReader();
        if (reader.Read())
        {
            Console.WriteLine($" Total records in table: {reader.GetValue(0)}");
        }
    }

    Console.WriteLine();
}
461 
/// <summary>
/// Drops the test schema and everything in it.
/// </summary>
/// <param name="connectionString">Connection string used to open the cleanup connection.</param>
/// <returns>A task that completes when the schema has been dropped.</returns>
private static async Task CleanupAsync(string connectionString)
{
    Console.WriteLine("Cleaning up test schema...");

    using var connection = new KineticaConnection(connectionString);
    await connection.OpenAsync();

    using var cmd = new KineticaCommand($"DROP SCHEMA IF EXISTS {TestSchema} CASCADE", connection);

    // Awaited instead of the blocking ExecuteNonQuery so this async method never blocks a thread on I/O.
    await cmd.ExecuteNonQueryAsync();

    Console.WriteLine("Cleanup complete.");
}
474  }
475 }
static async Task RunAsync(string serverUrl, string username, string password)
Demonstrates the high-performance batch insert feature of the Kinetica ADO.NET driver.