Kinetica C# API Version 7.2.3.1
ShardKeyExample.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Threading.Tasks;
5 using kinetica;
6 using kinetica.Records;
7 
8 namespace Example
9 {
24  public static class ShardKeyExample
25  {
// ShardedRecord: row type for the example table. KineticaType.fromClass builds the table type
// from these properties (see CreateTable); property names match the keys of its columnProperties map.
34  {
35  // ========== PRIMARY KEY (also part of shard key) ==========
// Integer row id; marked PRIMARY_KEY and SHARD_KEY in CreateTable. GenerateRecords assigns 0..count-1.
37  public int id { get; set; }
38 
39  // ========== SHARD KEY COLUMNS ==========
40  // All these columns participate in sharding (together with id above)
41 
// Stored with the CHAR4 property; generated values are 4-character codes such as "US-E".
43  public string region_code { get; set; } = string.Empty;
44 
// Declared INT16 in CreateTable although the property is a C# int; generated values are 1-5.
46  public int category_id { get; set; }
47 
48  // ========== NON-SHARD DATA COLUMNS ==========
49  // These columns store data but don't affect sharding
50 
// Epoch milliseconds (GenerateRecords uses ToUnixTimeMilliseconds); column has the TIMESTAMP property.
52  public long event_timestamp { get; set; }
53 
// UUID-formatted string (column has the UUID property); filled by GenerateDeterministicUuid.
55  public string device_uuid { get; set; } = string.Empty;
56 
// No extra column properties are configured for event_name / event_value in CreateTable.
58  public string event_name { get; set; } = string.Empty;
59 
61  public double event_value { get; set; }
62 
// Dotted-quad address string; column has the IPV4 property.
64  public string client_ip { get; set; } = string.Empty;
65 
// JSON text; column has the JSON property.
67  public string metadata { get; set; } = string.Empty;
68 
/// <summary>
/// Compact one-line description: the three shard key values plus the event name and the
/// event value formatted with two decimals.
/// </summary>
public override string ToString() =>
    $"ShardedRecord {{ id={id}, region='{region_code}', category={category_id}, event='{event_name}', value={event_value:F2} }}";
74 
// GetShardKeyValues(): returns the shard key column names paired with typed values.
// The names here must match the columns marked SHARD_KEY in CreateTable (id, region_code, category_id).
76  {
77  return new ShardKeyValues(
78  ("id", ShardKeyValue.Int(id)),
79  ("region_code", ShardKeyValue.String(region_code)),
// NOTE(review): category_id is declared INT16 in the table but is supplied as a 32-bit Int value
// here; confirm the client-side shard hash for an INT16 column matches a 32-bit key value.
80  ("category_id", ShardKeyValue.Int(category_id))
81  );
82  }
83  }
84 
/// <summary>
/// Runs the example end to end: creates the table, generates and bulk-inserts 500 records,
/// looks four of them up by shard key with RecordRetriever, runs an SQL filter query, prints
/// the per-region row counts, and finally clears the table.
/// Exceptions from the steps are printed (message and stack trace) rather than rethrown.
/// </summary>
/// <param name="serverUrl">Kinetica server URL passed to the Kinetica constructor.</param>
/// <param name="username">User name placed in Kinetica.Options.</param>
/// <param name="password">Password placed in Kinetica.Options.</param>
88  public static async Task RunAsync(string serverUrl, string username, string password)
89  {
90  Console.WriteLine();
91  Console.WriteLine("=========================================================");
92  Console.WriteLine("= Shard Key Example - RecordRetriever Multi-Head Demo =");
93  Console.WriteLine("=========================================================");
94  Console.WriteLine();
95 
96  var options = new Kinetica.Options
97  {
98  Username = username,
99  Password = password
100  };
101  var kdb = new Kinetica(serverUrl, options);
102 
103  const string tableName = "csharp_shard_key_example";
104 
105  try
106  {
107  // Step 1: Create table with shard keys
108  Console.WriteLine("Step 1: Creating table with shard key columns...");
109  CreateTable(kdb, tableName);
110  Console.WriteLine($" Table '{tableName}' created with composite shard key.");
111  Console.WriteLine(" Shard key columns: id, region_code, category_id\n");
112 
113  // Step 2: Generate records (Step 3 below inserts them)
114  Console.WriteLine("Step 2: Generating test records...");
115  const int recordCount = 500;
116  var records = GenerateRecords(recordCount);
117  Console.WriteLine($" Generated {recordCount} records across 4 regions and 5 categories.\n");
118 
119  Console.WriteLine("Step 3: Inserting records using BulkInserter...");
120  var insertStats = await InsertRecordsAsync(kdb, tableName, records);
121  Console.WriteLine($" Inserted: {insertStats.inserted}, Updated: {insertStats.updated}");
122  Console.WriteLine($" Throughput: {insertStats.throughput:N0} records/sec\n");
123 
124  // Step 4: Create RecordRetriever for multi-head retrieval
// The retriever uses the type read back from the server table, not the local class definition.
125  Console.WriteLine("Step 4: Setting up RecordRetriever for multi-head retrieval...");
126  var ktype = KineticaType.fromTable(kdb, tableName);
127  var retriever = new RecordRetriever<ShardedRecord>(kdb, tableName, ktype);
128  Console.WriteLine(" RecordRetriever initialized with worker routing.\n");
129 
130  // Step 5: Demonstrate retrieval by shard key
131  Console.WriteLine("Step 5: Retrieving records by shard key (direct worker routing)...");
132  Console.WriteLine(new string('-', 80));
133 
134  // Pick some sample records to query by their shard keys
// NOTE: these hard-coded indices assume recordCount >= 500 (it is the const 500 above).
135  var sampleRecords = new[]
136  {
137  records[0], // First record
138  records[100], // Middle record
139  records[250], // Another middle record
140  records[499] // Last record
141  };
142 
143  foreach (var sampleRecord in sampleRecords)
144  {
145  Console.WriteLine($"\n Querying by shard key: id={sampleRecord.id}, region='{sampleRecord.region_code}', " +
146  $"category={sampleRecord.category_id}");
147 
// A failed lookup is reported and the loop moves on to the next sample.
148  try
149  {
150  // Create a key record with just the shard key values
151  var keyRecord = new ShardedRecord
152  {
153  id = sampleRecord.id,
154  region_code = sampleRecord.region_code,
155  category_id = sampleRecord.category_id
156  };
157 
158  var response = retriever.getRecordsByKey(keyRecord);
159  Console.WriteLine($" Found {response.data.Count} record(s):");
160  foreach (var record in response.data)
161  {
162  Console.WriteLine($" -> {record}");
163  }
164  }
165  catch (Exception ex)
166  {
167  // Prefer the inner exception's message (one level deep) when present; otherwise the exception's own message
168  var innerMsg = ex.InnerException?.Message ?? ex.Message;
169  Console.WriteLine($" Error: {innerMsg}");
170  }
171  }
172 
173  Console.WriteLine(new string('-', 80));
174 
175  // Step 6: Demonstrate limitations of additional expressions
176  Console.WriteLine("\nStep 6: Note on RecordRetriever expression limitations...");
177  Console.WriteLine(new string('-', 80));
178  Console.WriteLine(" RecordRetriever.getRecordsByKey() with additional expressions requires:");
179  Console.WriteLine(" - Only equality (=) and AND operators");
180  Console.WriteLine(" - Referenced columns must have attribute indexes");
181  Console.WriteLine(" - No range operators (>, <, >=, <=)");
182  Console.WriteLine("");
183  Console.WriteLine(" For complex queries, use SQL via executeSql() or filter():");
184 
185  // Demo using SQL for complex queries after shard-based lookup
// NOTE(review): the arguments 0, -9999 are presumably offset and "no limit" — confirm against
// the executeSql docs. Because the SQL itself has LIMIT 3, total_number_of_records is presumably
// capped at 3 rather than the full number of matching rows — confirm before relying on the count.
186  var sqlResponse = kdb.executeSql(
187  $"SELECT id, region_code, category_id, event_name, event_value FROM {tableName} WHERE region_code = 'US-E' AND event_value > 40 LIMIT 3",
188  0, -9999);
189  Console.WriteLine($"\n SQL query (region='US-E' AND event_value > 40): Found {sqlResponse.total_number_of_records} records");
190  foreach (KineticaRecord rec in sqlResponse.data.Take(3))
191  {
192  Console.WriteLine($" -> {rec.ContentsToString()}");
193  }
194  Console.WriteLine(new string('-', 80));
195 
196  // Step 7: Show distribution statistics
197  Console.WriteLine("\nStep 7: Showing record distribution by region...");
198  ShowDistributionByRegion(kdb, tableName);
199 
200  Console.WriteLine("\nShard Key Example completed successfully!");
201  }
202  catch (Exception ex)
203  {
204  Console.WriteLine($"Error: {ex.Message}");
205  Console.WriteLine(ex.StackTrace);
206  }
// Cleanup runs whether or not the steps above succeeded; a clear failure is only a warning.
207  finally
208  {
209  Console.WriteLine($"\nCleaning up: Dropping table '{tableName}'...");
210  try
211  {
212  kdb.clearTable(tableName, "", new Dictionary<string, string>
213  {
// NOTE(review): listing line 214 (the options entry) is not shown here; presumably it sets
// the ClearTable NO_ERROR_IF_NOT_EXISTS option — confirm against the full source.
215  });
216  Console.WriteLine(" Table dropped.");
217  }
218  catch (Exception ex)
219  {
220  Console.WriteLine($" Warning: Could not drop table: {ex.Message}");
221  }
222  }
223  }
224 
/// <summary>
/// (Re)creates <paramref name="tableName"/>. It builds column properties for the ShardedRecord
/// columns, clears any existing table, creates the type and table, and then tries to add an
/// attribute index on each of the three shard key columns.
/// </summary>
/// <param name="kdb">Connected Kinetica client.</param>
/// <param name="tableName">Table to (re)create.</param>
228  private static void CreateTable(Kinetica kdb, string tableName)
229  {
// event_name and event_value have no entry here, so they get no extra column properties.
230  var columnProperties = new Dictionary<string, IList<string>>
231  {
232  // Primary key and shard key - simple types that work well with RecordRetriever
233  ["id"] = new List<string> { ColumnProperty.PRIMARY_KEY, ColumnProperty.SHARD_KEY },
234  ["region_code"] = new List<string> { ColumnProperty.CHAR4, ColumnProperty.PRIMARY_KEY, ColumnProperty.SHARD_KEY },
235  ["category_id"] = new List<string> { ColumnProperty.INT16, ColumnProperty.PRIMARY_KEY, ColumnProperty.SHARD_KEY },
236 
237  // Data columns (not part of shard key)
238  ["event_timestamp"] = new List<string> { ColumnProperty.TIMESTAMP },
239  ["device_uuid"] = new List<string> { ColumnProperty.UUID },
240  ["client_ip"] = new List<string> { ColumnProperty.IPV4 },
241  ["metadata"] = new List<string> { ColumnProperty.JSON }
242  };
243 
244  // Clear existing table
245  kdb.clearTable(tableName, "", new Dictionary<string, string>
246  {
// NOTE(review): listing line 247 (the options entry) is not shown here; presumably it sets
// the ClearTable NO_ERROR_IF_NOT_EXISTS option so a missing table is not an error — confirm.
248  });
249 
250  // Create type and table
251  var ktype = KineticaType.fromClass(typeof(ShardedRecord), columnProperties);
252  string typeId = ktype.create(kdb);
253  kdb.createTable(tableName, typeId);
254 
255  // Create attribute indexes on shard key columns for fast lookup
256  // This is required for RecordRetriever.getRecordsByKey() to work
257  Console.WriteLine(" Creating attribute indexes on shard key columns...");
258  foreach (var col in new[] { "id", "region_code", "category_id" })
259  {
260  try
261  {
262  kdb.alterTable(tableName, "create_index", col, new Dictionary<string, string>());
263  }
// NOTE(review): this bare catch swallows every failure, not just "index already exists"
// (the table was just recreated, so that case seems unlikely). If the indexes really are
// required for getRecordsByKey, a real failure is hidden here; consider logging it.
264  catch { /* Ignore if index already exists */ }
265  }
266  }
267 
/// <summary>
/// Builds <paramref name="count"/> synthetic ShardedRecord rows for the demo.
/// </summary>
/// <remarks>
/// Every value comes from the loop index or from fixed-seed <see cref="Random"/> instances,
/// so repeated runs produce the same data set. region_code cycles through four codes and
/// category_id cycles through 1-5.
/// </remarks>
/// <param name="count">Number of records to generate; ids run from 0 to count - 1.</param>
/// <returns>The generated records, in id order.</returns>
private static List<ShardedRecord> GenerateRecords(int count)
{
    var records = new List<ShardedRecord>(count);
    var random = new Random(42);

    // Session ids get their own seeded source (instead of Guid.NewGuid()) so the whole data
    // set is reproducible. Drawing them does not disturb the sequence `random` produces for
    // event_name / event_value / client_ip.
    var sessionRandom = new Random(4242);
    var sessionBytes = new byte[16];

    var baseTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Define regions and event names for distribution
    var regions = new[] { "US-E", "US-W", "EU-W", "APAC" };
    var eventNames = new[] { "click", "view", "purchase", "signup", "logout" };

    for (int i = 0; i < count; i++)
    {
        sessionRandom.NextBytes(sessionBytes);
        var sessionId = new Guid(sessionBytes);

        records.Add(new ShardedRecord
        {
            // Shard key columns (also the composite primary key)
            id = i,
            region_code = regions[i % regions.Length],
            category_id = (i % 5) + 1, // Categories 1-5

            // Data columns (not part of the shard key)
            event_timestamp = ((DateTimeOffset)baseTime.AddMinutes(i)).ToUnixTimeMilliseconds(),
            device_uuid = GenerateDeterministicUuid(i),
            event_name = eventNames[random.Next(eventNames.Length)],
            event_value = random.NextDouble() * 100,
            client_ip = $"{random.Next(1, 256)}.{random.Next(0, 256)}.{random.Next(0, 256)}.{random.Next(1, 256)}",
            metadata = $"{{\"session_id\": \"{sessionId}\", \"page\": \"/page/{i % 20}\"}}"
        });
    }

    return records;
}
304 
/// <summary>
/// Builds a reproducible UUID string for <paramref name="index"/>. The 16 GUID bytes are four
/// 4-byte slots holding index, index*31, index*17 and index*13, each in BitConverter's
/// (machine) byte order.
/// </summary>
private static string GenerateDeterministicUuid(int index)
{
    int[] slotMultipliers = { 1, 31, 17, 13 };
    byte[] guidBytes = new byte[16];

    for (int slot = 0; slot < slotMultipliers.Length; slot++)
    {
        byte[] slotBytes = BitConverter.GetBytes(index * slotMultipliers[slot]);
        Array.Copy(slotBytes, 0, guidBytes, slot * sizeof(int), slotBytes.Length);
    }

    return new Guid(guidBytes).ToString();
}
318 
/// <summary>
/// Inserts <paramref name="records"/> into <paramref name="tableName"/> with a BulkInserter
/// and reports the resulting counters.
/// </summary>
/// <param name="kdb">Connected Kinetica client.</param>
/// <param name="tableName">Existing table; its type is read back via KineticaType.fromTable.</param>
/// <param name="records">Records to insert.</param>
/// <returns>
/// The inserted and updated counts reported by the inserter, and the observed throughput in
/// records per second (0 when no measurable time elapsed).
/// </returns>
private static async Task<(long inserted, long updated, double throughput)> InsertRecordsAsync(
    Kinetica kdb, string tableName, List<ShardedRecord> records)
{
    var ktype = KineticaType.fromTable(kdb, tableName);

    var options = new BulkInserterOptions
    {
        BatchSize = 200,
        MaxInFlightBatches = 5,
        MaxRetries = 3
    };

    var sw = System.Diagnostics.Stopwatch.StartNew();

    await using var inserter = new BulkInserter<ShardedRecord>(kdb, tableName, ktype, options);
    inserter.InsertBatch(records);
    // Close before reading counters and errors; presumably this waits for queued batches — confirm.
    await inserter.CloseAsync();

    sw.Stop();

    // Report 0 instead of Infinity/NaN if the measured elapsed time is zero.
    double elapsedSeconds = sw.Elapsed.TotalSeconds;
    double throughput = elapsedSeconds > 0 ? records.Count / elapsedSeconds : 0.0;

    var errors = inserter.DrainErrors();
    if (errors.Count > 0)
    {
        Console.WriteLine($" Warning: {errors.Count} errors during insertion");
        // Show a few of the drained errors so the cause is visible, not just the count.
        foreach (var error in errors.Take(3))
        {
            Console.WriteLine($"   -> {error}");
        }
    }

    return (inserter.CountInserted, inserter.CountUpdated, throughput);
}
352 
/// <summary>
/// Prints the number of rows per region_code in <paramref name="tableName"/>, using a SQL
/// GROUP BY. Any failure is reported on the console instead of being thrown.
/// </summary>
private static void ShowDistributionByRegion(Kinetica kdb, string tableName)
{
    string groupByRegionSql =
        $"SELECT region_code, COUNT(*) as record_count FROM {tableName} GROUP BY region_code ORDER BY region_code";

    try
    {
        var sqlResult = kdb.executeSql(groupByRegionSql, 0, -9999);
        Console.WriteLine(" Region distribution:");

        foreach (KineticaRecord row in sqlResult.data)
        {
            Console.WriteLine($" {row.ContentsToString()}");
        }
    }
    catch (Exception failure)
    {
        Console.WriteLine($" Could not get distribution: {failure.Message}");
    }
}
376  }
377 }
Example demonstrating shard key usage with RecordRetriever for multi-head retrieval.
static ShardKeyValue Int(int value)
Creates a 32-bit integer shard key value.
long event_timestamp
Event timestamp - stored as LONG (not part of shard key for simpler expression building)
double event_value
Event value - double (cannot be shard key)
const string INT16
This property provides optimized memory and query performance for int columns.
const string PRIMARY_KEY
This property indicates that this column will be part of (or the entire) primary key.
Record class with multiple shard key columns demonstrating allowed shard key types.
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
A set of parameters for Kinetica.clearTable.
Definition: ClearTable.cs:19
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
Column properties used for Kinetica types.
string client_ip
IP address of the client
Convenience class for using Avro.Generic.GenericRecord objects.
const string TIMESTAMP
Valid only for 'long' columns.
const string JSON
Valid only for 'string' columns.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const string CHAR4
This property provides optimized memory, disk and query performance for string columns.
Collection of shard key column names and values.
A set of string constants for the parameter options.
Definition: ClearTable.cs:24
string region_code
Region code - CHAR4 shard key for geographic distribution
const string IPV4
This property provides optimized memory, disk and query performance for string columns representing I...
ClearTableResponse clearTable(ClearTableRequest request_)
Clears (drops) one or all tables in the database cluster.
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...
static KineticaType fromClass(Type recordClass, IDictionary<string, IList<string>> properties = null)
Create a KineticaType object from properties of a record class and Kinetica column properties.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
const string SHARD_KEY
This property indicates that this column will be part of (or the entire) shard key.
AlterTableResponse alterTable(AlterTableRequest request_)
Apply various modifications to a table or view.
string device_uuid
Device UUID - stored for reference
int id
Primary key - integer ID (also shard key)
const string UUID
Valid only for 'string' columns.
string metadata
Additional JSON metadata
string create(Kinetica kinetica)
Given a handle to the server, creates a type in the database based on this data type.
static ShardKeyValue String(string value)
Creates a string shard key value.
A typed value for shard key computation.
Failover to clusters in a random order (default)
static async Task RunAsync(string serverUrl, string username, string password)
Runs the shard key example demonstrating RecordRetriever for multi-head retrieval.
Interface for extracting shard key values from a record.
const string NO_ERROR_IF_NOT_EXISTS
If TRUE and if the table specified in table_name does not exist no error is returned.
Definition: ClearTable.cs:40
string event_name
Event name - regular string column
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
int category_id
Category ID - INT16 shard key
Configuration options for the BulkInserter<T>.