Kinetica C# API  Version 7.0.19.0
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Properties Pages
WorkerList.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Text.RegularExpressions;
4 
5 
6 namespace kinetica.Utils
7 {
11  public sealed class WorkerList : List<System.Uri>
12  {
20  public WorkerList() { }
21 
35  public WorkerList(Kinetica db, Regex ip_regex = null)
36  {
37  // Get the system properties from the database server
38  IDictionary<string, string> system_properties = db.showSystemProperties().property_map;
39 
40  // Find out if multi-head ingest is turned on or not
41  string multi_head_ingestion_param;
42  system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS, out multi_head_ingestion_param);
43  if (multi_head_ingestion_param == null)
44  throw new KineticaException("Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_ENABLE_WORKER_HTTP_SERVERS);
45  bool is_multi_head_ingest_enabled = multi_head_ingestion_param.Equals(ShowSystemPropertiesResponse.PropertyMap.TRUE);
46 
47  // Nothing to do if multi-head ingestion is disabled
48  if (!is_multi_head_ingest_enabled)
49  {
50  return;
51  }
52 
53  // Multi-head ingestion IS enabled; find the worker URLs; then
54  // add them to the worker url vector.
55  // -----------------------------------------------------------
56  string worker_urls_str;
57  system_properties.TryGetValue("conf.worker_http_server_urls", out worker_urls_str);
58  if ( worker_urls_str.Length > 0 ) // found some URLs
59  {
60  // Parse the URLs
61  // --------------
62  // Split the strings
63  string[] worker_url_lists = worker_urls_str.Split(';');
64 
65  // Ignoring the very first rank (rank-0), add all matching
66  // URLs (if any regex is given), or just add the first one
67  for (int i = 1; i < worker_url_lists.Length; ++i)
68  {
69  string url_list = worker_url_lists[i];
70 
71  // Need to split each of the URL lists on a comma
72  string[] urls = url_list.Split(',');
73 
74  bool matching_url_found = false;
75 
76  // Find at least one URL to work with
77  foreach (string url_str in urls)
78  {
79  // Try to create the URL
80  try
81  {
82  // If a regular expression is given, then see if this one is a match
83  if (ip_regex != null)
84  matching_url_found = ip_regex.IsMatch(url_str);
85  else // no regex given, so take the first URL encountered for this worker
86  matching_url_found = true;
87 
88  if (matching_url_found)
89  {
90  Uri url = new Uri( url_str );
91 
92  // Add the URL to this WorkerList
93  this.Add(url);
94  break; // don't keep trying to match URLs in this group
95  } // end inner if
96  } // end try
97  catch (Exception ex)
98  {
99  throw new KineticaException(ex.Message);
100  }
101  } // end inner foreach
102 
103  if (!matching_url_found)
104  throw new KineticaException($"No matching URL found for worker #{i}.");
105  } // end outer for
106  }
107  else // construct the URLs from IP addresses and ports
108  {
109  // Get the worker IPs and ports
110  string worker_ips_str, worker_ports_str;
111  system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS, out worker_ips_str);
112  system_properties.TryGetValue(ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS, out worker_ports_str);
113 
114  // Check that we got them
115  if (worker_ips_str.Length == 0)
116  throw new KineticaException("Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS);
117  if (worker_ports_str.Length == 0)
118  throw new KineticaException("Missing value for " + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS);
119 
120  // Parse the IPs and the ports
121  // ---------------------------
122  // Split the strings
123  string[] worker_ip_lists = worker_ips_str.Split(';');
124  string[] worker_ports = worker_ports_str.Split(';');
125 
126  // Check that there are the same number of IPs and ports supplied
127  if (worker_ip_lists.Length != worker_ports.Length)
128  throw new KineticaException("Inconsistent number of values for "
129  + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_IPS
130  + " and "
131  + ShowSystemPropertiesResponse.PropertyMap.CONF_WORKER_HTTP_SERVER_PORTS);
132  // Create the URLs using the IPs and the ports, but
133  // ignore the very first rank (rank-0)
134  for (int i = 1; i < worker_ip_lists.Length; ++i)
135  {
136  string ip_list = worker_ip_lists[i];
137 
138  // Need to split each of the IP lists on a comma
139  string[] ips = ip_list.Split(',');
140 
141  bool matching_ip_found = false;
142 
143  // Find at least one IP to work with
144  foreach (string ip in ips)
145  {
146  // Try to create the URL
147  try
148  {
149  // If a regular expression is given, then see if this one is a match
150  if (ip_regex != null)
151  matching_ip_found = ip_regex.IsMatch(ip);
152  else // no regex given, so take the first IP encountered for this worker
153  matching_ip_found = true;
154 
155  if (matching_ip_found)
156  {
157  UriBuilder uri_builder = new UriBuilder( db.URL.Scheme,
158  ip, Int32.Parse(worker_ports[i]) );
159  Uri url = uri_builder.Uri;
160 
161  // Add the URL to this WorkerList
162  this.Add(url);
163  break; // don't keep trying to match IPs in this group
164  } // end inner if
165  } // end try
166  catch (Exception ex)
167  {
168  throw new KineticaException(ex.Message);
169  }
170  } // end inner foreach
171 
172  if (!matching_ip_found)
173  throw new KineticaException($"No matching IP found for worker #{i}.");
174  } // end outer for
175  } // end if-else
176 
177  // Check that this list is not empty
178  if (this.Count == 0)
179  throw new KineticaException("No worker HTTP servers found.");
180  } // end constructor
181 
182  } // end class WorkerList
183 
184 } // end namespace kinetica.Utils
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
A set of results returned by Kinetica.showSystemProperties(IDictionary{string, string}).
Uri URL
URL for Kinetica Server (including &quot;http:&quot; and port)
Definition: Kinetica.cs:87
WorkerList()
Creates an empty WorkerList that can be populated manually with worker URLs to support multi-head ing...
Definition: WorkerList.cs:20
WorkerList(Kinetica db, Regex ip_regex=null)
Creates a WorkerList object and automatically populates it with the worker URLs from GPUdb to support...
Definition: WorkerList.cs:35
API to talk to Kinetica Database
Definition: Kinetica.cs:40