2 using System.Diagnostics;
3 using System.Threading.Tasks;
18 private const string TestSchema =
"ado_batch_example";
20 public static async Task
RunAsync(
string serverUrl,
string username,
string password)
22 Console.WriteLine(
"==========================================");
23 Console.WriteLine(
"= ADO.NET Batch Insert Example - Running =");
24 Console.WriteLine(
"==========================================");
27 var connectionString = $
"Server={serverUrl};Username={username};Password={password}";
32 await SetupAsync(connectionString);
35 await BasicBatchInsertExample(connectionString);
38 await ConnectionStringBatchInsertExample(serverUrl, username, password);
41 await PerformanceComparisonExample(connectionString);
44 await ExplicitFlushExample(connectionString);
47 await VariousDataTypesExample(connectionString);
50 await AutoFlushExample(connectionString);
53 await CleanupAsync(connectionString);
56 Console.WriteLine(
"==========================================");
57 Console.WriteLine(
"= ADO.NET Batch Insert Example - Done =");
58 Console.WriteLine(
"==========================================");
62 Console.WriteLine($
"Error: {ex.Message}");
63 Console.WriteLine(ex.StackTrace);
67 private static async Task SetupAsync(
string connectionString)
69 Console.WriteLine(
"Setting up test schema and tables...");
72 await connection.OpenAsync();
75 using (var cmd =
new KineticaCommand($
"DROP SCHEMA IF EXISTS {TestSchema} CASCADE", connection))
77 cmd.ExecuteNonQuery();
79 using (var cmd =
new KineticaCommand($
"CREATE SCHEMA {TestSchema}", connection))
81 cmd.ExecuteNonQuery();
87 $
@"CREATE TABLE {TestSchema}.batch_basic ( 93 $
@"CREATE TABLE {TestSchema}.batch_perf ( 98 $
@"CREATE TABLE {TestSchema}.batch_txn ( 104 $
@"CREATE TABLE {TestSchema}.batch_types ( 110 string_col VARCHAR(256), 114 $
@"CREATE TABLE {TestSchema}.batch_autoflush ( 121 foreach (var tableSql
in tables)
124 cmd.ExecuteNonQuery();
127 Console.WriteLine(
"Setup complete.\n");
133 private static async Task BasicBatchInsertExample(
string connectionString)
135 Console.WriteLine(
"Example 1: Basic Batch Insert");
136 Console.WriteLine(
"-----------------------------");
141 connection.BatchInsertMode =
true;
142 connection.BatchSize = 1000;
144 await connection.OpenAsync();
146 Console.WriteLine($
" Batch mode enabled: {connection.BatchInsertMode}");
147 Console.WriteLine($
" Batch size: {connection.BatchSize}");
149 var tableName = $
"{TestSchema}.batch_basic";
152 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
154 clearCmd.ExecuteNonQuery();
158 var sw = Stopwatch.StartNew();
159 for (
int i = 0; i < 500; i++)
162 $
"INSERT INTO {tableName} (id, name, value) VALUES ({i}, 'Item_{i}', {i * 1.5})",
164 await cmd.ExecuteNonQueryAsync();
167 Console.WriteLine($
" Records queued: 500");
168 Console.WriteLine($
" Pending count: {connection.PendingBatchCount}");
171 var flushed = await connection.FlushBatchAsync();
174 Console.WriteLine($
" Records flushed: {flushed}");
175 Console.WriteLine($
" Time: {sw.ElapsedMilliseconds}ms");
178 using (var verifyCmd =
new KineticaCommand($
"SELECT COUNT(*) FROM {tableName}", connection))
180 using var reader = verifyCmd.ExecuteReader();
183 Console.WriteLine($
" Verified records: {reader.GetValue(0)}");
193 private static async Task ConnectionStringBatchInsertExample(
string serverUrl,
string username,
string password)
195 Console.WriteLine(
"Example 2: Connection String Configuration");
196 Console.WriteLine(
"------------------------------------------");
199 var batchConnectionString = $
"Server={serverUrl};Username={username};Password={password};" +
200 "Batch Insert Mode=true;Batch Size=500;Batch Update On Existing Pk=false";
203 await connection.OpenAsync();
205 Console.WriteLine($
" Connection string batch mode: {connection.BatchInsertMode}");
206 Console.WriteLine($
" Connection string batch size: {connection.BatchSize}");
208 var tableName = $
"{TestSchema}.batch_basic";
211 for (
int i = 1000; i < 1100; i++)
214 $
"INSERT INTO {tableName} (id, name, value) VALUES ({i}, 'ConnStr_{i}', {i * 2.0})",
216 await cmd.ExecuteNonQueryAsync();
219 await connection.FlushBatchAsync();
220 Console.WriteLine(
" Inserted 100 records via connection string batch mode\n");
226 private static async Task PerformanceComparisonExample(
string connectionString)
228 Console.WriteLine(
"Example 3: Performance Comparison");
229 Console.WriteLine(
"---------------------------------");
231 const int recordCount = 1000;
232 var tableName = $
"{TestSchema}.batch_perf";
237 await connection.OpenAsync();
240 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
242 clearCmd.ExecuteNonQuery();
245 var sw = Stopwatch.StartNew();
246 for (
int i = 0; i < recordCount; i++)
249 $
"INSERT INTO {tableName} (id, data) VALUES ({i}, 'Regular_Insert_{i}')",
251 await cmd.ExecuteNonQueryAsync();
255 Console.WriteLine($
" Regular inserts ({recordCount} records): {sw.ElapsedMilliseconds}ms");
256 Console.WriteLine($
" Rate: {recordCount * 1000.0 / sw.ElapsedMilliseconds:F0} records/sec");
262 connection.BatchInsertMode =
true;
263 connection.BatchSize = 5000;
264 await connection.OpenAsync();
267 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
269 clearCmd.ExecuteNonQuery();
272 var sw = Stopwatch.StartNew();
273 for (
int i = 0; i < recordCount; i++)
276 $
"INSERT INTO {tableName} (id, data) VALUES ({i}, 'Batch_Insert_{i}')",
278 await cmd.ExecuteNonQueryAsync();
280 await connection.FlushBatchAsync();
283 Console.WriteLine($
" Batch inserts ({recordCount} records): {sw.ElapsedMilliseconds}ms");
284 Console.WriteLine($
" Rate: {recordCount * 1000.0 / sw.ElapsedMilliseconds:F0} records/sec");
294 private static async Task ExplicitFlushExample(
string connectionString)
296 Console.WriteLine(
"Example 4: Explicit Flush Control");
297 Console.WriteLine(
"----------------------------------");
299 var tableName = $
"{TestSchema}.batch_txn";
302 connection.BatchInsertMode =
true;
303 connection.BatchSize = 1000;
304 await connection.OpenAsync();
307 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
309 clearCmd.ExecuteNonQuery();
313 for (
int i = 0; i < 50; i++)
316 $
"INSERT INTO {tableName} (id, status, amount) VALUES ({i}, 'Flushed', {i * 10.0})",
318 await cmd.ExecuteNonQueryAsync();
321 Console.WriteLine($
" Before flush - Pending: {connection.PendingBatchCount}");
324 var flushedCount = await connection.FlushBatchAsync();
326 Console.WriteLine($
" After flush - Pending: {connection.PendingBatchCount}");
327 Console.WriteLine($
" Records flushed: {flushedCount}");
330 using (var verifyCmd =
new KineticaCommand($
"SELECT COUNT(*) FROM {tableName}", connection))
332 using var reader = verifyCmd.ExecuteReader();
335 Console.WriteLine($
" Records in table: {reader.GetValue(0)}");
345 private static async Task VariousDataTypesExample(
string connectionString)
347 Console.WriteLine(
"Example 5: Various Data Types");
348 Console.WriteLine(
"-----------------------------");
350 var tableName = $
"{TestSchema}.batch_types";
353 connection.BatchInsertMode =
true;
354 connection.BatchSize = 1000;
355 await connection.OpenAsync();
358 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
360 clearCmd.ExecuteNonQuery();
364 for (
int i = 0; i < 100; i++)
366 var boolVal = i % 2 == 0 ? 1 : 0;
367 var sql = $
@"INSERT INTO {tableName} 368 (id, int_col, long_col, float_col, double_col, string_col, bool_col) 369 VALUES ({i}, {i * 10}, {i * 100000L}, {i * 1.5f}, {i * 2.5}, 'String_{i}', {boolVal})";
372 await cmd.ExecuteNonQueryAsync();
376 for (
int i = 100; i < 110; i++)
378 var sql = $
@"INSERT INTO {tableName} 379 (id, int_col, long_col, float_col, double_col, string_col, bool_col) 380 VALUES ({i}, NULL, NULL, NULL, NULL, NULL, NULL)";
383 await cmd.ExecuteNonQueryAsync();
386 var flushed = await connection.FlushBatchAsync();
387 Console.WriteLine($
" Inserted {flushed} records with various data types");
391 $
"SELECT id, int_col, string_col, bool_col FROM {tableName} WHERE id < 5 ORDER BY id", connection))
393 using var reader = verifyCmd.ExecuteReader();
394 Console.WriteLine(
" Sample data:");
395 while (reader.Read())
397 Console.WriteLine($
" id={reader["id"]}, int_col={reader["int_col
"]}, " +
398 $
"string_col={reader["string_col
"]}, bool_col={reader["bool_col
"]}");
408 private static async Task AutoFlushExample(
string connectionString)
410 Console.WriteLine(
"Example 6: Auto-Flush on Batch Size");
411 Console.WriteLine(
"-----------------------------------");
413 var tableName = $
"{TestSchema}.batch_autoflush";
416 connection.BatchInsertMode =
true;
417 connection.BatchSize = 50;
418 await connection.OpenAsync();
421 using (var clearCmd =
new KineticaCommand($
"DELETE FROM {tableName}", connection))
423 clearCmd.ExecuteNonQuery();
426 Console.WriteLine($
" Batch size set to: {connection.BatchSize}");
429 for (
int i = 0; i < 120; i++)
432 $
"INSERT INTO {tableName} (id, data) VALUES ({i}, 'AutoFlush_{i}')",
434 await cmd.ExecuteNonQueryAsync();
437 if ((i + 1) % 50 == 0)
439 Console.WriteLine($
" After {i + 1} inserts - Pending: {connection.PendingBatchCount}");
443 Console.WriteLine($
" Final pending count: {connection.PendingBatchCount}");
446 var remaining = await connection.FlushBatchAsync();
447 Console.WriteLine($
" Flushed remaining: {remaining}");
450 using (var verifyCmd =
new KineticaCommand($
"SELECT COUNT(*) FROM {tableName}", connection))
452 using var reader = verifyCmd.ExecuteReader();
455 Console.WriteLine($
" Total records in table: {reader.GetValue(0)}");
462 private static async Task CleanupAsync(
string connectionString)
464 Console.WriteLine(
"Cleaning up test schema...");
467 await connection.OpenAsync();
469 using var cmd =
new KineticaCommand($
"DROP SCHEMA IF EXISTS {TestSchema} CASCADE", connection);
470 cmd.ExecuteNonQuery();
472 Console.WriteLine(
"Cleanup complete.");
static async Task RunAsync(string serverUrl, string username, string password)
Demonstrates the high-performance batch insert feature of the Kinetica ADO.NET driver.