2 using System.Collections.Generic;
3 using System.Threading;
23 private readonly
string _tableName;
26 private readonly
bool _multiHeadEnabled;
27 private readonly
Random _random;
30 private readonly
int _dbHaRingSize;
31 private volatile int _numClusterSwitches;
32 private volatile Uri? _currentHeadNodeUrl;
33 private readonly
object _haLock =
new object();
36 private volatile IList<int>? _mutableRoutingTable;
37 private volatile IList<Uri>? _mutableWorkerUrls;
38 private long _shardVersion;
39 private long _shardUpdateTime;
42 private readonly IList<int>? _routingTable;
43 private readonly IList<Uri> _workerUrls;
82 public IList<string>
ShardKeyColumnNames => _shardKeyBuilder?.GetRoutingColumnNames() ??
new List<string>();
99 _kinetica = kdb ??
throw new ArgumentNullException(nameof(kdb));
100 _tableName = tableName ??
throw new ArgumentNullException(nameof(tableName));
101 _ktype = ktype ??
throw new ArgumentNullException(nameof(ktype));
106 _currentHeadNodeUrl = kdb.
URL;
110 if (!_shardKeyBuilder.HasKey())
111 _shardKeyBuilder =
null;
114 _workerUrls =
new List<Uri>();
118 if (workers ==
null || workers.Count == 0)
124 if (workers !=
null && workers.Count > 0)
126 foreach (var workerUrl
in workers)
128 if (workerUrl ==
null)
continue;
129 ((List<Uri>)_workerUrls).Add(workerUrl);
136 for (
int i = 0; i < _routingTable.Count; ++i)
138 if (_routingTable[i] > _workerUrls.Count)
142 _multiHeadEnabled =
true;
147 ((List<Uri>)_workerUrls).Add(kdb.
URL);
148 _routingTable =
null;
149 _multiHeadEnabled =
false;
154 throw new KineticaException(
"Error initializing GenericRecordRetriever: " + ex.Message, ex);
162 #region HA Failover Methods 164 private bool ForceFailover(Uri oldUrl,
int oldClusterSwitchCount)
166 if (_kinetica.HAManager ==
null || _dbHaRingSize <= 1)
171 var newUrl = _kinetica.ForceHAFailover(oldUrl, oldClusterSwitchCount);
174 bool isClusterHealthy =
true;
176 if (_multiHeadEnabled)
181 if (newWorkers.Count == 0)
183 isClusterHealthy =
false;
188 isClusterHealthy =
false;
192 if (isClusterHealthy)
196 _currentHeadNodeUrl = newUrl;
197 _numClusterSwitches = _kinetica.NumClusterSwitches;
206 private bool UpdateWorkerQueues(
int countClusterSwitches,
bool doReconstructWorkerUrls)
208 var reconstructWorkerUrls = doReconstructWorkerUrls && _multiHeadEnabled;
212 var shardInfo = _kinetica.adminShowShards();
213 var newShardVersion = shardInfo.version;
215 if (Interlocked.Read(ref _shardVersion) == newShardVersion)
217 var currNumClusterSwitches = _kinetica.NumClusterSwitches;
218 if (countClusterSwitches == currNumClusterSwitches)
220 if (reconstructWorkerUrls)
222 return ReconstructWorkerUrls();
229 _numClusterSwitches = currNumClusterSwitches;
233 Interlocked.Exchange(ref _shardVersion, newShardVersion);
234 Interlocked.Exchange(ref _shardUpdateTime, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
238 _mutableRoutingTable = shardInfo.rank;
239 _currentHeadNodeUrl = _kinetica.URL;
240 _numClusterSwitches = _kinetica.NumClusterSwitches;
243 if (reconstructWorkerUrls)
245 ReconstructWorkerUrls();
250 catch (Exception ex) when (
Kinetica.IsConnectionError(ex))
256 private bool ReconstructWorkerUrls()
260 var newWorkerList =
new WorkerList(_kinetica);
262 if (newWorkerList.Count == 0)
267 var newUrls =
new List<Uri>();
268 foreach (var workerUrl
in newWorkerList)
270 if (workerUrl ==
null)
continue;
271 newUrls.Add(workerUrl);
276 _mutableWorkerUrls = newUrls;
287 private IList<Uri> GetEffectiveWorkerUrls()
289 return _mutableWorkerUrls ?? _workerUrls;
292 private IList<int>? GetEffectiveRoutingTable()
294 return _mutableRoutingTable ?? _routingTable;
297 private bool HandleConnectionError(Exception ex,
long retrievalAttemptTimestamp)
299 if (!
Kinetica.IsConnectionError(ex))
304 var currUrl = _currentHeadNodeUrl;
305 var currentCountClusterSwitches = _numClusterSwitches;
307 bool didFailoverSucceed =
false;
309 if (currUrl !=
null && _dbHaRingSize > 1)
311 didFailoverSucceed = ForceFailover(currUrl, currentCountClusterSwitches);
314 var updatedWorkerQueues = UpdateWorkerQueues(currentCountClusterSwitches,
true);
316 var shardUpdateTime = Interlocked.Read(ref _shardUpdateTime);
317 var retry = didFailoverSucceed || updatedWorkerQueues || retrievalAttemptTimestamp < shardUpdateTime;
324 #region Record Retrieval 338 IDictionary<string, object?> keyValues,
339 string? expression =
null,
340 IList<string>? columns =
null,
344 if (_shardKeyBuilder ==
null)
345 throw new KineticaException(
"Cannot get by key from unsharded table: " + _tableName);
347 var retrievalAttemptTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
348 var currentCountClusterSwitches = _numClusterSwitches;
350 Exception? lastException =
null;
353 for (
int attempt = 0; attempt <= maxRetries; attempt++)
358 string? keyExpression = _shardKeyBuilder.BuildExpression(keyValues);
359 if (keyExpression ==
null)
360 throw new KineticaException(
"No expression could be made from given key values.");
362 string fullExpression = keyExpression;
363 if (!
string.IsNullOrEmpty(expression))
364 fullExpression = keyExpression +
" AND (" + expression +
")";
367 string columnList = (columns !=
null && columns.Count > 0)
368 ?
string.Join(
", ", columns)
371 string sql = $
"SELECT {columnList} FROM {_tableName} WHERE {fullExpression}";
374 return _kinetica.executeSql(sql, offset, limit,
null,
null,
null);
376 catch (Exception ex) when (
Kinetica.IsConnectionError(ex))
380 if (HandleConnectionError(ex, retrievalAttemptTimestamp))
385 throw new KineticaException(
"Error in retrieving records by key: " + ex.Message, ex);
393 throw new KineticaException(
"Error in retrieving records by key: " + ex.Message, ex);
397 throw new KineticaException(
"Error in retrieving records by key after " + maxRetries +
" retries: " +
398 (lastException?.Message ??
"Unknown error"), lastException);
410 string whereExpression,
411 IList<string>? columns =
null,
416 string columnList = (columns !=
null && columns.Count > 0)
417 ?
string.Join(
", ", columns)
420 string sql = $
"SELECT {columnList} FROM {_tableName} WHERE {whereExpression}";
423 return _kinetica.executeSql(sql, offset, limit,
null,
null,
null);
433 if (_shardKeyBuilder ==
null)
438 if (!keyValues.ContainsKey(columnName))
453 if (_shardKeyBuilder ==
null || !_multiHeadEnabled)
456 var effectiveRoutingTable = GetEffectiveRoutingTable();
457 if (effectiveRoutingTable ==
null)
460 var shardKey = _shardKeyBuilder.Build(keyValues);
461 if (shardKey ==
null)
464 return shardKey.route(effectiveRoutingTable);
Kinetica KineticaDB
Gets the Kinetica connection.
int HARingSize
Gets the number of clusters in the HA ring.
Builds expressions and routing keys for GenericRecord and dictionary-based records.
A list of worker URLs to use for multi-head operations.
int HARingSize
Gets the HA ring size.
HAFailoverManager? HAManager
Gets the HA failover manager instance.
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
GenericRecordRetriever(Kinetica kdb, string tableName, KineticaType ktype, WorkerList? workers=null)
Create a GenericRecordRetriever for the given table.
bool HasAllShardKeyValues(IDictionary< string, object?> keyValues)
Checks if the given key values contain all required shard key columns.
int GetWorkerIndexForKey(IDictionary< string, object?> keyValues)
Computes the worker index that would handle records with the given key values.
KineticaType KType
Gets the KineticaType for the table.
string TableName
Gets the table name.
ExecuteSqlResponse GetRecordsByKey(IDictionary< string, object?> keyValues, string? expression=null, IList< string >? columns=null, long offset=0, long limit=-9999)
Retrieves records for a given shard key using SQL with fast index lookup.
IList< int > rank
Array of ranks indexed by the shard number.
ExecuteSqlResponse GetRecordsByExpression(string whereExpression, IList< string >? columns=null, long offset=0, long limit=-9999)
Retrieves records using a SQL WHERE clause expression with fast index lookup optimization.
Object that permits efficient retrieval of records from GPUdb using GenericRecord
int NumClusterSwitches
Gets the number of cluster switches due to HA failover.
Failover to clusters in a random order (default)
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
bool MultiHeadEnabled
Gets whether multi-head retrieval is enabled.
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
Uri URL
URL for Kinetica Server (including "http:" and port)
IList< string > ShardKeyColumnNames
Gets the shard key column names, or empty if table is not sharded.
A set of results returned by Kinetica.executeSql.