Kinetica   C#   API  Version 7.2.3.1
HAFailover.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text.RegularExpressions;
5 using System.Threading;
6 
7 namespace kinetica;
8 
/// <summary>
/// High availability failover order options.
/// </summary>
public enum HAFailoverOrder
{
    /// <summary>
    /// Failover to clusters in a random order (default).
    /// </summary>
    Random,

    /// <summary>
    /// Failover to clusters in sequential order.
    /// </summary>
    Sequential
}
24 
/// <summary>
/// Contains address information for a Kinetica cluster.
/// </summary>
/// <remarks>
/// Equality and hashing are based only on <see cref="ActiveHeadNodeUrl"/>.
/// </remarks>
public class ClusterAddressInfo
{
    /// <summary>
    /// The active head node URL for the cluster.
    /// </summary>
    public Uri ActiveHeadNodeUrl { get; set; }

    /// <summary>
    /// System properties map from the cluster.
    /// </summary>
    public IDictionary<string, string> SystemProperties { get; set; }

    /// <summary>
    /// List of worker rank URLs.
    /// </summary>
    public IList<Uri> WorkerRankUrls { get; set; }

    /// <summary>
    /// Set of hostnames/IP addresses in the cluster.
    /// </summary>
    /// <remarks>
    /// Sets created by this class compare case-insensitively. Lookups through
    /// <see cref="DoesClusterContainNode"/> ignore case even for caller-supplied sets.
    /// </remarks>
    public HashSet<string> HostNames { get; set; }

    /// <summary>
    /// URL for the host manager.
    /// </summary>
    public Uri HostManagerUrl { get; set; }

    /// <summary>
    /// Whether this is the primary cluster.
    /// </summary>
    public bool IsPrimaryCluster { get; set; }

    /// <summary>
    /// Creates a new ClusterAddressInfo for an active cluster.
    /// </summary>
    /// <param name="activeHeadNodeUrl">URL of the cluster's active head node.</param>
    /// <param name="systemProperties">The cluster's system properties; <c>null</c> becomes an empty map.</param>
    /// <param name="workerRankUrls">Worker rank URLs; <c>null</c> becomes an empty list.</param>
    /// <param name="hostNames">
    /// Known hostnames/IPs; <c>null</c> becomes an empty case-insensitive set. A non-null set is kept
    /// (not copied) and the head and worker hosts are added to it.
    /// </param>
    /// <param name="hostManagerUrl">URL of the cluster's host manager.</param>
    public ClusterAddressInfo(
        Uri activeHeadNodeUrl,
        IDictionary<string, string> systemProperties,
        IList<Uri> workerRankUrls,
        HashSet<string> hostNames,
        Uri hostManagerUrl)
    {
        ActiveHeadNodeUrl = activeHeadNodeUrl;
        SystemProperties = systemProperties ?? new Dictionary<string, string>();
        WorkerRankUrls = workerRankUrls ?? new List<Uri>();
        HostNames = hostNames ?? CreateHostNameSet();
        HostManagerUrl = hostManagerUrl;
        IsPrimaryCluster = false;

        // Ensure that all the known ranks' hostnames are also accounted for
        UpdateHostnamesBasedOnRankUrls();
    }

    /// <summary>
    /// Creates a ClusterAddressInfo with just an active URL and host manager port.
    /// </summary>
    /// <param name="activeHeadNodeUrl">URL of the cluster's active head node.</param>
    /// <param name="hostManagerPort">
    /// Port of the host manager. The host manager URL is the head node URL with this port
    /// and an empty path.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="activeHeadNodeUrl"/> is <c>null</c> (raised by <see cref="UriBuilder"/>).</exception>
    public ClusterAddressInfo(Uri activeHeadNodeUrl, int hostManagerPort)
    {
        ActiveHeadNodeUrl = activeHeadNodeUrl;
        SystemProperties = new Dictionary<string, string>();
        WorkerRankUrls = new List<Uri>();
        HostNames = CreateHostNameSet();
        IsPrimaryCluster = false;

        // Create host manager URL: same scheme/host as the head node, different port, root path
        var builder = new UriBuilder(activeHeadNodeUrl)
        {
            Port = hostManagerPort,
            Path = string.Empty
        };
        HostManagerUrl = builder.Uri;

        // Ensure that all the known ranks' hostnames are also accounted for
        UpdateHostnamesBasedOnRankUrls();
    }

