3 using System.Collections.Generic;
4 using System.Threading;
// Table the retriever reads from; assigned once from the constructor's
// table_name argument and quoted in the "unsharded table" error message.
24 private readonly
string _tableName;
// Set to true on the constructor path that builds per-worker queues and to
// false on the path that falls back to the head node's get/records endpoint.
27 private readonly
bool _multiHeadEnabled;
// NOTE(review): not referenced in the visible portion of this file.
28 private readonly
Random _random;
// Failover is skipped when this is <= 1 (see ForceFailover and
// HandleConnectionError).
31 private readonly
int _dbHaRingSize;
// Local copy of _kinetica.NumClusterSwitches, refreshed after a failover or
// a shard update.
32 private volatile int _numClusterSwitches;
// Head-node URL currently in use; initialised from kdb.URL and replaced
// after a failover or shard update.
33 private volatile Uri? _currentHeadNodeUrl;
// NOTE(review): lock object; no use of it appears in the visible portion.
34 private readonly
object _haLock =
new object();
// Replacement routing table / worker queues installed after construction.
// When non-null they take precedence over the readonly ones below
// (see GetEffectiveRoutingTable / GetEffectiveWorkerQueues).
37 private volatile IList<int>? _mutableRoutingTable;
38 private volatile IList<WorkerQueue<T>>? _mutableWorkerQueues;
// Shard version last reported by adminShowShards, and the UTC Unix-epoch
// millisecond timestamp at which it was recorded. Both are read and written
// through Interlocked rather than directly.
39 private long _shardVersion;
40 private long _shardUpdateTime;
// Routing table and worker queues built by the constructor. _routingTable
// is set to null on the head-node-only path.
43 private readonly IList<int>? _routingTable;
44 private readonly IList<WorkerQueue<T>> _workerQueues;
85 _kinetica = kdb ??
throw new ArgumentNullException(nameof(kdb));
86 _tableName = table_name ??
throw new ArgumentNullException(nameof(table_name));
87 _ktype = ktype ??
throw new ArgumentNullException(nameof(ktype));
92 _currentHeadNodeUrl = kdb.
URL;
96 if (!_shardKeyBuilder.hasKey())
97 _shardKeyBuilder =
null;
100 _workerQueues =
new List<WorkerQueue<T>>();
104 if (workers ==
null || workers.Count == 0)
110 if (workers !=
null && workers.Count > 0)
113 foreach (var worker_url
in workers)
116 if (worker_url ==
null)
continue;
118 string get_records_worker_url_str = worker_url.ToString() +
"get/records";
119 Uri url =
new Uri(get_records_worker_url_str);
121 ((List<WorkerQueue<T>>)_workerQueues).Add(worker_queue);
128 for (
int i = 0; i < _routingTable.Count; ++i)
130 if (_routingTable[i] > _workerQueues.Count)
134 _multiHeadEnabled =
true;
138 string get_records_url_str = kdb.
URL.ToString() +
"get/records";
139 Uri url =
new Uri(get_records_url_str);
141 ((List<WorkerQueue<T>>)_workerQueues).Add(worker_queue);
142 _routingTable =
null;
143 _multiHeadEnabled =
false;
// ForceFailover: asks the Kinetica client to fail over away from oldUrl and,
// if the resulting cluster is judged healthy, records the new head-node URL
// and the client's cluster-switch count on this retriever.
//   oldUrl                - URL handed to _kinetica.ForceHAFailover as the
//                           one being abandoned.
//   oldClusterSwitchCount - switch count observed by the caller; passed to
//                           _kinetica.ForceHAFailover unchanged.
// Returns a bool; the return statements are not part of this excerpt.
157 #region HA Failover Methods 163 private bool ForceFailover(Uri oldUrl,
int oldClusterSwitchCount)
// Guard for "no HA manager" or "ring of at most one cluster".
// NOTE(review): the guarded statement is not visible here; presumably an
// early "return false" - confirm.
165 if (_kinetica.HAManager ==
null || _dbHaRingSize <= 1)
171 var newUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
// Starts out true; the checks below can only clear it.
175 bool isClusterHealthy =
true;
// The worker-list check is applied in multi-head mode only.
177 if (_multiHeadEnabled)
// An empty worker list marks the new cluster as unhealthy.
182 if (workers.Count == 0)
184 isClusterHealthy =
false;
// NOTE(review): second path that clears the flag; its trigger is not
// visible in this excerpt (possibly an exception handler) - confirm.
189 isClusterHealthy =
false;
193 if (isClusterHealthy)
// Record the post-failover head node and switch count; both fields are
// declared volatile.
197 _currentHeadNodeUrl = newUrl;
198 _numClusterSwitches = _kinetica.NumClusterSwitches;
// UpdateWorkerQueues: re-reads shard information from the server and, when
// it has changed, installs a new routing table and (optionally) rebuilds
// the per-worker queues.
//   countClusterSwitches    - switch count the caller observed before the
//                             failed request; compared against the client's
//                             current count.
//   doReconstructWorkerUrls - request a rebuild of the worker URL list; only
//                             honoured when multi-head mode is enabled.
// Returns a bool; apart from the ReconstructWorkerUrls() result below, the
// return statements are not part of this excerpt.
210 private bool UpdateWorkerQueues(
int countClusterSwitches,
bool doReconstructWorkerUrls)
// Rebuilding worker URLs is only done when multi-head is enabled.
212 var reconstructWorkerUrls = doReconstructWorkerUrls && _multiHeadEnabled;
216 var shardInfo = _kinetica.adminShowShards();
217 var newShardVersion = shardInfo.version;
// Shard version unchanged since the last recorded value.
219 if (Interlocked.Read(ref _shardVersion) == newShardVersion)
221 var currNumClusterSwitches = _kinetica.NumClusterSwitches;
// NOTE(review): the exact nesting of the next statements is not visible in
// this excerpt; the comments describe each statement on its own.
222 if (countClusterSwitches == currNumClusterSwitches)
224 if (reconstructWorkerUrls)
// Result of the rebuild becomes this method's result on this path.
226 return ReconstructWorkerUrls();
// Bring the local switch count in line with the client's.
233 _numClusterSwitches = currNumClusterSwitches;
// Record the new shard version and the time (UTC, Unix milliseconds) at
// which it was seen.
237 Interlocked.Exchange(ref _shardVersion, newShardVersion);
238 Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
// Install the fresh rank list as the routing table and re-sync the head
// URL and switch count from the client.
242 _mutableRoutingTable = shardInfo.rank;
243 _currentHeadNodeUrl = _kinetica.URL;
244 _numClusterSwitches = _kinetica.NumClusterSwitches;
// Here the rebuild's boolean result is discarded.
247 if (reconstructWorkerUrls)
249 ReconstructWorkerUrls();
// Only exceptions classified as connection errors are caught here.
// NOTE(review): the handler body is not visible in this excerpt.
254 catch (Exception ex) when (
Kinetica.IsConnectionError(ex))
// ReconstructWorkerUrls: builds a fresh WorkerList from the Kinetica client,
// derives a get/records URL for every non-null worker, and installs the
// resulting queue list in _mutableWorkerQueues.
// Returns a bool; the return statements are not part of this excerpt.
263 private bool ReconstructWorkerUrls()
267 var newWorkerList =
new WorkerList(_kinetica);
// NOTE(review): the body of the empty-list branch is not visible here.
269 if (newWorkerList.Count == 0)
274 var newQueues =
new List<WorkerQueue<T>>();
275 foreach (var workerUrl
in newWorkerList)
// Null entries in the worker list are skipped.
278 if (workerUrl ==
null)
continue;
// The endpoint name is appended by plain string concatenation.
// NOTE(review): assumes the worker URL string ends with '/' - confirm.
280 var urlStr = workerUrl.ToString() +
"get/records";
281 var url =
new Uri(urlStr);
// Install the new list with a single assignment to the volatile field.
// NOTE(review): the statements that add entries to newQueues are not
// visible in this excerpt.
287 _mutableWorkerQueues = newQueues;
// GetEffectiveWorkerQueues: returns _mutableWorkerQueues when it is non-null,
// otherwise the _workerQueues list built by the constructor.
301 private IList<WorkerQueue<T>> GetEffectiveWorkerQueues()
303 return _mutableWorkerQueues ?? _workerQueues;
// GetEffectiveRoutingTable: returns _mutableRoutingTable when it is non-null,
// otherwise the constructor's _routingTable. The result is null when both
// are null; getRecordsByKey then calls _kinetica.getRecords directly.
309 private IList<int>? GetEffectiveRoutingTable()
311 return _mutableRoutingTable ?? _routingTable;
// HandleConnectionError: reacts to a failed request by attempting an HA
// failover and a shard/worker refresh, then computes whether a retry is
// worthwhile.
//   ex                        - the exception raised by the failed request.
//   retrievalAttemptTimestamp - when the caller started its retrieval (the
//                               caller passes UTC Unix milliseconds).
// Returns a bool. NOTE(review): the return statements are not visible in
// this excerpt; presumably the computed retry flag is returned - confirm.
318 private bool HandleConnectionError(Exception ex,
long retrievalAttemptTimestamp)
// Check for exceptions that are not connection errors.
// NOTE(review): the branch body is not visible here.
320 if (!
Kinetica.IsConnectionError(ex))
// Take one snapshot of the volatile fields so the failover and the refresh
// are given the same values.
325 var currUrl = _currentHeadNodeUrl;
326 var currentCountClusterSwitches = _numClusterSwitches;
328 bool didFailoverSucceed =
false;
// Failover is attempted only with a known head URL and an HA ring of more
// than one cluster.
330 if (currUrl !=
null && _dbHaRingSize > 1)
332 didFailoverSucceed = ForceFailover(currUrl, currentCountClusterSwitches);
// The refresh always asks for a worker-URL rebuild (second argument true).
335 var updatedWorkerQueues = UpdateWorkerQueues(currentCountClusterSwitches,
true);
// Retry if the failover worked, the refresh reported true, or the stored
// shard-update time is later than the start of this retrieval attempt.
337 var shardUpdateTime = Interlocked.Read(ref _shardUpdateTime);
338 var retry = didFailoverSucceed || updatedWorkerQueues || retrievalAttemptTimestamp < shardUpdateTime;
345 #region Record Retrieval 362 if (_shardKeyBuilder ==
null)
363 throw new KineticaException(
"Cannot get by key from unsharded table: " + _tableName);
365 var retrievalAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
366 var currentCountClusterSwitches = _numClusterSwitches;
368 Exception? lastException =
null;
371 for (
int attempt = 0; attempt <= maxRetries; attempt++)
376 string? full_expression = _shardKeyBuilder.buildExpression(record);
377 if (full_expression ==
null)
379 if (expression !=
null)
380 full_expression = full_expression +
" and (" + expression +
")";
383 IDictionary<string, string> options =
new Dictionary<string, string>();
393 var effectiveRoutingTable = GetEffectiveRoutingTable();
394 if (effectiveRoutingTable ==
null)
397 return _kinetica.getRecords<T>(request);
406 RecordKey shard_key = _shardKeyBuilder.build(record);
407 var effectiveWorkerQueues = GetEffectiveWorkerQueues();
408 Uri url = effectiveWorkerQueues[shard_key.
route(effectiveRoutingTable)].url;
421 _kinetica.DecodeRawBinaryDataUsingRecordType(_ktype,
423 decoded_response.
data);
424 return decoded_response;
427 catch (Exception ex) when (
Kinetica.IsConnectionError(ex))
432 if (HandleConnectionError(ex, retrievalAttemptTimestamp))
439 throw new KineticaException(
"Error in retrieving records by key: " + ex.Message, ex);
452 throw new KineticaException(
"Error in retrieving records by key after " + maxRetries +
" retries: " +
453 (lastException?.Message ??
"Unknown error"), lastException);
const string FAST_INDEX_LOOKUP
Indicates if indexes should be used to perform the lookup for a given expression if possible.
Object that permits efficient retrieval of records from GPUdb, with support
A list of worker URLs to use for multi-head ingest.
long total_number_of_records
Total/Filtered number of records.
bool has_more_records
Too many records.
int HARingSize
Gets the number of clusters in the HA ring.
A key based on a given record that serves as either a primary key or a shard key.
RecordRetriever(Kinetica kdb, string table_name, KineticaType ktype, WorkerList? workers=null)
Create a RecordRetriever object with the given parameters.
HAFailoverManager? HAManager
Gets the HA failover manager instance.
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
int HARingSize
Gets the HA ring size.
string table_name
Value of table_name.
Builds or creates RecordKey objects based on a given record.
IList< T > data
If the encoding was 'binary', then this list contains the binary encoded records retrieved from the t...
long total_number_of_records
Total/Filtered number of records.
A set of string constants for the parameter options.
IList< byte[]> records_binary
If the encoding was 'binary', then this list contains the binary encoded records retrieved from the t...
GetRecordsResponse< T > getRecordsByKey(T record, string? expression=null)
Retrieves records for a given shard key, optionally further limited by an additional expression.
string type_schema
Avro schema of records_binary or records_json.
bool has_more_records
Too many records.
A set of results returned by Kinetica.getRecords.
const string EXPRESSION
Filter expression to apply to the table.
Kinetica KineticaDB
Gets the Kinetica connection.
string table_name
Value of table_name.
IList< int > rank
Array of ranks indexed by the shard number.
A set of parameters for Kinetica.getRecords.
int NumClusterSwitches
Gets the number of cluster switches due to HA failover.
A set of results returned by Kinetica.getRecords.
string TableName
Gets the table name.
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
int route(IList< int > routingTable)
Given a routing table consisting of worker rank indices, choose a worker rank based on the hash of th...
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
string type_schema
Avro schema of data or records_json.
Uri URL
URL for Kinetica Server (including "http:" and port)