Kinetica   C#   API  Version 7.2.3.1
AsyncInsertRecordsTests.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Threading.Tasks;
4 using Xunit;
6 using kinetica;
7 
9 {
14  [Trait("Category", "Integration")]
15  [Trait("Category", "Async")]
17  {
18  [Fact]
19  public async Task TestInsertRecordsJsonAsync()
20  {
21  using var ctx = new TestContext("async_insert_json");
22 
23  // Create type
24  var typeDef = @"{""type"":""record"",""name"":""test_rec"",""fields"":[{""name"":""id"",""type"":""int""},{""name"":""name"",""type"":""string""},{""name"":""value"",""type"":""double""}]}";
25  var props = new Dictionary<string, IList<string>>
26  {
27  { "id", new List<string> { "int", "primary_key" } }
28  };
29 
30  var typeResp = await ctx.Kinetica.CreateTypeAsync(typeDef, "async_insert_test_type", props, new Dictionary<string, string>());
31 
32  // Create table asynchronously
33  var tableName = ctx.QualifiedTable("test_table_async");
34  await ctx.Kinetica.CreateTableAsync(tableName, typeResp.type_id, new Dictionary<string, string>());
35 
36  // Insert records using async SQL execution
37  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, name, value) VALUES (1, 'Alice', 1.1)");
38  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, name, value) VALUES (2, 'Bob', 2.2)");
39  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, name, value) VALUES (3, 'Charlie', 3.3)");
40 
41  // Verify records were inserted using async showTable
42  var showTableResp = await ctx.Kinetica.ShowTableAsync(tableName, new Dictionary<string, string> { { "get_sizes", "true" } });
43  Assert.Equal(3, showTableResp.total_size);
44  }
45 
46  [Fact]
48  {
49  using var ctx = new TestContext("async_insert_update_pk");
50 
51  // Create type with primary key
52  var typeDef = @"{""type"":""record"",""name"":""pk_rec"",""fields"":[{""name"":""id"",""type"":""int""},{""name"":""value"",""type"":""string""}]}";
53  var props = new Dictionary<string, IList<string>>
54  {
55  { "id", new List<string> { "int", "primary_key" } }
56  };
57 
58  var typeResp = await ctx.Kinetica.CreateTypeAsync(typeDef, "async_pk_test_type", props, new Dictionary<string, string>());
59 
60  var tableName = ctx.QualifiedTable("pk_table_async");
61  await ctx.Kinetica.CreateTableAsync(tableName, typeResp.type_id, new Dictionary<string, string>());
62 
63  // Insert initial records asynchronously
64  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, value) VALUES (1, 'original')");
65  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, value) VALUES (2, 'data')");
66 
67  // Verify 2 records exist
68  var showTableResp = await ctx.Kinetica.ShowTableAsync(tableName, new Dictionary<string, string> { { "get_sizes", "true" } });
69  Assert.Equal(2, showTableResp.total_size);
70 
71  // Update existing record
72  await ctx.Kinetica.ExecuteSqlAsync($"UPDATE {tableName} SET value = 'updated' WHERE id = 1");
73 
74  // Should still have 2 records
75  var showTableResp2 = await ctx.Kinetica.ShowTableAsync(tableName, new Dictionary<string, string> { { "get_sizes", "true" } });
76  Assert.Equal(2, showTableResp2.total_size);
77 
78  // Verify the value was updated
79  var selectResp = await ctx.Kinetica.ExecuteSqlAsync($"SELECT value FROM {tableName} WHERE id = 1", 0, -9999);
80  Assert.Equal(1, selectResp.total_number_of_records);
81  }
82 
83  [Fact]
85  {
86  using var ctx = new TestContext("async_insert_large");
87 
88  // Create table using async SQL
89  await ctx.Kinetica.ExecuteSqlAsync($"CREATE TABLE {ctx.QualifiedTable("batch_table_async")} (id INT NOT NULL, x DOUBLE, y DOUBLE, PRIMARY KEY (id))");
90 
91  var tableName = ctx.QualifiedTable("batch_table_async");
92 
93  // Insert 100 records concurrently
94  var numRecords = 100;
95  var insertTasks = new List<Task>();
96 
97  for (int i = 0; i < numRecords; i++)
98  {
99  int recordId = i; // Capture loop variable
100  insertTasks.Add(ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, x, y) VALUES ({recordId}, {recordId * 0.1}, {recordId * 0.2})"));
101 
102  // Insert in batches of 10 to avoid overwhelming the server
103  if (insertTasks.Count >= 10 || i == numRecords - 1)
104  {
105  await Task.WhenAll(insertTasks);
106  insertTasks.Clear();
107  }
108  }
109 
110  // Verify count using async showTable
111  var showTableResp = await ctx.Kinetica.ShowTableAsync(tableName, new Dictionary<string, string> { { "get_sizes", "true" } });
112  Assert.Equal(numRecords, showTableResp.total_size);
113  }
114 
115  [Fact]
117  {
118  using var ctx = new TestContext("async_parallel_insert");
119 
120  // Create type
121  var typeDef = @"{""type"":""record"",""name"":""parallel_rec"",""fields"":[{""name"":""id"",""type"":""int""},{""name"":""data"",""type"":""string""}]}";
122  var props = new Dictionary<string, IList<string>>
123  {
124  { "id", new List<string> { "int", "primary_key" } }
125  };
126 
127  var typeResp = await ctx.Kinetica.CreateTypeAsync(typeDef, "parallel_type", props, new Dictionary<string, string>());
128 
129  // Create and populate multiple tables in parallel
130  var tasks = new List<Task>();
131  for (int tableNum = 0; tableNum < 5; tableNum++)
132  {
133  int capturedTableNum = tableNum;
134  tasks.Add(Task.Run(async () =>
135  {
136  var tableName = ctx.QualifiedTable($"parallel_table_{capturedTableNum}");
137  await ctx.Kinetica.CreateTableAsync(tableName, typeResp.type_id, new Dictionary<string, string>());
138 
139  // Insert 10 records into each table
140  for (int i = 0; i < 10; i++)
141  {
142  await ctx.Kinetica.ExecuteSqlAsync($"INSERT INTO {tableName} (id, data) VALUES ({i}, 'data_{i}')");
143  }
144 
145  // Verify count
146  var showResp = await ctx.Kinetica.ShowTableAsync(tableName, new Dictionary<string, string> { { "get_sizes", "true" } });
147  Assert.Equal(10, showResp.total_size);
148  }));
149  }
150 
151  await Task.WhenAll(tasks);
152  }
153  }
154 }
Test context that manages schema and cleanup for integration tests.
Definition: TestContext.cs:11
Async tests for insert_records and related endpoints.