    /// <summary>
    /// Creates the default host name set; DNS host names are case-insensitive.
    /// </summary>
    private static HashSet<string> CreateHostNameSet()
    {
        return new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Adds the hosts of the head node URL and of every worker rank URL to <see cref="HostNames"/>.
    /// </summary>
    private void UpdateHostnamesBasedOnRankUrls()
    {
        // Put the head rank's hostname in the saved hostnames
        AddHostOf(ActiveHeadNodeUrl);

        // Put each worker rank's hostname in the saved hostnames
        if (WorkerRankUrls != null)
        {
            foreach (var workerUrl in WorkerRankUrls)
            {
                AddHostOf(workerUrl);
            }
        }
    }

    /// <summary>
    /// Adds the host of <paramref name="url"/> to <see cref="HostNames"/> unless it is null, empty,
    /// or already present (ignoring case).
    /// </summary>
    private void AddHostOf(Uri? url)
    {
        if (url != null && !string.IsNullOrEmpty(url.Host) && !DoesClusterContainNode(url.Host))
        {
            HostNames.Add(url.Host);
        }
    }

    /// <summary>
    /// Checks if the given hostname (or IP address) is part of this cluster.
    /// </summary>
    /// <param name="hostName">Hostname or IP address to look up; the comparison ignores case.</param>
    /// <returns><c>true</c> if the host is in <see cref="HostNames"/>; <c>false</c> otherwise, including for <c>null</c>.</returns>
    public bool DoesClusterContainNode(string hostName)
    {
        if (hostName is null || HostNames is null)
            return false;

        if (HostNames.Contains(hostName))
            return true;

        // A caller-supplied set may use a case-sensitive comparer; host names are not case-sensitive
        return HostNames.Any(h => string.Equals(h, hostName, StringComparison.OrdinalIgnoreCase));
    }

    /// <summary>
    /// Returns a string representation of this cluster address info.
    /// </summary>
    public override string ToString()
    {
        // Null entries render as empty strings instead of throwing
        var workerUrls = string.Join(", ", (WorkerRankUrls ?? new List<Uri>()).Select(u => u?.ToString()));
        var hostnames = string.Join(", ", HostNames ?? CreateHostNameSet());
        return $"{{ activeHeadNodeUrl: {ActiveHeadNodeUrl}, workerRankUrls: [{workerUrls}], hostNames: [{hostnames}], hostManagerUrl: {HostManagerUrl}, isPrimaryCluster: {IsPrimaryCluster} }}";
    }

    /// <summary>
    /// Equality check based on active head node URL.
    /// </summary>
    public override bool Equals(object? obj)
    {
        if (ReferenceEquals(this, obj))
            return true;

        return obj is ClusterAddressInfo other
            && string.Equals(ActiveHeadNodeUrl?.ToString(), other.ActiveHeadNodeUrl?.ToString(), StringComparison.Ordinal);
    }

    /// <summary>
    /// Hash code based on active head node URL.
    /// </summary>
    public override int GetHashCode()
    {
        return ActiveHeadNodeUrl?.ToString().GetHashCode() ?? 0;
    }
}
184 
/// <summary>
/// Manages high availability failover for Kinetica connections.
/// </summary>
/// <remarks>
/// Every public member takes the same private lock, so the cluster list, the failover order,
/// the current-cluster pointer and the switch counter are read and updated together.
/// </remarks>
public class HAFailoverManager
{
    /// <summary>
    /// Default port for host manager URLs.
    /// </summary>
    public const int DefaultHostManagerPort = 9300;

    // System properties response keys
    private const string SYSTEM_PROP_ENABLE_HTTPD = "conf.enable_httpd_proxy";
    private const string SYSTEM_PROP_ENABLE_MH = "conf.enable_worker_http_servers";
    private const string SYSTEM_PROP_NUM_HOSTS = "conf.number_of_hosts";
    private const string SYSTEM_PROP_HEAD_NODE_URLS = "conf.ha_ring_head_nodes_full";
    private const string SYSTEM_PROP_SERVER_URLS = "conf.worker_http_server_urls";
    private const string SYSTEM_PROP_ENABLE_HA = "conf.enable_ha";
    private const string SYSTEM_PROP_TRUE = "TRUE";

