Kinetica   C#   API  Version 7.2.3.1
WorkerList.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections;
3 using System.Collections.Generic;
4 using System.Linq;
5 using System.Text.RegularExpressions;
6 
7 namespace kinetica.Utils;
8 
20  public sealed class WorkerList : IReadOnlyList<Uri?>
21  {
22  private readonly List<Uri?> _urls;
23 
27  public bool IsMultiHeadEnabled { get; private set; }
28 
32  public Regex? IpRegex { get; private set; }
33 
37  public bool IsQueriedUrlList { get; private set; }
38 
42  public int Count => _urls.Count;
43 
48  public Uri? this[int index] => index >= 0 && index < _urls.Count ? _urls[index] : null;
49 
58  public WorkerList()
59  {
60  _urls = new List<Uri?>();
61  IsMultiHeadEnabled = false;
62  IpRegex = null;
63  IsQueriedUrlList = false;
64  }
65 
80  public WorkerList(Kinetica db) : this(db, (Regex?)null)
81  {
82  }
83 
100  public WorkerList(Kinetica db, Regex? ip_regex)
101  {
102  _urls = new List<Uri?>();
103  IpRegex = ip_regex;
104  IsQueriedUrlList = true;
105  IsMultiHeadEnabled = false;
106 
107  if (db == null)
108  throw new ArgumentNullException(nameof(db));
109 
110  // Check if auto-discovery is enabled on the connection
111  // If HAManager exists and has auto-discovery disabled, return empty list
112  if (db.HAManager?.DisableAutoDiscovery == true)
113  {
114  return;
115  }
116 
117  // Get the system properties from the database server
118  IDictionary<string, string> system_properties = db.showSystemProperties().property_map;
119 
120  // Find out if multi-head ingest is turned on or not
121  if (!system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out string? multi_head_ingestion_param))
123 
124  bool is_multi_head_ingest_enabled = multi_head_ingestion_param?.Equals(ShowSystemPropertiesResponse.PropertyMap.TRUE, StringComparison.OrdinalIgnoreCase) ?? false;
125 
126  // Nothing to do if multi-head ingestion is disabled
127  if (!is_multi_head_ingest_enabled)
128  {
129  return;
130  }
131 
132  IsMultiHeadEnabled = true;
133 
134  // Multi-head ingestion IS enabled; find the worker URLs
135  if (system_properties.TryGetValue("conf.worker_http_server_urls", out string? worker_urls_str) && !string.IsNullOrEmpty(worker_urls_str))
136  {
137  ParseWorkerUrls(worker_urls_str);
138  }
139  else
140  {
141  // Fall back to IPs and ports
142  ParseWorkerIpsAndPorts(system_properties, db);
143  }
144 
145  // Check that this list is not empty (but only if no removed ranks exist)
146  if (_urls.Count == 0)
147  throw new KineticaException("No worker HTTP servers found.");
148  }
149 
155  public static WorkerList FromUrls(IEnumerable<Uri> urls)
156  {
157  var workerList = new WorkerList();
158  foreach (var url in urls)
159  {
160  workerList._urls.Add(url);
161  }
162  return workerList;
163  }
164 
177  public static WorkerList WithIpPrefix(Kinetica db, string? ip_prefix)
178  {
179  if (string.IsNullOrEmpty(ip_prefix))
180  {
181  return new WorkerList(db);
182  }
183  var regex = new Regex("^" + Regex.Escape(ip_prefix) + ".*");
184  return new WorkerList(db, regex);
185  }
186 
192  private void ParseWorkerUrls(string worker_urls_str)
193  {
194  string[] worker_url_lists = worker_urls_str.Split(';');
195 
196  // Skip rank 0 (index 0), start from rank 1
197  for (int i = 1; i < worker_url_lists.Length; ++i)
198  {
199  string url_list = worker_url_lists[i];
200 
201  // Handle removed ranks (empty string)
202  if (string.IsNullOrEmpty(url_list))
203  {
204  _urls.Add(null);
205  continue;
206  }
207 
208  // Need to split each of the URL lists on a comma
209  string[] urls = url_list.Split(',');
210 
211  bool matching_url_found = false;
212 
213  // Find at least one URL to work with
214  foreach (string url_str in urls)
215  {
216  try
217  {
218  // If a regular expression is given, then see if this one is a match
219  if (IpRegex != null)
220  {
221  // Extract host from URL for matching
222  var tempUri = new Uri(url_str);
223  matching_url_found = IpRegex.IsMatch(tempUri.Host);
224  }
225  else
226  {
227  // No regex given, so take the first URL encountered for this worker
228  matching_url_found = true;
229  }
230 
231  if (matching_url_found)
232  {
233  Uri url = new Uri(url_str);
234  _urls.Add(url);
235  break;
236  }
237  }
238  catch (Exception ex)
239  {
240  throw new KineticaException($"Invalid URL '{url_str}': {ex.Message}");
241  }
242  }
243 
244  if (!matching_url_found)
245  throw new KineticaException($"No matching URL found for worker #{i}.");
246  }
247  }
248 
253  private void ParseWorkerIpsAndPorts(IDictionary<string, string> system_properties, Kinetica db)
254  {
255  if (!system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out string? worker_ips_str) || string.IsNullOrEmpty(worker_ips_str))
257 
258  if (!system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out string? worker_ports_str) || string.IsNullOrEmpty(worker_ports_str))
260 
261  // Split the strings
262  string[] worker_ip_lists = worker_ips_str.Split(';');
263  string[] worker_ports = worker_ports_str.Split(';');
264 
265  // Check that there are the same number of IPs and ports supplied
266  if (worker_ip_lists.Length != worker_ports.Length)
267  throw new KineticaException("Inconsistent number of values for "
269  + " and "
271 
272  // Get the protocol from the database URL
273  string scheme = db.URL.Scheme;
274 
275  // Skip rank 0 (index 0), start from rank 1
276  for (int i = 1; i < worker_ip_lists.Length; ++i)
277  {
278  string ip_list = worker_ip_lists[i];
279 
280  // Handle removed ranks (empty string)
281  if (string.IsNullOrEmpty(ip_list))
282  {
283  _urls.Add(null);
284  continue;
285  }
286 
287  // Need to split each of the IP lists on a comma
288  string[] ips = ip_list.Split(',');
289 
290  bool matching_ip_found = false;
291 
292  // Find at least one IP to work with
293  foreach (string ip in ips)
294  {
295  try
296  {
297  // If a regular expression is given, then see if this one is a match
298  if (IpRegex != null)
299  matching_ip_found = IpRegex.IsMatch(ip);
300  else
301  matching_ip_found = true;
302 
303  if (matching_ip_found)
304  {
305  UriBuilder uri_builder = new UriBuilder(scheme, ip, int.Parse(worker_ports[i]));
306  Uri url = uri_builder.Uri;
307  _urls.Add(url);
308  break;
309  }
310  }
311  catch (Exception ex)
312  {
313  throw new KineticaException($"Error creating URL for worker #{i}: {ex.Message}");
314  }
315  }
316 
317  if (!matching_ip_found)
318  throw new KineticaException($"No matching IP found for worker #{i}.");
319  }
320  }
321 
322  // ==================== Mutation Methods ====================
323 
328  public void Add(Uri url)
329  {
330  _urls.Add(url);
331  }
332 
337  public void AddOrNull(Uri? url)
338  {
339  _urls.Add(url);
340  }
341 
345  public void Clear()
346  {
347  _urls.Clear();
348  }
349 
350  // ==================== Query Methods ====================
351 
355  public bool IsEmpty => _urls.Count == 0;
356 
361  public IEnumerable<Uri> GetActiveUrls()
362  {
363  return _urls.Where(u => u != null).Cast<Uri>();
364  }
365 
369  public int ActiveCount => _urls.Count(u => u != null);
370 
376  public List<string> ToUrlStrings()
377  {
378  return _urls
379  .Where(u => u != null)
380  .Select(u => u!.ToString())
381  .ToList();
382  }
383 
390  public Uri? Get(int index)
391  {
392  return index >= 0 && index < _urls.Count ? _urls[index] : null;
393  }
394 
395  // ==================== IReadOnlyList Implementation ====================
396 
401  public IEnumerator<Uri?> GetEnumerator()
402  {
403  return _urls.GetEnumerator();
404  }
405 
406  IEnumerator IEnumerable.GetEnumerator()
407  {
408  return GetEnumerator();
409  }
410 
411  // ==================== Backward Compatibility ====================
412 
418  public IEnumerable<Uri> GetActiveUrlsEnumerator()
419  {
420  return GetActiveUrls();
421  }
422  }
void AddOrNull(Uri? url)
Adds a URL or null (for removed rank) to the worker list.
Definition: WorkerList.cs:337
void Add(Uri url)
Adds a URL to the worker list.
Definition: WorkerList.cs:328
A set of string constants for the parameter property_map.
A set of results returned by Kinetica.showSystemProperties.
IEnumerable< Uri > GetActiveUrlsEnumerator()
Returns an enumerator that iterates through only active (non-null) worker URLs.
Definition: WorkerList.cs:418
A list of worker URLs to use for multi-head operations.
Definition: WorkerList.cs:20
HAFailoverManager? HAManager
Gets the HA failover manager instance.
Definition: Kinetica.cs:192
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ope...
Definition: WorkerList.cs:58
bool IsMultiHeadEnabled
Whether multi-head I/O is enabled on the server.
Definition: WorkerList.cs:27
bool IsEmpty
Returns whether the worker list is empty.
Definition: WorkerList.cs:355
bool DisableAutoDiscovery
Whether auto-discovery is disabled
Definition: HAFailover.cs:219
static WorkerList FromUrls(IEnumerable< Uri > urls)
Creates a WorkerList from explicit URLs.
Definition: WorkerList.cs:155
WorkerList(Kinetica db, Regex? ip_regex)
Creates a WorkerList and automatically populates it with the worker URLs from Kinetica to support mul...
Definition: WorkerList.cs:100
const string TRUE
Indicates that the system is configured for multi-head ingestion.
IEnumerable< Uri > GetActiveUrls()
Gets all active (non-null) URLs in the worker list.
Definition: WorkerList.cs:361
ShowSystemPropertiesResponse showSystemProperties(ShowSystemPropertiesRequest request_)
Returns server configuration and version related information to the caller.
int ActiveCount
Gets the number of active (non-null) workers in the list.
Definition: WorkerList.cs:369
Regex? IpRegex
The IP regex used to filter worker URLs, if one was specified.
Definition: WorkerList.cs:32
const string CONF_WORKER_HTTP_SERVER_IPS
Semicolon (';') separated string of IP addresses of all the ingestion-enabled worker heads of the sys...
int Count
Gets the number of workers in the list (including removed ranks).
Definition: WorkerList.cs:42
void Clear()
Clears all URLs from the worker list.
Definition: WorkerList.cs:345
List< string > ToUrlStrings()
Converts the worker list to a list of URL strings.
Definition: WorkerList.cs:376
IDictionary< string, string > property_map
A map of server configuration parameters and version information.
const string CONF_WORKER_HTTP_SERVER_PORTS
Semicolon (';') separated string of the port numbers of all the ingestion-enabled worker ranks of the...
bool IsQueriedUrlList
Whether this worker list was created by querying the server.
Definition: WorkerList.cs:37
Uri? Get(int index)
Gets the URL at the specified index, or null if the rank was removed or index is out of bounds.
Definition: WorkerList.cs:390
IEnumerator< Uri?> GetEnumerator()
Returns an enumerator that iterates through the worker URLs.
Definition: WorkerList.cs:401
const string CONF_ENABLE_WORKER_HTTP_SERVERS
Boolean value indicating whether the system is configured for multi-head ingestion.
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:152
static WorkerList WithIpPrefix(Kinetica db, string? ip_prefix)
Creates a WorkerList and automatically populates it with the worker URLs from Kinetica,...
Definition: WorkerList.cs:177