2 using System.Collections.Generic;
4 using System.Text.RegularExpressions;
5 using System.Threading;
71 Uri activeHeadNodeUrl,
72 IDictionary<string, string> systemProperties,
73 IList<Uri> workerRankUrls,
74 HashSet<string> hostNames,
80 HostNames = hostNames ??
new HashSet<string>();
85 UpdateHostnamesBasedOnRankUrls();
102 var builder =
new UriBuilder(activeHeadNodeUrl)
104 Port = hostManagerPort,
110 UpdateHostnamesBasedOnRankUrls();
117 private void UpdateHostnamesBasedOnRankUrls()
133 if (workerUrl !=
null && !
string.IsNullOrEmpty(workerUrl.Host))
159 var workerUrls =
string.Join(
", ",
WorkerRankUrls.Select(u => u.ToString()));
160 var hostnames =
string.Join(
", ",
HostNames);
161 return $
"{{ activeHeadNodeUrl: {ActiveHeadNodeUrl}, workerRankUrls: [{workerUrls}], hostNames: [{hostnames}], hostManagerUrl: {HostManagerUrl}, isPrimaryCluster: {IsPrimaryCluster} }}";
196 private const string SYSTEM_PROP_ENABLE_HTTPD =
"conf.enable_httpd_proxy";
197 private const string SYSTEM_PROP_ENABLE_MH =
"conf.enable_worker_http_servers";
198 private const string SYSTEM_PROP_NUM_HOSTS =
"conf.number_of_hosts";
199 private const string SYSTEM_PROP_HEAD_NODE_URLS =
"conf.ha_ring_head_nodes_full";
200 private const string SYSTEM_PROP_SERVER_URLS =
"conf.worker_http_server_urls";
201 private const string SYSTEM_PROP_ENABLE_HA =
"conf.enable_ha";
202 private const string SYSTEM_PROP_TRUE =
"TRUE";
204 private readonly List<ClusterAddressInfo> _hostAddresses;
205 private readonly List<int> _haUrlIndices;
206 private string _primaryUrlHostname;
207 private int _currentClusterIndexPointer;
208 private int _numClusterSwitches;
209 private readonly
object _lock =
new object();
241 _hostAddresses =
new List<ClusterAddressInfo>();
242 _haUrlIndices =
new List<int>();
243 _primaryUrlHostname =
string.Empty;
244 _currentClusterIndexPointer = 0;
245 _numClusterSwitches = 0;
261 return _hostAddresses.Count;
275 return _numClusterSwitches;
287 return new List<ClusterAddressInfo>(_hostAddresses);
298 return _hostAddresses.Select(h => h.ActiveHeadNodeUrl).ToList();
310 .Where(i => i < _hostAddresses.Count)
311 .Select(i => _hostAddresses[i].ActiveHeadNodeUrl)
323 if (_hostAddresses.Count == 0)
326 if (_hostAddresses.Count == 1)
327 return _hostAddresses[0];
329 if (_currentClusterIndexPointer < _haUrlIndices.Count)
331 int index = _haUrlIndices[_currentClusterIndexPointer];
332 if (index < _hostAddresses.Count)
333 return _hostAddresses[index];
355 _hostAddresses.Add(clusterInfo);
368 _hostAddresses.Clear();
369 _haUrlIndices.Clear();
370 _currentClusterIndexPointer = 0;
371 _numClusterSwitches = 0;
373 var urlQueue =
new Queue<Uri>(urls);
374 int numUserGivenUrls = urlQueue.Count;
375 int numProcessedUrls = 0;
376 var clusterIndicesOfUserGivenUrls =
new List<int>();
378 while (urlQueue.Count > 0)
380 var url = urlQueue.Dequeue();
381 bool isUserGivenUrl = numProcessedUrls < numUserGivenUrls;
385 int? existingIndex = GetIndexOfClusterContainingNode(url.Host);
386 if (existingIndex.HasValue)
389 clusterIndicesOfUserGivenUrls.Add(existingIndex.Value);
401 var systemProps =
kinetica.showSystemProperties().property_map;
402 clusterInfo = CreateClusterAddressInfo(url, systemProps);
405 var haRingUrls = GetHARingHeadNodeUrls(systemProps);
406 foreach (var haUrl
in haRingUrls)
408 if (!GetIndexOfClusterContainingNode(haUrl.Host).HasValue &&
409 !urlQueue.Contains(haUrl))
411 urlQueue.Enqueue(haUrl);
427 clusterIndicesOfUserGivenUrls.Add(_hostAddresses.Count);
429 _hostAddresses.Add(clusterInfo);
432 if (_hostAddresses.Count == 0)
436 if (_hostAddresses.Count == 1)
438 _hostAddresses[0].IsPrimaryCluster =
true;
439 _primaryUrlHostname = _hostAddresses[0].ActiveHeadNodeUrl.Host;
444 if (
string.IsNullOrEmpty(_primaryUrlHostname))
446 var uniqueIndices =
new HashSet<int>(clusterIndicesOfUserGivenUrls);
447 if (uniqueIndices.Count == 1 && clusterIndicesOfUserGivenUrls.Count > 0)
449 int primaryIndex = clusterIndicesOfUserGivenUrls[0];
450 _primaryUrlHostname = _hostAddresses[primaryIndex].ActiveHeadNodeUrl.Host;
456 if (!
string.IsNullOrEmpty(_primaryUrlHostname))
458 int? primaryIndex = GetIndexOfClusterContainingNode(_primaryUrlHostname);
459 if (primaryIndex.HasValue)
461 _hostAddresses[primaryIndex.Value].IsPrimaryCluster =
true;
462 if (primaryIndex.Value > 0)
464 var primary = _hostAddresses[primaryIndex.Value];
465 _hostAddresses.RemoveAt(primaryIndex.Value);
466 _hostAddresses.Insert(0, primary);
479 private int? GetIndexOfClusterContainingNode(
string hostname)
481 for (
int i = 0; i < _hostAddresses.Count; i++)
483 if (_hostAddresses[i].DoesClusterContainNode(hostname))
492 private ClusterAddressInfo CreateClusterAddressInfo(Uri url, IDictionary<string, string> systemProperties)
494 bool isHttpdEnabled = GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_HTTPD);
495 bool isMultiHeadEnabled = GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_MH);
498 var rankUrls =
new List<Uri>();
499 if (isMultiHeadEnabled && systemProperties.TryGetValue(SYSTEM_PROP_SERVER_URLS, out var serverUrlsStr))
501 rankUrls = ParseRankUrls(serverUrlsStr);
505 Uri activeHeadNodeUrl;
506 if (rankUrls.Count > 0)
508 activeHeadNodeUrl = rankUrls[0];
509 rankUrls.RemoveAt(0);
513 activeHeadNodeUrl = url;
517 var hostNames = GetHostnamesFromSystemProperties(systemProperties);
521 if (isHttpdEnabled && !
string.IsNullOrEmpty(activeHeadNodeUrl.AbsolutePath) && activeHeadNodeUrl.AbsolutePath !=
"/")
523 var builder =
new UriBuilder(activeHeadNodeUrl)
525 Path =
"/gpudb-host-manager" 527 hostManagerUrl = builder.Uri;
531 var builder =
new UriBuilder(activeHeadNodeUrl)
536 hostManagerUrl = builder.Uri;
539 return new ClusterAddressInfo(activeHeadNodeUrl, systemProperties, rankUrls, hostNames, hostManagerUrl);
545 private List<Uri> ParseRankUrls(
string serverUrlsStr)
547 var result =
new List<Uri>();
548 var rankEntries = serverUrlsStr.Split(
';');
550 foreach (var entry
in rankEntries)
552 if (
string.IsNullOrEmpty(entry))
555 var urls = entry.Split(
',');
556 foreach (var urlStr
in urls)
558 if (Uri.TryCreate(urlStr.Trim(), UriKind.Absolute, out var parsedUrl))
563 result.Add(parsedUrl);
576 private HashSet<string> GetHostnamesFromSystemProperties(IDictionary<string, string> systemProperties)
578 var hostnames =
new HashSet<string>();
580 if (!systemProperties.TryGetValue(SYSTEM_PROP_NUM_HOSTS, out var numHostsStr) ||
581 !
int.TryParse(numHostsStr, out
int numHosts))
586 for (
int i = 0; i < numHosts; i++)
588 string key = $
"conf.host{i}_public_urls";
589 if (systemProperties.TryGetValue(key, out var hostnameStr))
591 var hostUrls = hostnameStr.Split(
',');
592 foreach (var hostname
in hostUrls)
594 var host = hostname.Trim();
596 int idx = host.IndexOf(
"://");
598 host = host.Substring(idx + 3);
616 private List<Uri> GetHARingHeadNodeUrls(IDictionary<string, string> systemProperties)
618 var haUrls =
new List<Uri>();
620 if (!GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_HA))
623 if (!systemProperties.TryGetValue(SYSTEM_PROP_HEAD_NODE_URLS, out var haRingStr) ||
624 string.IsNullOrEmpty(haRingStr))
629 var clusterEntries = haRingStr.Split(
';');
630 foreach (var entry
in clusterEntries)
632 if (
string.IsNullOrEmpty(entry))
635 var urls = entry.Split(
',');
636 foreach (var urlStr
in urls)
638 if (Uri.TryCreate(urlStr.Trim(), UriKind.Absolute, out var parsedUrl))
643 haUrls.Add(parsedUrl);
656 private static bool GetSystemPropertyBool(IDictionary<string, string> properties,
string key)
658 return properties.TryGetValue(key, out var value) &&
659 value.Equals(SYSTEM_PROP_TRUE, StringComparison.OrdinalIgnoreCase);
665 private void RandomizeUrls()
667 _haUrlIndices.Clear();
668 for (
int i = 0; i < _hostAddresses.Count; i++)
670 _haUrlIndices.Add(i);
677 int n = _haUrlIndices.Count;
678 for (
int i = n - 1; i > 1; i--)
680 int j = rng.Next(1, i + 1);
681 (_haUrlIndices[i], _haUrlIndices[j]) = (_haUrlIndices[j], _haUrlIndices[i]);
689 private void SelectNextCluster()
691 _currentClusterIndexPointer = (_currentClusterIndexPointer + 1) % _hostAddresses.Count;
692 Interlocked.Increment(ref _numClusterSwitches);
703 public Uri
SwitchUrl(Uri oldUrl,
int oldNumClusterSwitches, Func<Uri, bool>? isKineticaRunning =
null)
713 if (_hostAddresses.Count == 1)
715 throw new KineticaException(
"Only one cluster in ring; HA failover unavailable");
719 int countSwitchesSinceInvocation = _numClusterSwitches - oldNumClusterSwitches;
722 if (countSwitchesSinceInvocation >= _hostAddresses.Count)
724 throw new KineticaException($
"Fail-over attempted as many times as clusters in the ring; URLs attempted: {string.Join(",
", GetUrls())}");
728 var currentUrl =
GetUrl();
729 if (currentUrl !=
null && !currentUrl.Equals(oldUrl) && countSwitchesSinceInvocation > 0)
740 if (currentUrl ==
null)
745 if (currentUrl.Equals(oldUrl))
749 throw new KineticaException($
"Circled back to original URL; no clusters available for fail-over among these: {string.Join(",
", GetUrls())}");
753 bool isRunning = isKineticaRunning?.Invoke(currentUrl) ??
true;
Failover to clusters in sequential order
ClusterAddressInfo? GetClusterInfo()
Gets the active cluster's information.
void AddCluster(ClusterAddressInfo clusterInfo)
Adds a cluster address to the manager.
bool DisableFailover
Whether failover is disabled
int HARingSize
Gets the number of clusters in the HA ring.
IList< ClusterAddressInfo > GetHostAddresses()
Gets all cluster addresses.
IList< Uri > GetFailoverUrls()
Gets the list of URLs in failover order.
override string ToString()
Returns a string representation of this cluster address info.
bool DoesClusterContainNode(string hostName)
Checks if the given hostname (or IP address) is part of this cluster.
bool DisableAutoDiscovery
Whether auto-discovery is disabled
bool IsPrimaryCluster
Whether this is the primary cluster
void Initialize(IList< Uri > urls, Kinetica? kinetica=null)
Initializes the manager with a list of URLs.
int HostManagerPort
Host manager port
const int DefaultHostManagerPort
IDictionary< string, string > SystemProperties
System properties map from the cluster
Uri HostManagerUrl
URL for the host manager
Uri? GetUrl()
Gets the current active URL.
HAFailoverOrder FailoverOrder
HA failover order
IList< Uri > WorkerRankUrls
List of worker rank URLs
Contains address information for a Kinetica cluster.
Uri ActiveHeadNodeUrl
The active head node URL for the cluster
IList< Uri > GetUrls()
Gets the list of URLs of the active head ranks of all clusters.
HAFailoverOrder
High availability failover order options.
ClusterAddressInfo(Uri activeHeadNodeUrl, IDictionary< string, string > systemProperties, IList< Uri > workerRankUrls, HashSet< string > hostNames, Uri hostManagerUrl)
Creates a new ClusterAddressInfo for an active cluster.
Uri SwitchUrl(Uri oldUrl, int oldNumClusterSwitches, Func< Uri, bool >? isKineticaRunning=null)
Switches to the next available cluster URL for HA failover.
Manages high availability failover for Kinetica connections.
Regex? HostnameRegex
Optional hostname regex for filtering URLs
override bool Equals(object? obj)
Equality check based on active head node URL.
override int GetHashCode()
Hash code based on active head node URL.
Failover to clusters in a random order (default)
ClusterAddressInfo(Uri activeHeadNodeUrl, int hostManagerPort)
Creates a ClusterAddressInfo with just an active URL and host manager port.
HashSet< string > HostNames
Set of hostnames/IP addresses in the cluster
HAFailoverManager()
Creates a new HAFailoverManager.
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.