    // One entry per known cluster; after Initialize the primary cluster (if any) is at index 0
    private readonly List<ClusterAddressInfo> _hostAddresses;
    // Failover order: a permutation of the indices of _hostAddresses
    private readonly List<int> _haUrlIndices;
    // Head-node hostname of the primary cluster; empty when there is no primary
    private string _primaryUrlHostname;
    // Position within _haUrlIndices of the cluster currently in use
    private int _currentClusterIndexPointer;
    // Number of cluster switches since the last Initialize
    private int _numClusterSwitches;
    private readonly object _lock = new object();

    /// <summary>
    /// Whether failover is disabled.
    /// </summary>
    public bool DisableFailover { get; set; }

    /// <summary>
    /// Whether auto-discovery is disabled.
    /// </summary>
    public bool DisableAutoDiscovery { get; set; }

    /// <summary>
    /// Host manager port.
    /// </summary>
    public int HostManagerPort { get; set; }

    /// <summary>
    /// HA failover order.
    /// </summary>
    public HAFailoverOrder FailoverOrder { get; set; }

    /// <summary>
    /// Optional hostname regex for filtering URLs.
    /// </summary>
    public Regex? HostnameRegex { get; set; }

    /// <summary>
    /// Creates a new HAFailoverManager with failover and auto-discovery enabled,
    /// <see cref="DefaultHostManagerPort"/> as the host manager port, and random failover order.
    /// </summary>
    public HAFailoverManager()
    {
        _hostAddresses = new List<ClusterAddressInfo>();
        _haUrlIndices = new List<int>();
        _primaryUrlHostname = string.Empty;
        _currentClusterIndexPointer = 0;
        _numClusterSwitches = 0;
        DisableFailover = false;
        DisableAutoDiscovery = false;
        HostManagerPort = DefaultHostManagerPort;
        FailoverOrder = HAFailoverOrder.Random;
    }

    /// <summary>
    /// Gets the number of clusters in the HA ring.
    /// </summary>
    public int HARingSize
    {
        get
        {
            lock (_lock)
            {
                return _hostAddresses.Count;
            }
        }
    }

    /// <summary>
    /// Gets the number of times the client has switched to a different cluster.
    /// </summary>
    public int NumClusterSwitches
    {
        get
        {
            lock (_lock)
            {
                return _numClusterSwitches;
            }
        }
    }

    /// <summary>
    /// Gets all cluster addresses.
    /// </summary>
    /// <returns>A snapshot copy of the cluster list.</returns>
    public IList<ClusterAddressInfo> GetHostAddresses()
    {
        lock (_lock)
        {
            return new List<ClusterAddressInfo>(_hostAddresses);
        }
    }

    /// <summary>
    /// Gets the list of URLs of the active head ranks of all clusters.
    /// </summary>
    public IList<Uri> GetUrls()
    {
        lock (_lock)
        {
            return _hostAddresses.Select(h => h.ActiveHeadNodeUrl).ToList();
        }
    }

    /// <summary>
    /// Gets the list of URLs in failover order.
    /// </summary>
    public IList<Uri> GetFailoverUrls()
    {
        lock (_lock)
        {
            return _haUrlIndices
                .Where(i => i < _hostAddresses.Count)
                .Select(i => _hostAddresses[i].ActiveHeadNodeUrl)
                .ToList();
        }
    }

    /// <summary>
    /// Gets the active cluster's information.
    /// </summary>
    /// <returns>
    /// The only cluster when the ring has exactly one. Otherwise, the cluster at the current position
    /// of the failover order, or <c>null</c> when there is none.
    /// </returns>
    public ClusterAddressInfo? GetClusterInfo()
    {
        lock (_lock)
        {
            if (_hostAddresses.Count == 0)
                return null;

            if (_hostAddresses.Count == 1)
                return _hostAddresses[0];

            if (_currentClusterIndexPointer < _haUrlIndices.Count)
            {
                int index = _haUrlIndices[_currentClusterIndexPointer];
                if (index < _hostAddresses.Count)
                    return _hostAddresses[index];
            }

            return null;
        }
    }

    /// <summary>
    /// Gets the current active URL.
    /// </summary>
    /// <returns>The active head node URL of the current cluster, or <c>null</c> if there is no active cluster.</returns>
    public Uri? GetUrl()
    {
        return GetClusterInfo()?.ActiveHeadNodeUrl;
    }

