Kinetica C# API Version 7.2.3.1
RecordRetriever.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using System;
3 using System.Collections.Generic;
4 using System.Threading;
5 using kinetica.Utils;
6 
7 namespace kinetica;
8 
/// <summary>
/// Object that permits efficient retrieval of records from Kinetica by shard key.
/// When worker ranks are available (multi-head), each lookup is sent directly to
/// the worker rank that owns the record's shard; otherwise it goes through the
/// regular <see cref="Kinetica"/> connection. Connection errors trigger HA
/// failover handling and a bounded number of retries.
/// </summary>
/// <typeparam name="T">The record type; must have a public parameterless constructor.</typeparam>
public class RecordRetriever<T> where T : new()
{
    #region Fields

    private readonly Kinetica _kinetica;
    private readonly string _tableName;
    private readonly KineticaType _ktype;
    private readonly RecordKeyBuilder<T>? _shardKeyBuilder;
    private readonly bool _multiHeadEnabled;

    // HA Failover fields
    private readonly int _dbHaRingSize;
    private volatile int _numClusterSwitches;
    private volatile Uri? _currentHeadNodeUrl;
    private readonly object _haLock = new object();

    // Mutable state replaced after HA failover / shard reassignment; when set it
    // takes precedence over the initial state captured by the constructor.
    private volatile IList<int>? _mutableRoutingTable;
    private volatile IList<WorkerQueue<T>?>? _mutableWorkerQueues;
    private long _shardVersion;
    private long _shardUpdateTime;

    // Immutable initial state. The worker-queue list is positional: a null entry
    // is a placeholder for a removed rank, so indices computed by shard routing
    // keep pointing at the right worker.
    private readonly IList<int>? _routingTable;
    private readonly IList<WorkerQueue<T>?> _workerQueues;

    #endregion

    #region Properties

    /// <summary>
    /// Gets the Kinetica connection.
    /// </summary>
    public Kinetica KineticaDB => _kinetica;

    /// <summary>
    /// Gets the table name.
    /// </summary>
    public string TableName => _tableName;

    /// <summary>
    /// Gets the number of cluster switches due to HA failover.
    /// </summary>
    public int NumClusterSwitches => _numClusterSwitches;

    /// <summary>
    /// Gets the HA ring size.
    /// </summary>
    public int HARingSize => _dbHaRingSize;

    #endregion

    #region Constructor

    /// <summary>
    /// Create a RecordRetriever object with the given parameters.
    /// </summary>
    /// <param name="kdb">The Kinetica connection to use.</param>
    /// <param name="table_name">Name of the table to retrieve records from.</param>
    /// <param name="ktype">The type describing the table's records.</param>
    /// <param name="workers">Optional worker URLs; when null or empty they are
    /// obtained from <paramref name="kdb"/>.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="kdb"/>,
    /// <paramref name="table_name"/> or <paramref name="ktype"/> is null.</exception>
    /// <exception cref="KineticaException">If worker or shard information cannot be
    /// set up, or the routing table references more ranks than there are workers.
    /// The underlying failure is available as the inner exception.</exception>
    public RecordRetriever(Kinetica kdb, string table_name,
                           KineticaType ktype,
                           WorkerList? workers = null)
    {
        _kinetica = kdb ?? throw new ArgumentNullException(nameof(kdb));
        _tableName = table_name ?? throw new ArgumentNullException(nameof(table_name));
        _ktype = ktype ?? throw new ArgumentNullException(nameof(ktype));

        // Initialize HA state
        _dbHaRingSize = kdb.HAManager?.HARingSize ?? 1;
        _numClusterSwitches = kdb.NumClusterSwitches;
        _currentHeadNodeUrl = kdb.URL;

        // Only tables with a shard key support lookups by key
        var shardKeyBuilder = new RecordKeyBuilder<T>(false, _ktype);
        _shardKeyBuilder = shardKeyBuilder.hasKey() ? shardKeyBuilder : null;

        try
        {
            // If no workers are given, try to get them from Kinetica
            if (workers == null || workers.Count == 0)
                workers = new WorkerList(kdb);

            if (workers.Count > 0)
            {
                // Multi-head: one /get/records queue per worker (placeholders for removed ranks)
                _workerQueues = BuildWorkerQueues(workers);

                // Get the worker rank information from Kinetica
                var shardInfo = kdb.adminShowShards();
                _routingTable = shardInfo.rank;

                // Remember the shard version seen now, so a later connection error is
                // only treated as a shard change if the server's version has moved on.
                _shardVersion = shardInfo.version;
                _shardUpdateTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

                // Check that enough worker URLs are specified
                foreach (int rank in _routingTable)
                {
                    if (rank > _workerQueues.Count)
                        throw new KineticaException("Not enough worker URLs specified.");
                }

                _multiHeadEnabled = true;
            }
            else
            {
                // Multi-head is NOT turned on; use the regular Kinetica URL
                _workerQueues = new List<WorkerQueue<T>?> { CreateGetRecordsQueue(kdb.URL.ToString()) };
                _routingTable = null;
                _multiHeadEnabled = false;
            }
        }
        catch (Exception ex)
        {
            // Same message as before, but keep the original failure as InnerException
            throw new KineticaException(ex.ToString(), ex);
        }
    }

