Kinetica   C#   API  Version 7.2.3.1
GenericRecordRetriever.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Threading;
4 
5 namespace kinetica.Utils;
6 
19  {
20  #region Fields
21 
// ---- Connection, table identity and key handling ----

/// <summary>Kinetica connection through which every request is issued.</summary>
private readonly Kinetica _kinetica;

/// <summary>Name of the table records are retrieved from.</summary>
private readonly string _tableName;

/// <summary>KineticaType supplied for the table at construction.</summary>
private readonly KineticaType _ktype;

/// <summary>
/// Builder for shard keys and key expressions; null when the builder
/// reports that the type has no key.
/// </summary>
private readonly GenericRecordKeyBuilder? _shardKeyBuilder;

/// <summary>True when the retriever was constructed with a non-empty worker list.</summary>
private readonly bool _multiHeadEnabled;

/// <summary>Random number generator seeded once in the constructor.</summary>
private readonly Random _random;

// ---- HA failover bookkeeping ----

/// <summary>HA ring size captured at construction (1 when there is no HA manager).</summary>
private readonly int _dbHaRingSize;

/// <summary>Last cluster-switch count copied from the connection.</summary>
private volatile int _numClusterSwitches;

/// <summary>Head node URL this retriever currently considers active.</summary>
private volatile Uri? _currentHeadNodeUrl;

/// <summary>Lock taken around writes to the mutable HA state below.</summary>
private readonly object _haLock = new();

// ---- State replaced after a failover / shard refresh ----

/// <summary>Routing table from the latest shard refresh; null until one happens.</summary>
private volatile IList<int>? _mutableRoutingTable;

/// <summary>Worker URLs from the latest worker-list rebuild; null until one happens.</summary>
private volatile IList<Uri>? _mutableWorkerUrls;

/// <summary>Shard version last stored by a refresh; read and written via Interlocked.</summary>
private long _shardVersion;

/// <summary>Unix time (milliseconds) of the last shard refresh; read and written via Interlocked.</summary>
private long _shardUpdateTime;

// ---- Snapshot taken at construction; never modified afterwards ----

/// <summary>Routing table fetched in the constructor; null in single-head mode.</summary>
private readonly IList<int>? _routingTable;

/// <summary>Worker URLs collected in the constructor (the head node URL in single-head mode).</summary>
private readonly IList<Uri> _workerUrls;
44 
45  #endregion
46 
47  #region Properties
48 
/// <summary>Gets the Kinetica connection.</summary>
public Kinetica KineticaDB
{
    get { return _kinetica; }
}

/// <summary>Gets the table name.</summary>
public string TableName
{
    get { return _tableName; }
}

/// <summary>Gets the KineticaType for the table.</summary>
public KineticaType KType
{
    get { return _ktype; }
}

/// <summary>Gets the number of cluster switches due to HA failover.</summary>
public int NumClusterSwitches
{
    get { return _numClusterSwitches; }
}

/// <summary>Gets the number of clusters in the HA ring.</summary>
public int HARingSize
{
    get { return _dbHaRingSize; }
}

/// <summary>Gets whether multi-head retrieval is enabled.</summary>
public bool MultiHeadEnabled
{
    get { return _multiHeadEnabled; }
}

/// <summary>Gets the shard key column names, or empty if table is not sharded.</summary>
public IList<string> ShardKeyColumnNames
{
    get
    {
        // A fresh empty list is handed out whenever there is no key builder
        // or the builder yields no column names.
        return _shardKeyBuilder?.GetRoutingColumnNames() ?? new List<string>();
    }
}
83 
84  #endregion
85 
86  #region Constructor
87 
/// <summary>
/// Create a GenericRecordRetriever for the given table.
/// </summary>
/// <param name="kdb">Kinetica connection to use; must not be null.</param>
/// <param name="tableName">Name of the table to retrieve from; must not be null.</param>
/// <param name="ktype">KineticaType of the table; must not be null.</param>
/// <param name="workers">
/// Optional worker URLs. When null or empty, a <c>WorkerList</c> is built
/// from <paramref name="kdb"/> instead.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="kdb"/>, <paramref name="tableName"/> or <paramref name="ktype"/> is null.
/// </exception>
/// <exception cref="KineticaException">
/// Any failure while resolving workers or the routing table, including a
/// routing table that references more workers than were supplied.
/// </exception>
public GenericRecordRetriever(Kinetica kdb, string tableName,
                              KineticaType ktype,
                              WorkerList? workers = null)
{
    if (kdb is null)
    {
        throw new ArgumentNullException(nameof(kdb));
    }
    _kinetica = kdb;

    if (tableName is null)
    {
        throw new ArgumentNullException(nameof(tableName));
    }
    _tableName = tableName;

    if (ktype is null)
    {
        throw new ArgumentNullException(nameof(ktype));
    }
    _ktype = ktype;

    // Snapshot the HA state of the connection.
    var haManager = kdb.HAManager;
    _dbHaRingSize = haManager is null ? 1 : haManager.HARingSize;
    _numClusterSwitches = kdb.NumClusterSwitches;
    _currentHeadNodeUrl = kdb.URL;

    // Keep the key builder only if the type actually has a key.
    var keyBuilder = new GenericRecordKeyBuilder(false, ktype);
    _shardKeyBuilder = keyBuilder.HasKey() ? keyBuilder : null;

    // Collect worker URLs into a local list; the field points at the same instance.
    var urls = new List<Uri>();
    _workerUrls = urls;

    try
    {
        // No workers supplied: ask Kinetica for them.
        if (workers == null || workers.Count == 0)
        {
            workers = new WorkerList(kdb);
        }

        if (workers == null || workers.Count <= 0)
        {
            // Single head mode: everything goes through the head node.
            urls.Add(kdb.URL);
            _routingTable = null;
            _multiHeadEnabled = false;
        }
        else
        {
            foreach (var workerUrl in workers)
            {
                if (workerUrl != null)
                {
                    urls.Add(workerUrl);
                }
            }

            // Fetch the shard-to-rank mapping from Kinetica.
            _routingTable = kdb.adminShowShards().rank;

            // Every rank in the routing table must be covered by a worker URL.
            foreach (var rank in _routingTable)
            {
                if (rank > urls.Count)
                {
                    throw new KineticaException("Not enough worker URLs specified.");
                }
            }

            _multiHeadEnabled = true;
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException("Error initializing GenericRecordRetriever: " + ex.Message, ex);
    }

    _random = new Random((int)DateTime.Now.Ticks);
}
159 
160  #endregion
161 
162  #region HA Failover Methods
163 
/// <summary>
/// Asks the connection to fail over to another cluster and, when that
/// succeeds, records the new head node URL and cluster-switch count.
/// </summary>
/// <param name="oldUrl">Head node URL that was in use when the failure occurred.</param>
/// <param name="oldClusterSwitchCount">Cluster-switch count observed before the failure.</param>
/// <returns>
/// True when a failover URL was obtained and, in multi-head mode, a
/// non-empty worker list could be built for it; otherwise false.
/// </returns>
private bool ForceFailover(Uri oldUrl, int oldClusterSwitchCount)
{
    // Nothing to fail over to without an HA manager or with a single-cluster ring.
    if (_kinetica.HAManager == null || _dbHaRingSize <= 1)
    {
        return false;
    }

    var failoverUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
    if (failoverUrl == null)
    {
        return false;
    }

    if (_multiHeadEnabled)
    {
        // In multi-head mode the new cluster is only accepted if its
        // worker list can be built and is not empty.
        try
        {
            if (new WorkerList(_kinetica).Count == 0)
            {
                return false;
            }
        }
        catch
        {
            // Best effort: any failure here means the cluster is treated as unhealthy.
            return false;
        }
    }

    lock (_haLock)
    {
        _currentHeadNodeUrl = failoverUrl;
        _numClusterSwitches = _kinetica.NumClusterSwitches;
    }

    return true;
}
205 
/// <summary>
/// Re-reads shard information from Kinetica and refreshes the cached
/// routing state when the shard version or the cluster-switch count changed.
/// </summary>
/// <param name="countClusterSwitches">Cluster-switch count the caller observed earlier.</param>
/// <param name="doReconstructWorkerUrls">
/// Whether to rebuild the worker URL list as well; only honoured in multi-head mode.
/// </param>
/// <returns>
/// True when the routing state was refreshed. When nothing changed, the
/// result of the worker URL rebuild if one was requested, else false.
/// False if a connection error occurred.
/// </returns>
private bool UpdateWorkerQueues(int countClusterSwitches, bool doReconstructWorkerUrls)
{
    bool rebuildUrls = doReconstructWorkerUrls && _multiHeadEnabled;

    try
    {
        var shards = _kinetica.adminShowShards();
        var latestVersion = shards.version;

        bool sameShardVersion = Interlocked.Read(ref _shardVersion) == latestVersion;
        if (sameShardVersion)
        {
            int liveSwitchCount = _kinetica.NumClusterSwitches;

            // Neither shards nor cluster changed: at most rebuild the URLs.
            if (liveSwitchCount == countClusterSwitches)
            {
                return rebuildUrls && ReconstructWorkerUrls();
            }

            lock (_haLock)
            {
                _numClusterSwitches = liveSwitchCount;
            }
        }

        // Record the new shard version and when it was picked up.
        Interlocked.Exchange(ref _shardVersion, latestVersion);
        Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

        lock (_haLock)
        {
            _mutableRoutingTable = shards.rank;
            _currentHeadNodeUrl = _kinetica.URL;
            _numClusterSwitches = _kinetica.NumClusterSwitches;
        }

        if (rebuildUrls)
        {
            // Result intentionally ignored: the routing refresh already succeeded.
            ReconstructWorkerUrls();
        }

        return true;
    }
    catch (Exception ex) when (Kinetica.IsConnectionError(ex))
    {
        return false;
    }
}
255 
/// <summary>
/// Builds a fresh worker list from the connection and publishes its
/// non-null URLs as the current worker URL list.
/// </summary>
/// <returns>
/// True when a non-empty worker list was obtained and stored; false when
/// the list was empty or any exception occurred.
/// </returns>
private bool ReconstructWorkerUrls()
{
    try
    {
        var refreshedWorkers = new WorkerList(_kinetica);
        if (refreshedWorkers.Count == 0)
        {
            return false;
        }

        var refreshedUrls = new List<Uri>();
        foreach (var url in refreshedWorkers)
        {
            if (url != null)
            {
                refreshedUrls.Add(url);
            }
        }

        lock (_haLock)
        {
            _mutableWorkerUrls = refreshedUrls;
        }

        return true;
    }
    catch
    {
        // Best effort: a failed rebuild leaves the previous URLs in place.
        return false;
    }
}
286 
/// <summary>
/// Returns the worker URLs currently in effect: the list stored by the
/// latest rebuild if there is one, otherwise the list captured at construction.
/// </summary>
private IList<Uri> GetEffectiveWorkerUrls() => _mutableWorkerUrls ?? _workerUrls;
291 
/// <summary>
/// Returns the routing table currently in effect: the one stored by the
/// latest shard refresh if there is one, otherwise the one captured at
/// construction (which is null in single-head mode).
/// </summary>
private IList<int>? GetEffectiveRoutingTable() => _mutableRoutingTable ?? _routingTable;
296 
/// <summary>
/// Reacts to a failed request: for connection errors, attempts an HA
/// failover and a routing refresh, then reports whether a retry is worthwhile.
/// </summary>
/// <param name="ex">Exception raised by the failed request.</param>
/// <param name="retrievalAttemptTimestamp">
/// Unix time (milliseconds) at which the failed retrieval started.
/// </param>
/// <returns>
/// False when <paramref name="ex"/> is not a connection error. Otherwise
/// true if the failover succeeded, the routing state was refreshed, or the
/// shard information was updated after the retrieval started.
/// </returns>
private bool HandleConnectionError(Exception ex, long retrievalAttemptTimestamp)
{
    if (!Kinetica.IsConnectionError(ex))
    {
        return false;
    }

    // Snapshot the volatile HA state once so both steps below see the same values.
    Uri? headNodeAtFailure = _currentHeadNodeUrl;
    int switchesAtFailure = _numClusterSwitches;

    bool failedOver = headNodeAtFailure != null
                      && _dbHaRingSize > 1
                      && ForceFailover(headNodeAtFailure, switchesAtFailure);

    // The routing refresh runs whether or not the failover succeeded.
    bool queuesRefreshed = UpdateWorkerQueues(switchesAtFailure, true);

    long lastShardUpdate = Interlocked.Read(ref _shardUpdateTime);

    if (failedOver || queuesRefreshed)
    {
        return true;
    }

    // Someone else may have refreshed the shards since this attempt began.
    return retrievalAttemptTimestamp < lastShardUpdate;
}
321 
322  #endregion
323 
324  #region Record Retrieval
325 
338  IDictionary<string, object?> keyValues,
339  string? expression = null,
340  IList<string>? columns = null,
341  long offset = 0,
342  long limit = -9999)
343  {
344  if (_shardKeyBuilder == null)
345  throw new KineticaException("Cannot get by key from unsharded table: " + _tableName);
346 
347  var retrievalAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
348  var currentCountClusterSwitches = _numClusterSwitches;
349 
350  Exception? lastException = null;
351  int maxRetries = 3;
352 
353  for (int attempt = 0; attempt <= maxRetries; attempt++)
354  {
355  try
356  {
357  // Build the expression from key values
358  string? keyExpression = _shardKeyBuilder.BuildExpression(keyValues);
359  if (keyExpression == null)
360  throw new KineticaException("No expression could be made from given key values.");
361 
362  string fullExpression = keyExpression;
363  if (!string.IsNullOrEmpty(expression))
364  fullExpression = keyExpression + " AND (" + expression + ")";
365 
366  // Build the SQL query
367  string columnList = (columns != null && columns.Count > 0)
368  ? string.Join(", ", columns)
369  : "*";
370 
371  string sql = $"SELECT {columnList} FROM {_tableName} WHERE {fullExpression}";
372 
373  // Execute the SQL query (fast index lookup is automatic for primary/shard key lookups)
374  return _kinetica.executeSql(sql, offset, limit, null, null, null);
375  }
376  catch (Exception ex) when (Kinetica.IsConnectionError(ex))
377  {
378  lastException = ex;
379 
380  if (HandleConnectionError(ex, retrievalAttemptTimestamp))
381  {
382  continue; // Retry after successful failover
383  }
384 
385  throw new KineticaException("Error in retrieving records by key: " + ex.Message, ex);
386  }
387  catch (KineticaException)
388  {
389  throw;
390  }
391  catch (Exception ex)
392  {
393  throw new KineticaException("Error in retrieving records by key: " + ex.Message, ex);
394  }
395  }
396 
397  throw new KineticaException("Error in retrieving records by key after " + maxRetries + " retries: " +
398  (lastException?.Message ?? "Unknown error"), lastException);
399  }
400 
410  string whereExpression,
411  IList<string>? columns = null,
412  long offset = 0,
413  long limit = -9999)
414  {
415  // Build the SQL query
416  string columnList = (columns != null && columns.Count > 0)
417  ? string.Join(", ", columns)
418  : "*";
419 
420  string sql = $"SELECT {columnList} FROM {_tableName} WHERE {whereExpression}";
421 
422  // Execute the SQL query (fast index lookup is automatic for indexed column lookups)
423  return _kinetica.executeSql(sql, offset, limit, null, null, null);
424  }
425 
/// <summary>
/// Checks if the given key values contain all required shard key columns.
/// </summary>
/// <param name="keyValues">Column name to value map to inspect.</param>
/// <returns>
/// False when the table has no shard key or at least one shard key column
/// is missing from <paramref name="keyValues"/>; true otherwise.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="keyValues"/> is null and the table has a shard key.
/// </exception>
public bool HasAllShardKeyValues(IDictionary<string, object?> keyValues)
{
    // Unsharded tables never satisfy the check; this answer does not
    // depend on the argument, so it is given before validating it.
    if (_shardKeyBuilder == null)
    {
        return false;
    }

    // Fail with a descriptive exception instead of a NullReferenceException
    // from inside the loop below.
    if (keyValues is null)
    {
        throw new ArgumentNullException(nameof(keyValues));
    }

    foreach (var columnName in ShardKeyColumnNames)
    {
        if (!keyValues.ContainsKey(columnName))
        {
            return false;
        }
    }

    return true;
}
444 
/// <summary>
/// Computes the worker index that would handle records with the given key values.
/// </summary>
/// <param name="keyValues">Column name to value map from which the shard key is built.</param>
/// <returns>
/// The value produced by routing the shard key through the current routing
/// table, or -1 when the table has no shard key, multi-head mode is off,
/// no routing table is available, or no shard key could be built.
/// </returns>
public int GetWorkerIndexForKey(IDictionary<string, object?> keyValues)
{
    if (_multiHeadEnabled && _shardKeyBuilder != null)
    {
        IList<int>? shardRanks = GetEffectiveRoutingTable();
        if (shardRanks != null)
        {
            var routingKey = _shardKeyBuilder.Build(keyValues);
            if (routingKey != null)
            {
                return routingKey.route(shardRanks);
            }
        }
    }

    // Any missing prerequisite means the key cannot be routed.
    return -1;
}
466 
467  #endregion
468  }
Kinetica KineticaDB
Gets the Kinetica connection.
int HARingSize
Gets the number of clusters in the HA ring.
Definition: HAFailover.cs:256
Builds expressions and routing keys for GenericRecord and dictionary-based records.
A list of worker URLs to use for multi-head operations.
Definition: WorkerList.cs:20
int HARingSize
Gets the HA ring size.
HAFailoverManager? HAManager
Gets the HA failover manager instance.
Definition: Kinetica.cs:192
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Definition: Kinetica.cs:197
GenericRecordRetriever(Kinetica kdb, string tableName, KineticaType ktype, WorkerList? workers=null)
Create a GenericRecordRetriever for the given table.
bool HasAllShardKeyValues(IDictionary< string, object?> keyValues)
Checks if the given key values contain all required shard key columns.
int GetWorkerIndexForKey(IDictionary< string, object?> keyValues)
Computes the worker index that would handle records with the given key values.
KineticaType KType
Gets the KineticaType for the table.
string TableName
Gets the table name.
ExecuteSqlResponse GetRecordsByKey(IDictionary< string, object?> keyValues, string? expression=null, IList< string >? columns=null, long offset=0, long limit=-9999)
Retrieves records for a given shard key using SQL with fast index lookup.
IList< int > rank
Array of ranks indexed by the shard number.
ExecuteSqlResponse GetRecordsByExpression(string whereExpression, IList< string >? columns=null, long offset=0, long limit=-9999)
Retrieves records using a SQL WHERE clause expression with fast index lookup optimization.
Object that permits efficient retrieval of records from GPUdb using GenericRecord
int NumClusterSwitches
Gets the number of cluster switches due to HA failover.
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
bool MultiHeadEnabled
Gets whether multi-head retrieval is enabled.
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:152
IList< string > ShardKeyColumnNames
Gets the shard key column names, or empty if table is not sharded.
A set of results returned by Kinetica.executeSql.
Definition: ExecuteSql.cs:1663