    /// <summary>
    /// Adds a cluster address to the manager and appends it to the end of the failover order.
    /// </summary>
    /// <param name="clusterInfo">The cluster to add.</param>
    /// <exception cref="ArgumentNullException"><paramref name="clusterInfo"/> is <c>null</c>.</exception>
    public void AddCluster(ClusterAddressInfo clusterInfo)
    {
        if (clusterInfo is null)
            throw new ArgumentNullException(nameof(clusterInfo));

        lock (_lock)
        {
            _hostAddresses.Add(clusterInfo);
            // Keep the failover order in sync; otherwise GetClusterInfo/SwitchUrl never reach this cluster
            _haUrlIndices.Add(_hostAddresses.Count - 1);
        }
    }

    /// <summary>
    /// Initializes the manager with a list of URLs.
    /// </summary>
    /// <remarks>
    /// Discards any previously known clusters and resets the switch counter. If auto-discovery is
    /// enabled and <paramref name="kinetica"/> is given, HA ring head nodes found in the system
    /// properties are also queued and turned into clusters. When the ring has one cluster, or all
    /// user-given URLs map to the same cluster, that cluster becomes the primary and is moved to the front.
    /// </remarks>
    /// <param name="urls">User-given URLs; <c>null</c> entries are ignored.</param>
    /// <param name="kinetica">Optional connection used to fetch system properties for auto-discovery.</param>
    /// <exception cref="ArgumentNullException"><paramref name="urls"/> is <c>null</c>.</exception>
    /// <exception cref="KineticaException">No cluster could be set up from the given URLs.</exception>
    public void Initialize(IList<Uri> urls, Kinetica? kinetica = null)
    {
        if (urls is null)
            throw new ArgumentNullException(nameof(urls));

        lock (_lock)
        {
            _hostAddresses.Clear();
            _haUrlIndices.Clear();
            // Forget the previous primary so it cannot leak into this initialization
            _primaryUrlHostname = string.Empty;
            _currentClusterIndexPointer = 0;
            _numClusterSwitches = 0;

            var clusterIndicesOfUserGivenUrls = DiscoverClusters(urls, kinetica);

            if (_hostAddresses.Count == 0)
                throw new KineticaException("Could not connect to any working Kinetica server");

            DesignatePrimaryCluster(clusterIndicesOfUserGivenUrls);

            // Randomize URLs for failover
            RandomizeUrls();
        }
    }

    /// <summary>
    /// Processes the user-given URLs, plus any HA ring head nodes discovered along the way, and appends
    /// one cluster per URL whose host is not already in a known cluster. Caller must hold the lock.
    /// </summary>
    /// <returns>For each non-null user-given URL, in order, the index of the cluster it belongs to.</returns>
    private List<int> DiscoverClusters(IList<Uri> urls, Kinetica? kinetica)
    {
        var urlQueue = new Queue<Uri>(urls.Where(u => u != null));
        int numUserGivenUrls = urlQueue.Count;
        int numProcessedUrls = 0;
        var clusterIndicesOfUserGivenUrls = new List<int>();

        while (urlQueue.Count > 0)
        {
            var url = urlQueue.Dequeue();
            // Discovered HA URLs are enqueued after all user-given ones
            bool isUserGivenUrl = numProcessedUrls < numUserGivenUrls;
            numProcessedUrls++;

            // Check if this hostname is already in a known cluster
            int? existingIndex = GetIndexOfClusterContainingNode(url.Host);
            if (existingIndex.HasValue)
            {
                if (isUserGivenUrl)
                    clusterIndicesOfUserGivenUrls.Add(existingIndex.Value);
                continue;
            }

            var clusterInfo = BuildClusterInfo(url, kinetica, urlQueue);

            if (isUserGivenUrl)
                clusterIndicesOfUserGivenUrls.Add(_hostAddresses.Count);

            _hostAddresses.Add(clusterInfo);
        }

        return clusterIndicesOfUserGivenUrls;
    }