    #endregion

    #region Worker Queue Helpers

    /// <summary>
    /// Creates a queue targeting the /get/records endpoint under <paramref name="baseUrl"/>.
    /// </summary>
    private static WorkerQueue<T> CreateGetRecordsQueue(string baseUrl)
    {
        return new WorkerQueue<T>(new Uri(baseUrl + "get/records"));
    }

    /// <summary>
    /// Builds one /get/records queue per worker URL. A null URL (removed rank) yields a
    /// null placeholder instead of being skipped, so the queue list stays positionally
    /// aligned with the worker list that shard routing indexes into.
    /// </summary>
    private static List<WorkerQueue<T>?> BuildWorkerQueues(WorkerList workers)
    {
        var queues = new List<WorkerQueue<T>?>(workers.Count);
        foreach (var workerUrl in workers)
        {
            queues.Add(workerUrl == null ? null : CreateGetRecordsQueue(workerUrl.ToString()));
        }
        return queues;
    }

    #endregion

    #region HA Failover Methods

    /// <summary>
    /// Asks <see cref="Kinetica"/> to fail over away from <paramref name="oldUrl"/>.
    /// If a new head node is chosen and, for multi-head retrievers, a non-empty worker
    /// list can be read from it, the new head URL and cluster-switch count are recorded.
    /// </summary>
    /// <param name="oldUrl">The head-node URL that failed.</param>
    /// <param name="oldClusterSwitchCount">Cluster-switch count observed before the failure.</param>
    /// <returns>true if failover produced a usable cluster; otherwise false.</returns>
    private bool ForceFailover(Uri oldUrl, int oldClusterSwitchCount)
    {
        if (_kinetica.HAManager == null || _dbHaRingSize <= 1)
            return false;

        // Use Kinetica's centralized failover method
        var newUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
        if (newUrl == null)
            return false;

        if (_multiHeadEnabled)
        {
            // Best effort: an unreadable or empty worker list means the new
            // cluster is not considered healthy.
            try
            {
                if (new WorkerList(_kinetica).Count == 0)
                    return false;
            }
            catch
            {
                return false;
            }
        }

        lock (_haLock)
        {
            _currentHeadNodeUrl = newUrl;
            _numClusterSwitches = _kinetica.NumClusterSwitches;
        }
        return true;
    }

