2 using System.Collections.Generic;
4 using System.Threading.Tasks;
/// <summary>Primary key - integer ID (also shard key).</summary>
37 public int id {
get;
set; }
/// <summary>IP address of the client; defaults to an empty string.</summary>
64 public string client_ip {
get;
set; } =
string.Empty;
/// <summary>Additional JSON metadata; defaults to an empty string.</summary>
67 public string metadata {
get;
set; } =
string.Empty;
// Display string made of id, region_code, category_id, event_name and
// event_value (formatted with two decimals via F2). The doubled braces
// "{{" and "}}" emit literal '{' and '}' in the interpolated string.
// NOTE(review): the enclosing method header is not visible in this excerpt -
// presumably the ToString() override; confirm.
71 return $
"ShardedRecord {{ id={id}, region='{region_code}', category={category_id}, " +
72 $
"event='{event_name}', value={event_value:F2} }}";
/// <summary>
/// Runs the shard key example end to end: creates the table, generates and
/// bulk-inserts test records, retrieves records by shard key, runs a SQL query,
/// prints the per-region distribution, and finally drops the table.
/// </summary>
/// <param name="serverUrl">Server URL handed to the <c>Kinetica</c> constructor.</param>
/// <param name="username">NOTE(review): not referenced in the visible lines; presumably used to build <c>options</c> - confirm.</param>
/// <param name="password">NOTE(review): not referenced in the visible lines; presumably used to build <c>options</c> - confirm.</param>
88 public static async Task
RunAsync(
string serverUrl,
string username,
string password)
// Print the demo banner.
91 Console.WriteLine(
"=========================================================");
92 Console.WriteLine(
"= Shard Key Example - RecordRetriever Multi-Head Demo =");
93 Console.WriteLine(
"=========================================================");
// Open the connection. `options` is built in lines not visible in this excerpt.
101 var kdb =
new Kinetica(serverUrl, options);
// Every step below operates on this one table.
103 const string tableName =
"csharp_shard_key_example";
// Step 1: create the table through the CreateTable helper.
108 Console.WriteLine(
"Step 1: Creating table with shard key columns...");
109 CreateTable(kdb, tableName);
110 Console.WriteLine($
" Table '{tableName}' created with composite shard key.");
111 Console.WriteLine(
" Shard key columns: id, region_code, category_id\n");
// Step 2: build 500 in-memory test records.
114 Console.WriteLine(
"Step 2: Generating test records...");
115 const int recordCount = 500;
116 var records = GenerateRecords(recordCount);
117 Console.WriteLine($
" Generated {recordCount} records across 4 regions and 5 categories.\n");
// Step 3: bulk-insert the records and report the counters and throughput
// returned by InsertRecordsAsync.
119 Console.WriteLine(
"Step 3: Inserting records using BulkInserter...");
120 var insertStats = await InsertRecordsAsync(kdb, tableName, records);
121 Console.WriteLine($
" Inserted: {insertStats.inserted}, Updated: {insertStats.updated}");
122 Console.WriteLine($
" Throughput: {insertStats.throughput:N0} records/sec\n");
// Step 4: announce the RecordRetriever setup. The construction of `retriever`
// itself is not visible in this excerpt.
125 Console.WriteLine(
"Step 4: Setting up RecordRetriever for multi-head retrieval...");
128 Console.WriteLine(
" RecordRetriever initialized with worker routing.\n");
// Step 5: look up a few sample records by their shard key values.
131 Console.WriteLine(
"Step 5: Retrieving records by shard key (direct worker routing)...");
132 Console.WriteLine(
new string(
'-', 80));
// Sample keys to query; the array elements are not visible in this excerpt.
135 var sampleRecords =
new[]
143 foreach (var sampleRecord
in sampleRecords)
145 Console.WriteLine($
"\n Querying by shard key: id={sampleRecord.id}, region='{sampleRecord.region_code}', " +
146 $
"category={sampleRecord.category_id}");
// Key object that copies only the three shard key columns from the sample.
153 id = sampleRecord.
id,
154 region_code = sampleRecord.region_code,
155 category_id = sampleRecord.category_id
// Fetch the records matching the key and print each one.
158 var response = retriever.getRecordsByKey(keyRecord);
159 Console.WriteLine($
" Found {response.data.Count} record(s):");
160 foreach (var record
in response.data)
162 Console.WriteLine($
" -> {record}");
// Prefer the inner exception's message when there is one.
168 var innerMsg = ex.InnerException?.Message ?? ex.Message;
169 Console.WriteLine($
" Error: {innerMsg}");
173 Console.WriteLine(
new string(
'-', 80));
// Step 6: explain the limits of getRecordsByKey() expressions, then show the
// SQL alternative.
176 Console.WriteLine(
"\nStep 6: Note on RecordRetriever expression limitations...");
177 Console.WriteLine(
new string(
'-', 80));
178 Console.WriteLine(
" RecordRetriever.getRecordsByKey() with additional expressions requires:");
179 Console.WriteLine(
" - Only equality (=) and AND operators");
180 Console.WriteLine(
" - Referenced columns must have attribute indexes");
181 Console.WriteLine(
" - No range operators (>, <, >=, <=)");
182 Console.WriteLine(
"");
183 Console.WriteLine(
" For complex queries, use SQL via executeSql() or filter():");
// The range predicate (event_value > 40) goes through executeSql. The only
// interpolated value is the compile-time constant tableName.
186 var sqlResponse = kdb.executeSql(
187 $
"SELECT id, region_code, category_id, event_name, event_value FROM {tableName} WHERE region_code = 'US-E' AND event_value > 40 LIMIT 3",
189 Console.WriteLine($
"\n SQL query (region='US-E' AND event_value > 40): Found {sqlResponse.total_number_of_records} records");
192 Console.WriteLine($
" -> {rec.ContentsToString()}");
194 Console.WriteLine(
new string(
'-', 80));
// Step 7: print per-region record counts via ShowDistributionByRegion.
197 Console.WriteLine(
"\nStep 7: Showing record distribution by region...");
198 ShowDistributionByRegion(kdb, tableName);
200 Console.WriteLine(
"\nShard Key Example completed successfully!");
// Report a failure together with its stack trace.
204 Console.WriteLine($
"Error: {ex.Message}");
205 Console.WriteLine(ex.StackTrace);
// Cleanup: drop the example table. A failure here is only reported as a warning.
// NOTE(review): the option entries passed to clearTable are not visible in this excerpt.
209 Console.WriteLine($
"\nCleaning up: Dropping table '{tableName}'...");
212 kdb.clearTable(tableName,
"",
new Dictionary<string, string>
216 Console.WriteLine(
" Table dropped.");
220 Console.WriteLine($
" Warning: Could not drop table: {ex.Message}");
/// <summary>
/// Prepares the example table. The visible steps are: build the per-column
/// property map, clear any table with the same name, register the type with the
/// server, and create attribute indexes on the three shard key columns.
/// NOTE(review): the call that actually creates the table is not visible in this
/// excerpt.
/// </summary>
/// <param name="kdb">Connection used for every server call.</param>
/// <param name="tableName">Name of the table to (re)create.</param>
228 private static void CreateTable(
Kinetica kdb,
string tableName)
// Column name -> list of column properties; the entries are not visible in this excerpt.
230 var columnProperties =
new Dictionary<string, IList<string>>
// Clear a table of the same name before creating it.
// NOTE(review): the option entries are not visible here - presumably
// NO_ERROR_IF_NOT_EXISTS so a missing table is not an error; confirm.
245 kdb.
clearTable(tableName,
"",
new Dictionary<string, string>
// Register the type with the server and keep the returned type ID.
// `ktype` is defined in lines not visible in this excerpt.
252 string typeId = ktype.
create(kdb);
// One "create_index" alterTable call per shard key column.
257 Console.WriteLine(
" Creating attribute indexes on shard key columns...");
258 foreach (var col
in new[] {
"id",
"region_code",
"category_id" })
262 kdb.
alterTable(tableName,
"create_index", col,
new Dictionary<string, string>());
/// <summary>
/// Builds <paramref name="count"/> test records. Region, category, timestamp and
/// device UUID are derived from the loop index; event name, event value and
/// client IP come from a <c>Random</c> seeded with 42.
/// </summary>
/// <param name="count">Number of records to generate.</param>
/// <returns>NOTE(review): the return statement is not visible in this excerpt - presumably the populated list; confirm.</returns>
271 private static List<ShardedRecord> GenerateRecords(
int count)
// Pre-size the list to the requested record count.
273 var records =
new List<ShardedRecord>(count);
// Fixed seed, so the sequence of Random draws is the same on every run.
274 var random =
new Random(42);
// Timestamps are offsets from 2024-01-01 00:00:00 UTC.
275 var baseTime =
new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
278 var regions =
new[] {
"US-E",
"US-W",
"EU-W",
"APAC" };
279 var eventNames =
new[] {
"click",
"view",
"purchase",
"signup",
"logout" };
281 for (
int i = 0; i < count; i++)
283 var record =
new ShardedRecord
// Cycle through the four region codes by index.
287 region_code = regions[i % regions.Length],
// Category IDs cycle through 1..5.
288 category_id = (i % 5) + 1,
// One minute per record after baseTime, expressed as Unix milliseconds.
289 event_timestamp = ((DateTimeOffset)baseTime.AddMinutes(i)).ToUnixTimeMilliseconds(),
290 device_uuid = GenerateDeterministicUuid(i),
293 event_name = eventNames[random.Next(eventNames.Length)],
// NextDouble() is in [0, 1), so the value is in [0, 100).
294 event_value = random.NextDouble() * 100,
// First and last octets are drawn from 1..255, the middle two from 0..255.
295 client_ip = $
"{random.Next(1, 256)}.{random.Next(0, 256)}.{random.Next(0, 256)}.{random.Next(1, 256)}",
// NOTE(review): Guid.NewGuid() is not seeded, so session_id differs on every
// run even though the rest of the record is reproducible.
296 metadata = $
"{{\"session_id\": \"{Guid.NewGuid()}\", \"page\": \"/page/{i % 20}\"}}" 308 private static string GenerateDeterministicUuid(
int index)
{
    // Derives a reproducible UUID string from a record index.
    //
    // The 16 GUID bytes are four 32-bit words: index, index * 31, index * 17 and
    // index * 13. Each word is stored little-endian so the same index yields the
    // same UUID on every host; BitConverter.GetBytes alone follows the host's
    // byte order.
    var bytes = new byte[16];
    int[] multipliers = { 1, 31, 17, 13 };

    for (int slot = 0; slot < multipliers.Length; slot++)
    {
        // Wrap-around on overflow is intended; `unchecked` keeps a build with
        // overflow checking enabled from throwing for large indexes.
        int word = unchecked(index * multipliers[slot]);

        byte[] chunk = BitConverter.GetBytes(word);
        if (!BitConverter.IsLittleEndian)
        {
            Array.Reverse(chunk);
        }

        chunk.CopyTo(bytes, slot * 4);
    }

    return new Guid(bytes).ToString();
}
/// <summary>
/// Inserts the given records through a bulk inserter and measures throughput.
/// NOTE(review): the construction of <c>inserter</c> and its options is not
/// visible in this excerpt.
/// </summary>
/// <param name="kdb">Connection to the server.</param>
/// <param name="tableName">Target table name.</param>
/// <param name="records">Records to insert.</param>
/// <returns>
/// The inserter's inserted and updated counters, plus throughput in records per second.
/// </returns>
322 private static async Task<(
long inserted,
long updated,
double throughput)> InsertRecordsAsync(
323 Kinetica kdb,
string tableName, List<ShardedRecord> records)
// One of the inserter options; the surrounding initializer is not visible here.
330 MaxInFlightBatches = 5,
// Time the insert together with the close that follows it.
334 var sw = System.Diagnostics.Stopwatch.StartNew();
337 inserter.InsertBatch(records);
338 await inserter.CloseAsync();
// Records per second. NOTE(review): a zero elapsed time makes this division
// yield Infinity (or NaN when records is empty) rather than throw.
342 var throughput = records.Count / (sw.Elapsed.TotalMilliseconds / 1000.0);
// Insertion errors are only counted and reported; they do not fail the call.
344 var errors = inserter.DrainErrors();
345 if (errors.Count > 0)
347 Console.WriteLine($
" Warning: {errors.Count} errors during insertion");
350 return (inserter.CountInserted, inserter.CountUpdated, throughput);
/// <summary>
/// Prints the number of records per region_code for the given table.
/// A failure is reported on the console ("Could not get distribution").
/// NOTE(review): the SQL call, the loop and the try/catch scaffolding are not
/// visible in this excerpt.
/// </summary>
/// <param name="kdb">Connection to the server.</param>
/// <param name="tableName">Table to aggregate.</param>
356 private static void ShowDistributionByRegion(
Kinetica kdb,
string tableName)
// Group by region_code, ordered by region_code; tableName is interpolated into the SQL text.
362 $
"SELECT region_code, COUNT(*) as record_count FROM {tableName} GROUP BY region_code ORDER BY region_code",
365 Console.WriteLine(
" Region distribution:");
// One output line per result record.
368 Console.WriteLine($
" {record.ContentsToString()}");
373 Console.WriteLine($
" Could not get distribution: {ex.Message}");
Example demonstrating shard key usage with RecordRetriever for multi-head retrieval.
static ShardKeyValue Int(int value)
Creates a 32-bit integer shard key value.
override string ToString()
long event_timestamp
Event timestamp - stored as LONG (not part of shard key for simpler expression building)
double event_value
Event value - double (cannot be shard key)
const string INT16
This property provides optimized memory and query performance for int columns.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
Record class with multiple shard key columns demonstrating allowed shard key types.
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
A set of parameters for Kinetica.clearTable.
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
Column properties used for Kinetica types.
string client_ip
IP address of the client
Convenience class for using Avro.Generic.GenericRecord objects.
const string TIMESTAMP
Valid only for 'long' columns.
const string JSON
Valid only for 'string' columns.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
Collection of shard key column names and values.
A set of string constants for the parameter options.
string region_code
Region code - CHAR4 shard key for geographic distribution
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
ClearTableResponse clearTable(ClearTableRequest request_)
Clears (drops) one or all tables in the database cluster.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
static KineticaType fromClass(Type recordClass, IDictionary< string, IList< string >> properties=null)
Create a KineticaType object from properties of a record class and Kinetica column properties.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
AlterTableResponse alterTable(AlterTableRequest request_)
Apply various modifications to a table or view.
string device_uuid
Device UUID - stored for reference
int id
Primary key - integer ID (also shard key)
const string UUID
Valid only for 'string' columns.
string metadata
Additional JSON metadata
string create(Kinetica kinetica)
Given a handle to the server, creates a type in the database based on this data type.
static ShardKeyValue String(string value)
Creates a string shard key value.
A typed value for shard key computation.
Failover to clusters in a random order (default)
static async Task RunAsync(string serverUrl, string username, string password)
Runs the shard key example demonstrating RecordRetriever for multi-head retrieval.
const string NO_ERROR_IF_NOT_EXISTS
If TRUE and the table specified in table_name does not exist, no error is returned.
string event_name
Event name - regular string column
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
int category_id
Category ID - INT16 shard key
Configuration options for the BulkInserter<T>.