    /// <summary>
    /// Builds the cluster info for <paramref name="url"/>. Uses system properties when auto-discovery is
    /// possible and enqueues newly seen HA ring head nodes. Falls back to minimal info otherwise.
    /// </summary>
    private ClusterAddressInfo BuildClusterInfo(Uri url, Kinetica? kinetica, Queue<Uri> urlQueue)
    {
        if (DisableAutoDiscovery || kinetica == null)
            return new ClusterAddressInfo(url, HostManagerPort);

        try
        {
            // NOTE(review): this queries the same `kinetica` connection for every queued URL, not a
            // connection to `url` itself, so every URL receives the same system properties — confirm intent.
            var systemProps = kinetica.showSystemProperties().property_map;
            var clusterInfo = CreateClusterAddressInfo(url, systemProps);

            // Get HA ring URLs and add the ones not yet known or queued
            foreach (var haUrl in GetHARingHeadNodeUrls(systemProps))
            {
                if (!GetIndexOfClusterContainingNode(haUrl.Host).HasValue &&
                    !urlQueue.Contains(haUrl))
                {
                    urlQueue.Enqueue(haUrl);
                }
            }

            return clusterInfo;
        }
        catch
        {
            // Best effort: failing to get system properties must not prevent using the URL itself
            return new ClusterAddressInfo(url, HostManagerPort);
        }
    }

    /// <summary>
    /// Chooses the primary cluster, flags it, and moves it to the front of the cluster list.
    /// The primary is either the only cluster, or the cluster shared by all user-given URLs.
    /// Caller must hold the lock.
    /// </summary>
    private void DesignatePrimaryCluster(List<int> clusterIndicesOfUserGivenUrls)
    {
        if (_hostAddresses.Count == 1)
        {
            _hostAddresses[0].IsPrimaryCluster = true;
            _primaryUrlHostname = _hostAddresses[0].ActiveHeadNodeUrl.Host;
        }
        else if (clusterIndicesOfUserGivenUrls.Count > 0 &&
                 clusterIndicesOfUserGivenUrls.Distinct().Count() == 1)
        {
            // All user-given URLs belong to the same cluster: make that the primary
            _primaryUrlHostname = _hostAddresses[clusterIndicesOfUserGivenUrls[0]].ActiveHeadNodeUrl.Host;
        }

        if (string.IsNullOrEmpty(_primaryUrlHostname))
            return;

        int? primaryIndex = GetIndexOfClusterContainingNode(_primaryUrlHostname);
        if (!primaryIndex.HasValue)
            return;

        var primary = _hostAddresses[primaryIndex.Value];
        primary.IsPrimaryCluster = true;
        if (primaryIndex.Value > 0)
        {
            _hostAddresses.RemoveAt(primaryIndex.Value);
            _hostAddresses.Insert(0, primary);
        }
    }

    /// <summary>
    /// Returns the index of the first known cluster containing <paramref name="hostname"/>, or <c>null</c>.
    /// </summary>
    private int? GetIndexOfClusterContainingNode(string hostname)
    {
        int index = _hostAddresses.FindIndex(cluster => cluster.DoesClusterContainNode(hostname));
        return index >= 0 ? index : (int?)null;
    }

    /// <summary>
    /// Builds a cluster's address info from its system properties.
    /// </summary>
    /// <remarks>
    /// When multi-head is enabled, the first parsed rank URL becomes the head node URL and the rest
    /// become worker rank URLs; otherwise <paramref name="url"/> is the head node URL. With the httpd
    /// proxy enabled and a non-root head node path, the host manager is reached at the path
    /// "/gpudb-host-manager". Otherwise it is reached on <see cref="HostManagerPort"/> at the root path.
    /// </remarks>
    private ClusterAddressInfo CreateClusterAddressInfo(Uri url, IDictionary<string, string> systemProperties)
    {
        bool isHttpdEnabled = GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_HTTPD);
        bool isMultiHeadEnabled = GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_MH);

        // Get rank URLs
        var rankUrls = new List<Uri>();
        if (isMultiHeadEnabled && systemProperties.TryGetValue(SYSTEM_PROP_SERVER_URLS, out var serverUrlsStr))
        {
            rankUrls = SelectOneUrlPerEntry(serverUrlsStr);
        }

        // Get head node URL (first rank URL, or fall back to given URL)
        Uri activeHeadNodeUrl;
        if (rankUrls.Count > 0)
        {
            activeHeadNodeUrl = rankUrls[0];
            rankUrls.RemoveAt(0);
        }
        else
        {
            activeHeadNodeUrl = url;
        }

        var hostNames = GetHostnamesFromSystemProperties(systemProperties);

        var builder = new UriBuilder(activeHeadNodeUrl);
        if (isHttpdEnabled && !string.IsNullOrEmpty(activeHeadNodeUrl.AbsolutePath) && activeHeadNodeUrl.AbsolutePath != "/")
        {
            builder.Path = "/gpudb-host-manager";
        }
        else
        {
            builder.Port = HostManagerPort;
            builder.Path = string.Empty;
        }
        Uri hostManagerUrl = builder.Uri;