    /// <summary>
    /// Refreshes shard information from the server and, when it (or the cluster) has
    /// changed, updates the routing table and optionally rebuilds the worker queues.
    /// </summary>
    /// <param name="countClusterSwitches">Cluster-switch count observed by the caller.</param>
    /// <param name="doReconstructWorkerUrls">Whether to rebuild worker queues (multi-head only).</param>
    /// <returns>true if routing state was refreshed; false if nothing changed or the
    /// server could not be reached.</returns>
    private bool UpdateWorkerQueues(int countClusterSwitches, bool doReconstructWorkerUrls)
    {
        bool reconstructWorkerUrls = doReconstructWorkerUrls && _multiHeadEnabled;

        try
        {
            var shardInfo = _kinetica.adminShowShards();
            var newShardVersion = shardInfo.version;

            if (Interlocked.Read(ref _shardVersion) == newShardVersion)
            {
                var currNumClusterSwitches = _kinetica.NumClusterSwitches;
                if (countClusterSwitches == currNumClusterSwitches)
                {
                    // Nothing changed server-side; optionally refresh worker URLs only
                    return reconstructWorkerUrls && ReconstructWorkerUrls();
                }

                lock (_haLock)
                {
                    _numClusterSwitches = currNumClusterSwitches;
                }
            }

            Interlocked.Exchange(ref _shardVersion, newShardVersion);
            Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            lock (_haLock)
            {
                // Only multi-head retrievers route by shard. A single-head retriever must
                // keep a null routing table so lookups keep going through Kinetica; its
                // queue list has a single entry and cannot be indexed by shard rank.
                if (_multiHeadEnabled)
                    _mutableRoutingTable = shardInfo.rank;
                _currentHeadNodeUrl = _kinetica.URL;
                _numClusterSwitches = _kinetica.NumClusterSwitches;
            }

            if (reconstructWorkerUrls)
                ReconstructWorkerUrls();

            return true;
        }
        catch (Exception ex) when (Kinetica.IsConnectionError(ex))
        {
            return false;
        }
    }

    /// <summary>
    /// Rebuilds the worker queues from a freshly fetched worker list.
    /// </summary>
    /// <returns>true if new queues were installed; false if the list was empty or
    /// could not be fetched (the current queues are kept).</returns>
    private bool ReconstructWorkerUrls()
    {
        try
        {
            var newWorkerList = new WorkerList(_kinetica);
            if (newWorkerList.Count == 0)
                return false;

            var newQueues = BuildWorkerQueues(newWorkerList);
            lock (_haLock)
            {
                _mutableWorkerQueues = newQueues;
            }
            return true;
        }
        catch
        {
            // Best effort: keep the current queues if the worker list cannot be rebuilt
            return false;
        }
    }

    /// <summary>
    /// Gets the worker queues in effect: the post-failover set if any, else the initial set.
    /// </summary>
    private IList<WorkerQueue<T>?> GetEffectiveWorkerQueues()
    {
        return _mutableWorkerQueues ?? _workerQueues;
    }

    /// <summary>
    /// Gets the routing table in effect: the refreshed one if any, else the initial one
    /// (null when multi-head is not enabled).
    /// </summary>
    private IList<int>? GetEffectiveRoutingTable()
    {
        return _mutableRoutingTable ?? _routingTable;
    }

    /// <summary>
    /// Decides whether a failed retrieval should be retried, attempting HA failover
    /// and a shard-information refresh along the way.
    /// </summary>
    /// <param name="ex">The exception raised by the attempt.</param>
    /// <param name="retrievalAttemptTimestamp">Unix-ms time at which the attempt started.</param>
    /// <returns>true if the attempt should be retried.</returns>
    private bool HandleConnectionError(Exception ex, long retrievalAttemptTimestamp)
    {
        if (!Kinetica.IsConnectionError(ex))
            return false;

        var currUrl = _currentHeadNodeUrl;
        var currentCountClusterSwitches = _numClusterSwitches;

        bool didFailoverSucceed = currUrl != null
                                  && _dbHaRingSize > 1
                                  && ForceFailover(currUrl, currentCountClusterSwitches);

        bool updatedWorkerQueues = UpdateWorkerQueues(currentCountClusterSwitches, true);

        // Also retry if another caller refreshed shard info after this attempt started
        long shardUpdateTime = Interlocked.Read(ref _shardUpdateTime);
        return didFailoverSucceed || updatedWorkerQueues || retrievalAttemptTimestamp < shardUpdateTime;
    }

    #endregion

    #region Record Retrieval

    /// <summary>
    /// Retrieves records for a given shard key, optionally further limited by an
    /// additional expression.
    /// </summary>
    /// <param name="record">A record whose shard-key fields identify the records to fetch.</param>
    /// <param name="expression">Optional additional filter, ANDed with the shard-key expression.</param>
    /// <returns>The matching records.</returns>
    /// <exception cref="KineticaException">If the table is unsharded, no key expression can
    /// be built, or retrieval fails (including after exhausting connection-error retries).</exception>
    public GetRecordsResponse<T> getRecordsByKey(T record, string? expression = null)
    {
        var shardKeyBuilder = _shardKeyBuilder
            ?? throw new KineticaException("Cannot get by key from unsharded table: " + _tableName);

        const int maxRetries = 3;
        Exception? lastException = null;

        for (int attempt = 0; attempt <= maxRetries; attempt++)
        {
            // Timestamp each attempt so HandleConnectionError can tell whether shard
            // information was refreshed after *this* attempt started.
            long retrievalAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

            try
            {
                GetRecordsRequest request = BuildGetRecordsByKeyRequest(shardKeyBuilder, record, expression);

                var routingTable = GetEffectiveRoutingTable();
                if (routingTable == null)
                {
                    // No routing information is available; talk to rank-0
                    return _kinetica.getRecords<T>(request);
                }

                return GetRecordsFromWorker(shardKeyBuilder.build(record), routingTable, request);
            }
            catch (Exception ex) when (Kinetica.IsConnectionError(ex))
            {
                lastException = ex;

                // Attempt HA failover using centralized handling; retry if it helped
                if (HandleConnectionError(ex, retrievalAttemptTimestamp))
                    continue;

                throw new KineticaException("Error in retrieving records by key: " + ex.Message, ex);
            }
            catch (Exception ex)
            {
                throw new KineticaException("Error in retrieving records by key: ", ex);
            }
        }

        // All retries exhausted
        throw new KineticaException("Error in retrieving records by key after " + maxRetries + " retries: " +
                                    (lastException?.Message ?? "Unknown error"), lastException);
    }

    /// <summary>
    /// Builds the /get/records request for the records matching <paramref name="record"/>'s
    /// shard key, ANDed with <paramref name="expression"/> when given.
    /// </summary>
    private GetRecordsRequest BuildGetRecordsByKeyRequest(RecordKeyBuilder<T> shardKeyBuilder,
                                                          T record, string? expression)
    {
        string fullExpression = shardKeyBuilder.buildExpression(record)
            ?? throw new KineticaException("No expression could be made from given record.");
        if (expression != null)
            fullExpression = fullExpression + " and (" + expression + ")";

        IDictionary<string, string> options = new Dictionary<string, string>
        {
            [GetRecordsRequest.Options.EXPRESSION] = fullExpression
        };

        return new GetRecordsRequest(_tableName, 0, Kinetica.END_OF_SET, options);
    }