        return new ClusterAddressInfo(activeHeadNodeUrl, systemProperties, rankUrls, hostNames, hostManagerUrl);
    }

    /// <summary>
    /// Parses a ';'-separated list of entries, each holding one or more ','-separated alternative URLs.
    /// For each entry it keeps the first absolute URL whose host passes <see cref="HostnameRegex"/>.
    /// Entries without such a URL are skipped.
    /// </summary>
    private List<Uri> SelectOneUrlPerEntry(string? urlListStr)
    {
        var result = new List<Uri>();
        if (string.IsNullOrEmpty(urlListStr))
            return result;

        foreach (var entry in urlListStr.Split(';'))
        {
            if (string.IsNullOrEmpty(entry))
                continue;

            foreach (var candidate in entry.Split(','))
            {
                if (Uri.TryCreate(candidate.Trim(), UriKind.Absolute, out var parsedUrl) && IsAcceptedHost(parsedUrl.Host))
                {
                    result.Add(parsedUrl);
                    break;
                }
            }
        }

        return result;
    }

    /// <summary>
    /// Whether <paramref name="host"/> passes the optional <see cref="HostnameRegex"/> filter.
    /// </summary>
    private bool IsAcceptedHost(string host)
    {
        return HostnameRegex == null || HostnameRegex.IsMatch(host);
    }

    /// <summary>
    /// Collects one hostname per host from the "conf.host{i}_public_urls" properties,
    /// for i in [0, "conf.number_of_hosts"). The first entry per host that passes the regex filter wins.
    /// </summary>
    private HashSet<string> GetHostnamesFromSystemProperties(IDictionary<string, string> systemProperties)
    {
        var hostnames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        if (!systemProperties.TryGetValue(SYSTEM_PROP_NUM_HOSTS, out var numHostsStr) ||
            !int.TryParse(numHostsStr, out int numHosts))
        {
            return hostnames;
        }

        for (int i = 0; i < numHosts; i++)
        {
            string key = $"conf.host{i}_public_urls";
            if (!systemProperties.TryGetValue(key, out var hostUrlsStr) || string.IsNullOrEmpty(hostUrlsStr))
                continue;

            foreach (var entry in hostUrlsStr.Split(','))
            {
                var host = ExtractHostName(entry);
                // Skip blanks, e.g. produced by a trailing comma
                if (host.Length == 0)
                    continue;

                if (IsAcceptedHost(host))
                {
                    hostnames.Add(host);
                    break;
                }
            }
        }

        return hostnames;
    }

    /// <summary>
    /// Extracts the bare host from a host entry. For a full URL such as "http://10.0.0.1:9191" it
    /// returns just "10.0.0.1", so the result matches <see cref="Uri.Host"/> of rank URLs. Other
    /// entries are returned trimmed, with any "scheme://" prefix stripped.
    /// </summary>
    private static string ExtractHostName(string entry)
    {
        var trimmed = entry.Trim();
        int idx = trimmed.IndexOf("://", StringComparison.Ordinal);
        if (idx < 0)
            return trimmed;

        if (Uri.TryCreate(trimmed, UriKind.Absolute, out var parsed) && !string.IsNullOrEmpty(parsed.Host))
            return parsed.Host;

        // Not parseable as a URL: strip the protocol only
        return trimmed.Substring(idx + 3);
    }

    /// <summary>
    /// Returns the head node URLs of the HA ring ("conf.ha_ring_head_nodes_full"), or an empty list
    /// when HA is not enabled ("conf.enable_ha") or the property is missing.
    /// </summary>
    private List<Uri> GetHARingHeadNodeUrls(IDictionary<string, string> systemProperties)
    {
        if (!GetSystemPropertyBool(systemProperties, SYSTEM_PROP_ENABLE_HA))
            return new List<Uri>();

        systemProperties.TryGetValue(SYSTEM_PROP_HEAD_NODE_URLS, out var haRingStr);
        return SelectOneUrlPerEntry(haRingStr);
    }

    /// <summary>
    /// Whether <paramref name="key"/> exists and equals "TRUE", ignoring case. A null value counts as false.
    /// </summary>
    private static bool GetSystemPropertyBool(IDictionary<string, string> properties, string key)
    {
        return properties.TryGetValue(key, out var value) &&
               string.Equals(value, SYSTEM_PROP_TRUE, StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// Rebuilds the failover order as 0..n-1. With <see cref="HAFailoverOrder.Random"/>, it is then
    /// shuffled, keeping the primary cluster (moved to index 0 by Initialize) at the front. Without a
    /// primary, all clusters are shuffled. Caller must hold the lock.
    /// </summary>
    private void RandomizeUrls()
    {
        _haUrlIndices.Clear();
        _haUrlIndices.AddRange(Enumerable.Range(0, _hostAddresses.Count));

        if (FailoverOrder != HAFailoverOrder.Random || _haUrlIndices.Count < 2)
            return;

        // Only a real primary cluster is pinned to the front
        int start = _hostAddresses[0].IsPrimaryCluster ? 1 : 0;

        // Fisher-Yates shuffle of positions [start, n)
        var rng = new Random();
        for (int i = _haUrlIndices.Count - 1; i > start; i--)
        {
            int j = rng.Next(start, i + 1);
            (_haUrlIndices[i], _haUrlIndices[j]) = (_haUrlIndices[j], _haUrlIndices[i]);
        }
    }

    /// <summary>
    /// Advances to the next cluster in the failover order (wrapping around) and counts the switch.
    /// Caller must hold the lock, and the ring must not be empty.
    /// </summary>
    private void SelectNextCluster()
    {
        _currentClusterIndexPointer = (_currentClusterIndexPointer + 1) % _hostAddresses.Count;
        _numClusterSwitches++;
    }

    /// <summary>
    /// Switches to the next available cluster URL for HA failover.
    /// </summary>
    /// <param name="oldUrl">The URL that just failed.</param>
    /// <param name="oldNumClusterSwitches">Value of <see cref="NumClusterSwitches"/> observed before the failed call.</param>
    /// <param name="isKineticaRunning">
    /// Optional probe for a candidate URL; when <c>null</c>, every candidate is assumed to be running.
    /// </param>
    /// <returns>
    /// The URL to use next. This is the URL another thread already switched to if it beat this call;
    /// otherwise it is the next candidate accepted by <paramref name="isKineticaRunning"/>.
    /// </returns>
    /// <exception cref="KineticaException">
    /// Failover is disabled or the ring has fewer than two clusters. Also thrown when other threads
    /// already switched as many times as there are clusters, or when no running cluster is found.
    /// </exception>
    public Uri SwitchUrl(Uri oldUrl, int oldNumClusterSwitches, Func<Uri, bool>? isKineticaRunning = null)
    {
        lock (_lock)
        {
            if (DisableFailover)
            {
                throw new KineticaException("Failover is disabled!");
            }

            // Guards SelectNextCluster against a division by zero
            if (_hostAddresses.Count == 0)
            {
                throw new KineticaException("No clusters in ring; HA failover unavailable");
            }

            // If there is only one URL, then we can't switch URLs
            if (_hostAddresses.Count == 1)
            {
                throw new KineticaException("Only one cluster in ring; HA failover unavailable");
            }

            // Get how many more times other threads have switched clusters
            int countSwitchesSinceInvocation = _numClusterSwitches - oldNumClusterSwitches;

            // Check if another thread has tried all the clusters in the HA ring
            if (countSwitchesSinceInvocation >= _hostAddresses.Count)
            {
                throw new KineticaException($"Fail-over attempted as many times as clusters in the ring; URLs attempted: {string.Join(", ", GetUrls())}");
            }

            // Check if another thread beat us to switching the URL
            var currentUrl = GetUrl();
            if (currentUrl != null && !currentUrl.Equals(oldUrl) && countSwitchesSinceInvocation > 0)
            {
                return currentUrl;
            }

            // This thread is the first one here: select the next cluster to use.
            // Each cluster is visited at most once. When oldUrl is a ring head URL, the "circled back"
            // check fires within this bound. Otherwise the bound alone stops a probe that keeps
            // returning false from looping forever.
            for (int attempt = 0; attempt < _hostAddresses.Count; attempt++)
            {
                SelectNextCluster();

                currentUrl = GetUrl();
                if (currentUrl == null)
                {
                    throw new KineticaException("No current URL available");
                }

                if (currentUrl.Equals(oldUrl))
                {
                    // Re-shuffle and throw exception
                    RandomizeUrls();
                    throw new KineticaException($"Circled back to original URL; no clusters available for fail-over among these: {string.Join(", ", GetUrls())}");
                }

                // Check if the new cluster is running
                bool isRunning = isKineticaRunning?.Invoke(currentUrl) ?? true;
                if (isRunning)
                {
                    return currentUrl;
                }
            }

            RandomizeUrls();
            throw new KineticaException($"Tried every cluster in the ring without finding a running one; no clusters available for fail-over among these: {string.Join(", ", GetUrls())}");
        }
    }
}
Failover to clusters in sequential order
ClusterAddressInfo? GetClusterInfo()
Gets the active cluster's information.
Definition: HAFailover.cs:319
void AddCluster(ClusterAddressInfo clusterInfo)
Adds a cluster address to the manager.
Definition: HAFailover.cs:351
bool DisableFailover
Whether failover is disabled
Definition: HAFailover.cs:214
int HARingSize
Gets the number of clusters in the HA ring.
Definition: HAFailover.cs:256
IList< ClusterAddressInfo > GetHostAddresses()
Gets all cluster addresses.
Definition: HAFailover.cs:283
IList< Uri > GetFailoverUrls()
Gets the list of URLs in failover order.
Definition: HAFailover.cs:305
override string ToString()
Returns a string representation of this cluster address info.
Definition: HAFailover.cs:157
bool DoesClusterContainNode(string hostName)
Checks if the given hostname (or IP address) is part of this cluster.
Definition: HAFailover.cs:149
bool DisableAutoDiscovery
Whether auto-discovery is disabled
Definition: HAFailover.cs:219
bool IsPrimaryCluster
Whether this is the primary cluster
Definition: HAFailover.cs:60
void Initialize(IList< Uri > urls, Kinetica? kinetica=null)
Initializes the manager with a list of URLs.
Definition: HAFailover.cs:364
int HostManagerPort
Host manager port
Definition: HAFailover.cs:224
const int DefaultHostManagerPort
Definition: HAFailover.cs:193
IDictionary< string, string > SystemProperties
System properties map from the cluster
Definition: HAFailover.cs:40
Uri HostManagerUrl
URL for the host manager
Definition: HAFailover.cs:55
Uri? GetUrl()
Gets the current active URL.
Definition: HAFailover.cs:343
HAFailoverOrder FailoverOrder
HA failover order
Definition: HAFailover.cs:229
IList< Uri > WorkerRankUrls
List of worker rank URLs
Definition: HAFailover.cs:45
Contains address information for a Kinetica cluster.
Definition: HAFailover.cs:30
Uri ActiveHeadNodeUrl
The active head node URL for the cluster
Definition: HAFailover.cs:35
IList< Uri > GetUrls()
Gets the list of URLs of the active head ranks of all clusters.
Definition: HAFailover.cs:294
HAFailoverOrder
High availability failover order options.
Definition: HAFailover.cs:12
ClusterAddressInfo(Uri activeHeadNodeUrl, IDictionary< string, string > systemProperties, IList< Uri > workerRankUrls, HashSet< string > hostNames, Uri hostManagerUrl)
Creates a new ClusterAddressInfo for an active cluster.
Definition: HAFailover.cs:70
Uri SwitchUrl(Uri oldUrl, int oldNumClusterSwitches, Func< Uri, bool >? isKineticaRunning=null)
Switches to the next available cluster URL for HA failover.
Definition: HAFailover.cs:703
Manages high availability failover for Kinetica connections.
Definition: HAFailover.cs:190
Regex? HostnameRegex
Optional hostname regex for filtering URLs
Definition: HAFailover.cs:234
override bool Equals(object? obj)
Equality check based on active head node URL.
Definition: HAFailover.cs:167
override int GetHashCode()
Hash code based on active head node URL.
Definition: HAFailover.cs:179
Failover to clusters in a random order (default)
ClusterAddressInfo(Uri activeHeadNodeUrl, int hostManagerPort)
Creates a ClusterAddressInfo with just an active URL and host manager port.
Definition: HAFailover.cs:93
HashSet< string > HostNames
Set of hostnames/IP addresses in the cluster
Definition: HAFailover.cs:50
HAFailoverManager()
Creates a new HAFailoverManager.
Definition: HAFailover.cs:239
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Definition: HAFailover.cs:270