    /// <summary>
    /// Sends <paramref name="request"/> directly to the worker rank that owns
    /// <paramref name="shardKey"/> and decodes the binary response.
    /// </summary>
    /// <exception cref="KineticaException">If the key routes to a removed worker rank.</exception>
    private GetRecordsResponse<T> GetRecordsFromWorker(RecordKey shardKey, IList<int> routingTable,
                                                       GetRecordsRequest request)
    {
        var workerQueues = GetEffectiveWorkerQueues();
        var workerQueue = workerQueues[shardKey.route(routingTable)]
            ?? throw new KineticaException("Shard key routed to a removed worker rank.");

        // Direct URL via SubmitRequestRaw: no HA failover here; it is handled by the retriever
        RawGetRecordsResponse rawResponse =
            _kinetica.SubmitRequestRaw<RawGetRecordsResponse>(workerQueue.url, request);

        var decodedResponse = new GetRecordsResponse<T>();
        decodedResponse.table_name = rawResponse.table_name;
        decodedResponse.type_name = rawResponse.type_name;
        decodedResponse.type_schema = rawResponse.type_schema;
        decodedResponse.has_more_records = rawResponse.has_more_records;
        decodedResponse.total_number_of_records = rawResponse.total_number_of_records;

        // Decode the binary records into decodedResponse.data
        _kinetica.DecodeRawBinaryDataUsingRecordType(_ktype,
                                                     rawResponse.records_binary,
                                                     decodedResponse.data);
        return decodedResponse;
    }

    #endregion
}
const string FAST_INDEX_LOOKUP
Indicates if indexes should be used to perform the lookup for a given expression if possible.
Definition: GetRecords.cs:60
Object that permits efficient retrieval of records from GPUdb, with support for multi-head access.
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
long total_number_of_records
Total/Filtered number of records.
Definition: GetRecords.cs:458
bool has_more_records
Too many records; only a partial set was returned.
Definition: GetRecords.cs:429
int HARingSize
Gets the number of clusters in the HA ring.
Definition: HAFailover.cs:256
A key based on a given record that serves as either a primary key or a shard key.
Definition: RecordKey.cs:13
RecordRetriever(Kinetica kdb, string table_name, KineticaType ktype, WorkerList? workers=null)
Create a RecordRetriever object with the given parameters.
HAFailoverManager? HAManager
Gets the HA failover manager instance.
Definition: Kinetica.cs:192
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Definition: Kinetica.cs:197
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
int HARingSize
Gets the HA ring size.
string table_name
Value of table_name.
Definition: GetRecords.cs:401
Builds or creates RecordKey objects based on a given record.
IList< T > data
If the encoding was 'binary', then this list contains the binary encoded records retrieved from the t...
Definition: GetRecords.cs:455
long total_number_of_records
Total/Filtered number of records.
Definition: GetRecords.cs:425
A set of string constants for the parameter options.
Definition: GetRecords.cs:42
IList< byte[]> records_binary
If the encoding was 'binary', then this list contains the binary encoded records retrieved from the t...
Definition: GetRecords.cs:412
GetRecordsResponse< T > getRecordsByKey(T record, string? expression=null)
Retrieves records for a given shard key, optionally further limited by an additional expression.
string type_schema
Avro schema of records_binary or records_json.
Definition: GetRecords.cs:407
bool has_more_records
Too many records; only a partial set was returned.
Definition: GetRecords.cs:462
A set of results returned by Kinetica.getRecords.
Definition: GetRecords.cs:397
const string EXPRESSION
Filter expression to apply to the table.
Definition: GetRecords.cs:45
Kinetica KineticaDB
Gets the Kinetica connection.
string table_name
Value of table_name.
Definition: GetRecords.cs:444
IList< int > rank
Array of ranks indexed by the shard number.
A set of parameters for Kinetica.getRecords.
Definition: GetRecords.cs:24
int NumClusterSwitches
Gets the number of cluster switches due to HA failover.
A set of results returned by Kinetica.getRecords.
Definition: GetRecords.cs:440
string TableName
Gets the table name.
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
int route(IList< int > routingTable)
Given a routing table consisting of worker rank indices, choose a worker rank based on the hash of th...
Definition: RecordKey.cs:900
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
string type_schema
Avro schema of data or records_json.
Definition: GetRecords.cs:450
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:152