Kinetica   C#   API  Version 7.2.3.1
Kinetica.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using Newtonsoft.Json;
3 using Snappier;
4 using System.Net;
5 using System.Net.Http.Headers;
6 using System.Text;
7 using kinetica.Utils;
8 
32 
33 namespace kinetica;
34 
38 public partial class Kinetica : IDisposable
39  {
40  private bool _disposed = false;
44  public const int END_OF_SET = -9999;
45 
49  public class Options
50  {
54  public string Username { get; set; } = string.Empty;
55 
59  public string Password { get; set; } = string.Empty;
60 
64  public string OauthToken { get; set; } = string.Empty;
65 
69  public bool UseSnappy { get; set; } = false;
70 
74  public int ThreadCount { get; set; } = 1;
75 
79  public bool DisableFailover { get; set; } = false;
80 
84  public bool DisableAutoDiscovery { get; set; } = false;
85 
89  public HAFailoverOrder HAFailoverOrder { get; set; } = HAFailoverOrder.Random;
90 
94  public int Timeout { get; set; } = 0;
95 
100 
104  public string? HostnameRegex { get; set; } = null;
105 
109  public string PrimaryUrl { get; set; } = string.Empty;
110 
117  public int InitialConnectionAttemptTimeout { get; set; } = 0;
118 
124  public int ServerConnectionTimeout { get; set; } = 60000;
125 
130  public TimeSpan PooledConnectionLifetime { get; set; } = TimeSpan.FromMinutes(2);
131 
135  public TimeSpan PooledConnectionIdleTimeout { get; set; } = TimeSpan.FromMinutes(2);
136  }
137 
142  public static string GetApiVersion() { return API_VERSION; }
143 
147  public string Url { get; private set; }
148 
152  public Uri URL { get; private set; }
153 
157  public string? Username { get; private set; } = null;
158 
162  private string? Password { get; set; } = null;
163 
167  private string? OauthToken { get; set; } = null;
168 
172  private string? Authorization { get; set; } = null;
173 
177  public bool UseSnappy { get; set; } = false;
178 
182  public int ThreadCount { get; set; } = 1;
183 
187  private HAFailoverManager? _haFailoverManager = null;
188 
192  public HAFailoverManager? HAManager => _haFailoverManager;
193 
197  public int NumClusterSwitches => _haFailoverManager?.NumClusterSwitches ?? 0;
198 
202  public IList<ClusterAddressInfo> GetHARingInfo()
203  {
204  return _haFailoverManager?.GetHostAddresses() ?? new List<ClusterAddressInfo>();
205  }
206 
211  {
212  return _haFailoverManager?.GetClusterInfo();
213  }
214 
215  // private string authorization;
216  private volatile System.Collections.Concurrent.ConcurrentDictionary<string, KineticaType> knownTypes = new();
217 
218  // private type label to type ID lookup table
219  private Dictionary<string, string> typeNameLookup = [];
220 
224  private readonly IHttpTransport _transport;
225 
226  // private object class type to KineticaType lookup table
227  private Dictionary<Type, KineticaType> kineticaTypeLookup = [];
228 
235  internal Kinetica(string url_str, IHttpTransport transport, Options? options = null)
236  : this(new List<string> { url_str }, transport, options)
237  {
238  }
239 
246  internal Kinetica(IList<string> urls, IHttpTransport transport, Options? options = null)
247  {
248  if (urls == null || urls.Count == 0)
249  throw new KineticaException("At least one URL must be provided");
250 
251  // Use the first URL as the primary
252  Url = urls[0].TrimEnd('/');
253  URL = new Uri(Url);
254 
255  // Use the provided transport (for testing)
256  _transport = transport;
257 
258  // Initialize other properties from options
259  if (options != null)
260  {
261  Username = options.Username;
262  Password = options.Password;
263  OauthToken = options.OauthToken;
264  UseSnappy = options.UseSnappy;
265  ThreadCount = options.ThreadCount;
266 
267  // Create authorization header
268  Authorization = CreateAuthorizationHeader();
269 
270  // Initialize HA failover manager if multiple URLs or options require it
271  if (urls.Count > 1 || !options.DisableFailover)
272  {
273  _haFailoverManager = new HAFailoverManager
274  {
275  DisableFailover = options.DisableFailover,
276  DisableAutoDiscovery = options.DisableAutoDiscovery,
277  HostManagerPort = options.HostManagerPort,
278  FailoverOrder = options.HAFailoverOrder
279  };
280 
281  if (!string.IsNullOrEmpty(options.HostnameRegex))
282  {
283  _haFailoverManager.HostnameRegex = new System.Text.RegularExpressions.Regex(options.HostnameRegex);
284  }
285 
286  var uriList = urls.Select(u => new Uri(u.TrimEnd('/'))).ToList();
287  _haFailoverManager.Initialize(uriList, this);
288  }
289  }
290  else
291  {
292  _haFailoverManager = new HAFailoverManager { DisableAutoDiscovery = true };
293  var uriList = urls.Select(u => new Uri(u.TrimEnd('/'))).ToList();
294  _haFailoverManager.Initialize(uriList, null);
295  }
296  }
297 
303  public Kinetica( string url_str, Options? options = null )
304  : this(new List<string> { url_str }, options)
305  {
306  }
307 
313  public Kinetica( IList<string> urls, Options? options = null )
314  {
315  if (urls == null || urls.Count == 0)
316  throw new KineticaException("At least one URL must be provided");
317 
318  // Use the first URL as the primary
319  Url = urls[0].TrimEnd('/');
320  URL = new Uri(Url);
321 
322  // Initialize HTTP transport layer
323  var timeout = options?.Timeout > 0
324  ? TimeSpan.FromMilliseconds(options.Timeout)
325  : TimeSpan.FromSeconds(30); // Default timeout
326 
327  _transport = new HttpClientTransport(
328  timeout,
329  options?.PooledConnectionLifetime,
330  options?.PooledConnectionIdleTimeout);
331 
332  if ( null != options ) // If caller specified options
333  {
334  Username = options.Username;
335  Password = options.Password;
336  OauthToken = options.OauthToken;
337 
338  // Handle authorization
339  Authorization = CreateAuthorizationHeader();
340 
341  UseSnappy = options.UseSnappy;
342  ThreadCount = options.ThreadCount;
343 
344  // Initialize HA failover manager if there are multiple URLs or HA options are set
345  if (urls.Count > 1 || !options.DisableAutoDiscovery)
346  {
347  _haFailoverManager = new HAFailoverManager
348  {
349  DisableFailover = options.DisableFailover,
350  DisableAutoDiscovery = options.DisableAutoDiscovery,
351  HostManagerPort = options.HostManagerPort,
352  FailoverOrder = options.HAFailoverOrder
353  };
354 
355  if (!string.IsNullOrEmpty(options.HostnameRegex))
356  {
357  _haFailoverManager.HostnameRegex = new System.Text.RegularExpressions.Regex(options.HostnameRegex);
358  }
359 
360  // Convert string URLs to Uri objects
361  var uriList = urls.Select(u => new Uri(u.TrimEnd('/'))).ToList();
362 
363  // Initialize with retry logic and exponential backoff (matching Rust implementation)
364  InitializeWithRetry(uriList, options);
365 
366  // Update the URL to the current active cluster
367  var currentUrl = _haFailoverManager.GetUrl();
368  if (currentUrl != null)
369  {
370  Url = currentUrl.ToString().TrimEnd('/');
371  URL = currentUrl;
372  }
373  }
374  }
375  else
376  {
377  // No options provided, initialize with a single URL
378  _haFailoverManager = new HAFailoverManager
379  {
380  DisableAutoDiscovery = true
381  };
382  var uriList = urls.Select(u => new Uri(u.TrimEnd('/'))).ToList();
383  _haFailoverManager.Initialize(uriList, null);
384  }
385  }
386 
393  private void InitializeWithRetry(IList<Uri> uriList, Options options)
394  {
395  if (_haFailoverManager == null)
396  throw new InvalidOperationException("HAFailoverManager not initialized");
397 
398  var startTime = DateTime.UtcNow;
399  int attemptNumber = 0;
400  int baseTimeoutMs = options.ServerConnectionTimeout > 0 ? options.ServerConnectionTimeout : 60000;
401  int maxTotalTimeMs = options.InitialConnectionAttemptTimeout;
402  Exception? lastException = null;
403 
404  while (true)
405  {
406  attemptNumber++;
407  int currentTimeout = baseTimeoutMs * (1 << Math.Min(attemptNumber - 1, 5)); // Exponential backoff, cap at 32x
408 
409  try
410  {
411  // Try to initialize with auto-discovery
412  _haFailoverManager.Initialize(uriList, this);
413  return; // Success
414  }
415  catch (Exception ex)
416  {
417  lastException = ex;
418 
419  // Check if this is a non-retriable error (like hostname regex mismatch or authorization failure)
420  if (IsNonRetriableInitializationError(ex))
421  {
422  // Fall back to initialization without auto-discovery
423  try
424  {
425  _haFailoverManager.DisableAutoDiscovery = true;
426  _haFailoverManager.Initialize(uriList, null);
427  return;
428  }
429  catch
430  {
431  throw new KineticaException($"Failed to initialize connection: {ex.Message}", ex);
432  }
433  }
434 
435  // Check if we've exceeded the total timeout
436  var elapsed = (DateTime.UtcNow - startTime).TotalMilliseconds;
437  if (maxTotalTimeMs <= 0 || elapsed >= maxTotalTimeMs)
438  {
439  // No more retries - fall back to initialization without auto-discovery
440  try
441  {
442  _haFailoverManager.DisableAutoDiscovery = true;
443  _haFailoverManager.Initialize(uriList, null);
444  return;
445  }
446  catch
447  {
448  throw new KineticaException($"Failed to initialize connection after {attemptNumber} attempts: {ex.Message}", ex);
449  }
450  }
451 
452  // Wait before retrying (exponential backoff)
453  int waitTime = Math.Min(currentTimeout, (int)(maxTotalTimeMs - elapsed));
454  if (waitTime > 0)
455  {
456  Thread.Sleep(Math.Min(waitTime, 5000)); // Cap wait at 5 seconds per attempt
457  }
458  }
459  }
460  }
461 
465  private static bool IsNonRetriableInitializationError(Exception ex)
466  {
467  // Hostname regex failures won't change with retries
468  if (ex.Message.Contains("hostname", StringComparison.OrdinalIgnoreCase) &&
469  ex.Message.Contains("regex", StringComparison.OrdinalIgnoreCase))
470  {
471  return true;
472  }
473 
474  // Authorization failures won't change with retries
475  if (ex.Message.Contains("Unauthorized", StringComparison.OrdinalIgnoreCase) ||
476  ex.Message.Contains("401", StringComparison.OrdinalIgnoreCase) ||
477  ex.Message.Contains("credentials", StringComparison.OrdinalIgnoreCase))
478  {
479  return true;
480  }
481 
482  return false;
483  }
484 
490  public IList<Uri>? GetCurrentWorkerUrls()
491  {
492  var clusterInfo = _haFailoverManager?.GetClusterInfo();
493  if (clusterInfo?.WorkerRankUrls != null && clusterInfo.WorkerRankUrls.Count > 0)
494  {
495  return clusterInfo.WorkerRankUrls;
496  }
497 
498  // Try to get fresh worker URLs from the server
499  try
500  {
501  var workers = new WorkerList(this);
502  if (workers.Count > 0)
503  {
504  return workers.ToList();
505  }
506  }
507  catch
508  {
509  // Fall back to cached URLs or null
510  }
511 
512  return null;
513  }
514 
520  public IList<int>? GetCurrentRoutingTable()
521  {
522  try
523  {
524  return adminShowShards().rank;
525  }
526  catch
527  {
528  return null;
529  }
530  }
531 
537  public bool RefreshClusterInfo()
538  {
539  if (_haFailoverManager == null)
540  return false;
541 
542  try
543  {
544  // Get fresh system properties to update cluster info
545  var clusterInfo = _haFailoverManager.GetClusterInfo();
546  if (clusterInfo == null)
547  return false;
548 
549  // Update system properties from the server
550  var systemProps = showSystemProperties().property_map;
551  clusterInfo.SystemProperties = systemProps;
552 
553  // Update worker URLs if multi-head is enabled
554  var workers = new WorkerList(this);
555  if (workers.Count > 0)
556  {
557  clusterInfo.WorkerRankUrls = workers.ToList();
558  }
559 
560  return true;
561  }
562  catch
563  {
564  return false;
565  }
566  }
567 
568  internal string? CreateAuthorizationHeader() {
569  string? authorization = null;
570  // Handle authorization
571  if( OauthToken != null && OauthToken.Length > 0 ) {
572  authorization = "Bearer " + OauthToken;
573  }
574  else if ( ( Username != null && ( Username.Length > 0 ) ) || ( Password != null && ( Password.Length > 0 ) ) )
575  {
576  authorization = ( "Basic " +
577  Convert.ToBase64String( Encoding.GetEncoding( "ISO-8859-1" ).GetBytes( Username + ":" + Password ) ) );
578  }
579 
580  return authorization;
581  }
582 
586  public void Dispose()
587  {
588  Dispose(true);
589  GC.SuppressFinalize(this);
590  }
591 
596  protected virtual void Dispose(bool disposing)
597  {
598  if (!_disposed)
599  {
600  if (disposing)
601  {
602  // Dispose managed resources
603  if (_transport is IDisposable disposable)
604  {
605  disposable.Dispose();
606  }
607  }
608 
609  _disposed = true;
610  }
611  }
612 
619  public void AddTableType( string table_name, Type obj_type )
620  {
621  try
622  {
623  // Get the type from the table
624  KineticaType ktype = KineticaType.fromTable( this, table_name );
625  if ( ktype.getTypeID() == null )
626  throw new KineticaException( $"Could not get type ID for table '{table_name}'" );
627  this.knownTypes.TryAdd( ktype.getTypeID(), ktype );
628 
629  // Save a mapping of the object to the KineticaType
630  if ( obj_type != null )
631  this.SetKineticaSourceClassToTypeMapping( obj_type, ktype );
632 
633  } catch ( KineticaException ex )
634  {
635  throw new KineticaException( "Error creating type from table", ex );
636  }
637  } // end AddTableType
638 
645  public void SetKineticaSourceClassToTypeMapping( Type? objectType, KineticaType kineticaType )
646  {
647  if ( objectType != null )
648  this.kineticaTypeLookup[objectType] = kineticaType;
649  return;
650  } // end SetKineticaSourceClassToTypeMapping
651 
652 
653 
663  IList<byte[]> records_binary,
664  IList<T> records ) where T : new()
665  {
666  // Using the KineticaType object, decode all the records from avro binary encoding
667  foreach ( var bin_record in records_binary )
668  {
669  T obj = AvroDecode<T>( bin_record, record_type );
670  records.Add( obj );
671  }
672  } // DecodeRawBinaryDataUsingRecordType
673 
674 
683  public void DecodeRawBinaryDataUsingSchemaString<T>( string schema_string,
684  IList<byte[]> records_binary,
685  IList<T> records ) where T : new()
686  {
687  // Create a KineticaType object based on the schema string
688  KineticaType ktype = new("", schema_string, null);
689 
690  // Using the KineticaType object, decode all the records from avro binary encoding
691  foreach ( var bin_record in records_binary )
692  {
693  T obj = AvroDecode<T>( bin_record, ktype );
694  records.Add( obj );
695  }
696  } // DecodeRawBinaryDataUsingSchemaString
697 
707  public void DecodeRawBinaryDataUsingSchemaString<T>( IList<string> schema_strings,
708  IList<IList<byte[]>> lists_records_binary,
709  IList<IList<T>> record_lists ) where T : new()
710  {
711  // Check that the list of schemas and list of binary encode data match in length
712  if ( schema_strings.Count != lists_records_binary.Count )
713  throw new KineticaException( "List of schemas and list of binary encoded data do not match in count." );
714 
715  // Using the KineticaType object, decode all the records from avro binary encoding
716  for ( int i = 0; i < schema_strings.Count; ++i )
717  {
718  // Create a KineticaType object based on the schema string
719  KineticaType ktype = new( "", schema_strings[ i ], null );
720 
721  // Get the binary encoded data for this list
722  IList<byte[]> records_binary = lists_records_binary[ i ];
723 
724  // Create a container to put the decoded records
725  IList<T> records = [];
726 
727  // The inner list actually contains the binary data
728  foreach ( var bin_record in records_binary )
729  {
730  T obj = AvroDecode<T>( bin_record, ktype );
731  records.Add( obj );
732  }
733  // Add the records into the outgoing list
734  record_lists.Add( records );
735  }
736  } // DecodeRawBinaryDataUsingSchemaString
737 
738 
747  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
748  IList<byte[]> records_binary,
749  IList<T> records ) where T : new()
750  {
751  // Make sure that the length of the type IDs and records are the same
752  if ( type_ids.Count != records_binary.Count )
753  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
754 
755  // Decode all the records
756  for ( int i = 0; i < records_binary.Count; ++i )
757  {
758  // Per object, use the respective type ID to create the appropriate KineticaType
759  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
760 
761  // Using the KineticaType object, decode the record.
762  T obj = AvroDecode<T>( records_binary[ i ], ktype );
763  records.Add( obj );
764  }
765  } // DecodeRawBinaryDataUsingTypeIDs
766 
767 
776  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
777  IList<IList<byte[]>> lists_records_binary,
778  IList<IList<T>> record_lists ) where T : new()
779  {
780  // Make sure that the length of the type IDs and records are the same
781  if ( type_ids.Count != lists_records_binary.Count )
782  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
783 
784  // Decode all the records
785  for ( int i = 0; i < lists_records_binary.Count; ++i )
786  {
787  // Per object, use the respective type ID to create the appropriate KineticaType
788  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
789 
790  // Get the binary encoded data for this list
791  IList<byte[]> records_binary = lists_records_binary[ i ];
792 
793  // Create a container to put the decoded records
794  IList<T> records = [];
795 
796  // The inner list actually contains the binary data
797  foreach ( var bin_record in records_binary )
798  {
799  // Using the KineticaType object, decode the record.
800  T obj = AvroDecode<T>( bin_record, ktype );
801  records.Add( obj );
802  }
803  // Add the records into the outgoing list
804  record_lists.Add( records );
805  }
806  } // DecodeRawBinaryDataUsingTypeIDs
807 
808  #region Request Submission API (Matches Rust gpudb.rs design)
809 
822  private TResponse SubmitRequest<TResponse>(string endpoint, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
823  {
824  // Encode the request
825  byte[] requestBytes = avroEncoding
826  ? AvroEncode(request)
827  : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
828 
829  // If HA failover is not available, just submit the request directly
830  if (_haFailoverManager == null || _haFailoverManager.HARingSize <= 1)
831  {
832  string fullUrl = Url + endpoint;
833  RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(fullUrl, requestBytes, enableCompression, avroEncoding);
834  return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
835  }
836 
837  // HA failover is available - attempt with failover logic
838  var currentUrl = _haFailoverManager.GetUrl();
839  if (currentUrl == null)
840  {
841  throw new KineticaException("No URL available");
842  }
843 
844  var originalUrl = currentUrl;
845  int currentSwitchCount = _haFailoverManager.NumClusterSwitches;
846 
847  while (true)
848  {
849  try
850  {
851  // Build the full URL with the endpoint
852  string fullUrl = currentUrl.ToString().TrimEnd('/') + endpoint;
853  RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(fullUrl, requestBytes, enableCompression, avroEncoding);
854  return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
855  }
856  catch (Exception ex) when (IsConnectionError(ex))
857  {
858  // This is a connection error - attempt failover
859  try
860  {
861  currentUrl = _haFailoverManager.SwitchUrl(originalUrl, currentSwitchCount, IsKineticaRunning);
862  // Update the main URL reference
863  Url = currentUrl.ToString().TrimEnd('/');
864  URL = currentUrl;
865  }
866  catch (KineticaException)
867  {
868  // Failover failed - re-throw the original exception
869  throw new KineticaException($"Connection failed and HA failover unsuccessful: {ex.Message}", ex);
870  }
871  }
872  catch (KineticaException)
873  {
874  // API error from server - don't failover, just rethrow
875  throw;
876  }
877  }
878  }
879 
893  private async System.Threading.Tasks.Task<TResponse> SubmitRequestAsync<TResponse>(
894  string endpoint,
895  object request,
896  bool enableCompression = false,
897  bool avroEncoding = true,
898  System.Threading.CancellationToken cancellationToken = default)
899  where TResponse : new()
900  {
901  // Encode the request
902  byte[] requestBytes = avroEncoding
903  ? AvroEncode(request)
904  : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
905 
906  // If HA failover is not available, just submit the request directly
907  if (_haFailoverManager == null || _haFailoverManager.HARingSize <= 1)
908  {
909  string fullUrl = Url + endpoint;
910  RawKineticaResponse kineticaResponse = await SubmitRequestToUrlInternalAsync(
911  fullUrl, requestBytes, enableCompression, avroEncoding, cancellationToken);
912  return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
913  }
914 
915  // HA failover is available - attempt with failover logic
916  var currentUrl = _haFailoverManager.GetUrl();
917  if (currentUrl == null)
918  {
919  throw new KineticaException("No URL available");
920  }
921 
922  var originalUrl = currentUrl;
923  int currentSwitchCount = _haFailoverManager.NumClusterSwitches;
924 
925  while (true)
926  {
927  try
928  {
929  // Build the full URL with the endpoint
930  string fullUrl = currentUrl.ToString().TrimEnd('/') + endpoint;
931  RawKineticaResponse kineticaResponse = await SubmitRequestToUrlInternalAsync(
932  fullUrl, requestBytes, enableCompression, avroEncoding, cancellationToken);
933  return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
934  }
935  catch (Exception ex) when (IsConnectionError(ex))
936  {
937  // This is a connection error - attempt failover
938  try
939  {
940  currentUrl = _haFailoverManager.SwitchUrl(originalUrl, currentSwitchCount, IsKineticaRunning);
941  // Update the main URL reference
942  Url = currentUrl.ToString().TrimEnd('/');
943  URL = currentUrl;
944  }
945  catch (KineticaException)
946  {
947  // Failover failed - re-throw the original exception
948  throw new KineticaException($"Connection failed and HA failover unsuccessful: {ex.Message}", ex);
949  }
950  }
951  catch (KineticaException)
952  {
953  // API error from server - don't failover, just rethrow
954  throw;
955  }
956  }
957  }
958 
971  public TResponse SubmitRequestRaw<TResponse>(Uri url, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
972  {
973  // Encode the request
974  byte[] requestBytes = avroEncoding
975  ? AvroEncode(request)
976  : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
977 
978  RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(url.ToString(), requestBytes, enableCompression, avroEncoding);
979  return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
980  }
981 
992  public RawKineticaResponse SubmitRequestRawBytes(Uri url, byte[] requestBytes)
993  {
994  return SubmitRequestToUrlInternal(url.ToString(), requestBytes, UseSnappy, true);
995  }
996 
997  #endregion
998 
999  #region Internal HTTP Helpers
1000 
1004  private TResponse DecodeResponse<TResponse>(RawKineticaResponse kineticaResponse, bool avroEncoding) where TResponse : new()
1005  {
1006  if (avroEncoding)
1007  {
1008  return AvroDecode<TResponse>(kineticaResponse.data);
1009  }
1010  else // JSON
1011  {
1012  kineticaResponse.data_str = kineticaResponse.data_str.Replace("\\U", "\\u");
1013  return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
1014  }
1015  }
1016 
1022  public bool IsKineticaRunning(Uri url)
1023  {
1024  try
1025  {
1026  // Simple GET request to check if server is running
1027  // We can't use the transport layer here as it only supports POST
1028  // So we'll create a temporary HttpClient for this check
1029  using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
1030  using var response = client.GetAsync(url).Result;
1031 
1032  if (response.IsSuccessStatusCode)
1033  {
1034  string responseText = response.Content.ReadAsStringAsync().Result;
1035  return responseText.Contains("Kinetica is running!");
1036  }
1037  return false;
1038  }
1039  catch
1040  {
1041  return false;
1042  }
1043  }
1044 
1054  private RawKineticaResponse SubmitRequestToUrlInternal(string url, byte[] requestBytes, bool enableCompression, bool avroEncoding)
1055  {
1056  try
1057  {
1058  // Apply Snappy compression if enabled
1059  byte[] bodyBytes;
1060  string contentType;
1061 
1062  if (enableCompression && avroEncoding)
1063  {
1064  // Compress using Snappier (pure managed Snappy implementation)
1065  bodyBytes = Snappy.CompressToArray(requestBytes);
1066  contentType = "application/x-snappy";
1067  }
1068  else
1069  {
1070  bodyBytes = requestBytes;
1071  contentType = avroEncoding ? "application/octet-stream" : "application/json";
1072  }
1073 
1074  // Use the HTTP transport layer to send the request
1075  var responseBytes = _transport.Post(
1076  url,
1077  bodyBytes,
1078  contentType,
1079  Authorization,
1080  System.Threading.CancellationToken.None);
1081 
1082  // Decode the response
1083  if (avroEncoding)
1084  {
1085  return AvroDecode<RawKineticaResponse>(responseBytes);
1086  }
1087  else // JSON
1088  {
1089  var responseString = Encoding.UTF8.GetString(responseBytes);
1090  responseString = responseString.Replace("\\U", "\\u");
1091  return JsonConvert.DeserializeObject<RawKineticaResponse>(responseString)
1092  ?? throw new KineticaException("Failed to deserialize response");
1093  }
1094  }
1095  catch (KineticaTransportException tex)
1096  {
1097  // HTTP transport returned a non-2xx status code
1098  // The server may have encoded an error message in the response body
1099  try
1100  {
1101  RawKineticaResponse? serverResponse;
1102  if (avroEncoding)
1103  {
1104  serverResponse = AvroDecode<RawKineticaResponse>(tex.Body);
1105  }
1106  else // JSON
1107  {
1108  var responseString = Encoding.UTF8.GetString(tex.Body);
1109  serverResponse = JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
1110  }
1111 
1112  throw new KineticaException(
1113  serverResponse?.message ?? $"Server returned HTTP {tex.StatusCode}",
1114  tex.StatusCode,
1115  tex);
1116  }
1117  catch (KineticaException)
1118  {
1119  throw;
1120  }
1121  catch
1122  {
1123  // Could not decode error response - throw with status code
1124  throw new KineticaException($"Server returned HTTP {tex.StatusCode}", tex.StatusCode, tex);
1125  }
1126  }
1127  catch (HttpRequestException ex)
1128  {
1129  throw new KineticaException(ex.ToString(), ex);
1130  }
1131  catch (TaskCanceledException ex)
1132  {
1133  throw new KineticaException("Request timed out: " + ex.ToString(), ex);
1134  }
1135  catch (OperationCanceledException ex)
1136  {
1137  throw new KineticaException("Request cancelled: " + ex.ToString(), ex);
1138  }
1139  catch (KineticaException)
1140  {
1141  throw;
1142  }
1143  catch (Exception ex)
1144  {
1145  throw new KineticaException(ex.ToString(), ex);
1146  }
1147  }
1148 
1159  private async Task<RawKineticaResponse> SubmitRequestToUrlInternalAsync(
1160  string url,
1161  byte[] requestBytes,
1162  bool enableCompression,
1163  bool avroEncoding,
1164  System.Threading.CancellationToken cancellationToken = default)
1165  {
1166  try
1167  {
1168  // Apply Snappy compression if enabled
1169  byte[] bodyBytes;
1170  string contentType;
1171 
1172  if (enableCompression && avroEncoding)
1173  {
1174  // Compress using Snappier (pure managed Snappy implementation)
1175  bodyBytes = Snappy.CompressToArray(requestBytes);
1176  contentType = "application/x-snappy";
1177  }
1178  else
1179  {
1180  bodyBytes = requestBytes;
1181  contentType = avroEncoding ? "application/octet-stream" : "application/json";
1182  }
1183 
1184  // Use the HTTP transport layer to send the request asynchronously
1185  var responseBytes = await _transport
1186  .PostAsync(url, bodyBytes, contentType, Authorization, cancellationToken)
1187  .ConfigureAwait(false);
1188 
1189  // Decode the response
1190  if (avroEncoding)
1191  {
1192  return AvroDecode<RawKineticaResponse>(responseBytes);
1193  }
1194  else // JSON
1195  {
1196  var responseString = Encoding.UTF8.GetString(responseBytes);
1197  responseString = responseString.Replace("\\U", "\\u");
1198  return JsonConvert.DeserializeObject<RawKineticaResponse>(responseString)
1199  ?? throw new KineticaException("Failed to deserialize response");
1200  }
1201  }
1202  catch (KineticaTransportException tex)
1203  {
1204  // HTTP transport returned a non-2xx status code
1205  // The server may have encoded an error message in the response body
1206  try
1207  {
1208  RawKineticaResponse? serverResponse;
1209  if (avroEncoding)
1210  {
1211  serverResponse = AvroDecode<RawKineticaResponse>(tex.Body);
1212  }
1213  else // JSON
1214  {
1215  var responseString = Encoding.UTF8.GetString(tex.Body);
1216  serverResponse = JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
1217  }
1218 
1219  throw new KineticaException(
1220  serverResponse?.message ?? $"Server returned HTTP {tex.StatusCode}",
1221  tex.StatusCode,
1222  tex);
1223  }
1224  catch (KineticaException)
1225  {
1226  throw;
1227  }
1228  catch
1229  {
1230  // Could not decode error response - throw with status code
1231  throw new KineticaException($"Server returned HTTP {tex.StatusCode}", tex.StatusCode, tex);
1232  }
1233  }
1234  catch (HttpRequestException ex)
1235  {
1236  throw new KineticaException(ex.ToString(), ex);
1237  }
1238  catch (TaskCanceledException ex)
1239  {
1240  throw new KineticaException("Request timed out: " + ex.ToString(), ex);
1241  }
1242  catch (OperationCanceledException ex)
1243  {
1244  throw new KineticaException("Request cancelled: " + ex.ToString(), ex);
1245  }
1246  catch (KineticaException)
1247  {
1248  throw;
1249  }
1250  catch (Exception ex)
1251  {
1252  throw new KineticaException(ex.ToString(), ex);
1253  }
1254  }
1255 
1261  public static bool IsConnectionError(Exception ex)
1262  {
1263  return ex is System.Net.WebException webEx && webEx.Status != WebExceptionStatus.ProtocolError ||
1264  ex is System.Net.Sockets.SocketException ||
1265  ex is IOException ||
1266  ex is System.Net.Http.HttpRequestException ||
1267  ex is TaskCanceledException ||
1268  (ex is KineticaException kex && kex.Message.Contains("connection", StringComparison.OrdinalIgnoreCase));
1269  }
1270 
1278  internal Uri? ForceHAFailover(Uri currentUrl, int currentSwitchCount)
1279  {
1280  if (_haFailoverManager == null || _haFailoverManager.HARingSize <= 1)
1281  {
1282  return null;
1283  }
1284 
1285  try
1286  {
1287  var newUrl = _haFailoverManager.SwitchUrl(currentUrl, currentSwitchCount, IsKineticaRunning);
1288  Url = newUrl.ToString().TrimEnd('/');
1289  URL = newUrl;
1290  return newUrl;
1291  }
1292  catch (KineticaException)
1293  {
1294  return null;
1295  }
1296  }
1297 
1301  public int HARingSize => _haFailoverManager?.HARingSize ?? 1;
1302 
1303  #endregion
1304 
1305  #region Type Registration and Encoding
1306 
1307  private void SetDecoderIfMissing(string typeId, string label, string schemaString, IDictionary<string, IList<string>> properties)
1308  {
1309  // If the table is a collection, it does not have a proper type so ignore it
1310 
1311  if (typeId == "<collection>")
1312  {
1313  return;
1314  }
1315 
1316  knownTypes.GetOrAdd(typeId, (s) =>
1317  {
1318  return new KineticaType(label, schemaString, properties);
1319  });
1320  typeNameLookup[label] = typeId;
1321  }
1322 
1323 
1329  private KineticaType? GetType(string typeName)
1330  {
1331  KineticaType? type = null;
1332  if (typeNameLookup.TryGetValue(typeName, out string? typeId))
1333  {
1334  knownTypes.TryGetValue(typeId, out type);
1335  }
1336 
1337  return type;
1338  }
1339 
1340 
1346  private KineticaType? LookupKineticaType( Type objectType )
1347  {
1348  if (!kineticaTypeLookup.TryGetValue(objectType, out KineticaType? value))
1349  return null; // none found
1350 
1351  return value;
1352  } // LookupKineticaType()
1353 
1354 
1360  internal byte[] AvroEncode(object obj)
1361  {
1362  // Create a stream that will allow us to view the underlying memory
1363  using ( var ms = new MemoryStream())
1364  {
1365  // Write the object to the memory stream
1366  // If obj is an ISpecificRecord, this is more efficient
1367  if ( obj is Avro.Specific.ISpecificRecord)
1368  {
1369  var schema = (obj as Avro.Specific.ISpecificRecord).Schema;
1370  Avro.Specific.SpecificDefaultWriter writer = new(schema);
1371  writer.Write(schema, obj, new BinaryEncoder(ms));
1372  }
1373  else // Not an ISpecificRecord - this way is less efficient
1374  {
1375  // Get the KineticaType associated with the object to be encoded
1376  Type obj_type = obj.GetType();
1377  KineticaType? ktype = LookupKineticaType( obj_type );
1378  if ( ktype == null )
1379  {
1380  throw new KineticaException( "No known KineticaType associated with the given object. " +
1381  "Need a known KineticaType to encode the object." );
1382  }
1383 
1384  // Make a copy of the object to send as a GenericRecord, then write that to the memory stream
1385  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
1386  var recordToSend = MakeGenericRecord( obj, ktype );
1387  var writer = new Avro.Generic.DefaultWriter(schema);
1388  writer.Write(schema, recordToSend, new BinaryEncoder(ms));
1389  }
1390 
1391  // Get the memory from the stream
1392  return ms.ToArray();
1393  }
1394  } // end AvroEncode
1395 
1403  private Avro.Generic.GenericRecord MakeGenericRecord( object obj, KineticaType ktype )
1404  {
1405  // Get the schema
1406  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
1407 
1408  // Create a new GenericRecord for this schema
1409  var recordToSend = new Avro.Generic.GenericRecord(schema);
1410 
1411  // Copy each field from obj to recordToSend
1412  foreach ( var field in schema.Fields)
1413  {
1414  var property = obj.GetType()
1415  .GetProperties()
1416  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
1417 
1418  if (property == null) continue;
1419 
1420  recordToSend.Add(field.Name, property.GetValue(obj, null));
1421  }
1422 
1423  // Return the newly created object
1424  return recordToSend;
1425  }
1426 
1434  internal T AvroDecode<T>(byte[] bytes, KineticaType? ktype = null) where T : new()
1435  {
1436  // Get the schema
1437  var schema = KineticaData.SchemaFromType( typeof(T), ktype );
1438 
1439  // Create a stream to read the binary data
1440  using (var ms = new MemoryStream(bytes))
1441  {
1442  // Create a new object to return
1443  T obj = new();
1444  if (obj is Avro.Specific.ISpecificRecord)
1445  {
1446  var reader = new Avro.Specific.SpecificDefaultReader(schema, schema);
1447  reader.Read(obj, new BinaryDecoder(ms));
1448  }
1449  else
1450  {
1451  // Not ISpecificRecord, so first read into a new GenericRecord
1452  var reader = new Avro.Generic.DefaultReader(schema, schema);
1453  Avro.Generic.GenericRecord recordToReceive = new(schema);
1454  reader.Read(recordToReceive, new BinaryDecoder(ms));
1455 
1456  // Now, copy all the fields from the GenericRecord to obj
1457  foreach (var field in schema.Fields)
1458  {
1459  var property = obj.GetType()
1460  .GetProperties()
1461  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
1462 
1463  if (property == null) continue;
1464 
1465  // Try to get the property
1466  if (recordToReceive.TryGetValue(field.Name, out object val))
1467  {
1468  // If successful, write the property to obj
1469  property.SetValue(obj, val);
1470  }
1471  } // end foreach
1472  } // end if-else
1473 
1474  // Return the new object
1475  return obj;
1476  } // end using
1477  } // end AvroDecode<T>
1478 
1479 
1486  internal T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord, new()
1487  {
1488  // T obj = new T(); // Activator.CreateInstance<T>();
1489  var schema = KineticaData.SchemaFromType( typeof(T), null );
1490  var reader = new Avro.Specific.SpecificReader<T>(schema, schema);
1491  return reader.Read(default, new BinaryDecoder(stream));
1492  }
1493 
1494  #endregion
1495  } // end class Kinetica
1496 
1497 
Reader wrapper class for reading data and storing into specific classes
ClusterAddressInfo? GetClusterInfo()
Gets the active cluster's information.
Definition: HAFailover.cs:319
int StatusCode
HTTP status code from the server response.
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
byte [] Body
Raw response body bytes (may contain Avro-encoded error message).
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Definition: Kinetica.cs:619
string? HostnameRegex
Optional: Regex pattern to filter URLs by hostname/IP
Definition: Kinetica.cs:104
void Add(string fieldName, object fieldValue)
bool UseSnappy
Use Snappy compression for requests
Definition: Kinetica.cs:69
Kinetica(string url_str, Options? options=null)
API Constructor
Definition: Kinetica.cs:303
int Timeout
Request timeout in milliseconds (0 = infinite)
Definition: Kinetica.cs:94
IList< int >? GetCurrentRoutingTable()
Gets the current routing table for multi-head operations.
Definition: Kinetica.cs:520
bool DisableFailover
Whether failover is disabled
Definition: HAFailover.cs:214
TResponse SubmitRequestRaw< TResponse >(Uri url, object request, bool enableCompression=false, bool avroEncoding=true)
Submit a request directly to a specific URL without HA failover.
Definition: Kinetica.cs:971
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
int InitialConnectionAttemptTimeout
Initial connection attempt timeout in milliseconds.
Definition: Kinetica.cs:117
TimeSpan PooledConnectionLifetime
Maximum lifetime of pooled HTTP connections.
Definition: Kinetica.cs:130
int HARingSize
Gets the number of clusters in the HA ring.
Definition: HAFailover.cs:256
Kinetica(IList< string > urls, Options? options=null)
API Constructor with multiple URLs for HA failover support.
Definition: Kinetica.cs:313
IList< ClusterAddressInfo > GetHostAddresses()
Gets all cluster addresses.
Definition: HAFailover.cs:283
The default implementation for the generic reader.
T Read(T reuse, Decoder dec)
Generic read function
const string API_VERSION
static bool IsConnectionError(Exception ex)
Checks if an exception is a connection error that warrants HA failover.
Definition: Kinetica.cs:1261
KineticaData - class to help with Avro Encoding for Kinetica
Definition: KineticaData.cs:14
HAFailoverManager? HAManager
Gets the HA failover manager instance.
Definition: Kinetica.cs:192
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Definition: Kinetica.cs:197
string? Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:157
bool DisableFailover
Whether to disable failover upon failures
Definition: Kinetica.cs:79
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:54
A General purpose writer for serializing objects into a Stream using Avro.
Interface class for generated classes
int ThreadCount
Thread Count
Definition: Kinetica.cs:74
Reader class for reading data and storing into specific classes
bool DisableAutoDiscovery
Whether auto-discovery is disabled
Definition: HAFailover.cs:219
Connection Options
Definition: Kinetica.cs:49
void Initialize(IList< Uri > urls, Kinetica? kinetica=null)
Initializes the manager with a list of URLs.
Definition: HAFailover.cs:364
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
int ServerConnectionTimeout
Server connection timeout in milliseconds.
Definition: Kinetica.cs:124
int ThreadCount
Thread Count
Definition: Kinetica.cs:182
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
Definition: Kinetica.cs:747
ShowSystemPropertiesResponse showSystemProperties(ShowSystemPropertiesRequest request_)
Returns server configuration and version related information to the caller.
const int DefaultHostManagerPort
Definition: HAFailover.cs:193
Thrown by HttpClientTransport when the server responds with a non-2xx status code.
Write leaf values.
The default type used by GenericReader and GenericWriter for RecordSchema.
Uri? GetUrl()
Gets the current active URL.
Definition: HAFailover.cs:343
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const int END_OF_SET
No Limit
Definition: Kinetica.cs:44
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:177
virtual void Write(Schema schema, object value, Encoder encoder)
Examines the schema and dispatches the actual work to one of the other methods of this class.
object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
IList< Uri > WorkerRankUrls
List of worker rank URLs
Definition: HAFailover.cs:45
bool DisableAutoDiscovery
Whether to disable automatic discovery of clusters and worker ranks
Definition: Kinetica.cs:84
Contains address information for a Kinetica cluster.
Definition: HAFailover.cs:30
int HARingSize
Gets the HA ring size.
Definition: Kinetica.cs:1301
string PrimaryUrl
URL of the primary cluster in the HA environment
Definition: Kinetica.cs:109
static ? RecordSchema SchemaFromType(System.Type t, KineticaType? ktype=null)
Create an Avro Schema from a System.Type and a KineticaType.
Definition: KineticaData.cs:92
ClusterAddressInfo? GetCurrentClusterInfo()
Gets the current active cluster information.
Definition: Kinetica.cs:210
virtual void Dispose(bool disposing)
Disposes managed and unmanaged resources.
Definition: Kinetica.cs:596
bool RefreshClusterInfo()
Refreshes cluster information after a failover.
Definition: Kinetica.cs:537
Abstraction over the raw HTTP POST layer.
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
Definition: Kinetica.cs:662
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
Definition: Kinetica.cs:683
IList< int > rank
Array of ranks indexed by the shard number.
HAFailoverOrder
High availability failover order options.
Definition: HAFailover.cs:12
string getTypeID()
bool IsKineticaRunning(Uri url)
Checks if Kinetica is running at the given URL.
Definition: Kinetica.cs:1022
IDictionary< string, string > property_map
A map of server configuration parameters and version information.
IHttpTransport implementation backed by HttpClient
Uri SwitchUrl(Uri oldUrl, int oldNumClusterSwitches, Func< Uri, bool >? isKineticaRunning=null)
Switches to the next available cluster URL for HA failover.
Definition: HAFailover.cs:703
Manages high availability failover for Kinetica connections.
Definition: HAFailover.cs:190
void Dispose()
Disposes the Kinetica client and releases HTTP resources.
Definition: Kinetica.cs:586
Regex? HostnameRegex
Optional hostname regex for filtering URLs
Definition: HAFailover.cs:234
Immutable collection of metadata about a Kinetica type.
Definition: Type.cs:36
string Password
Optional: Password for user
Definition: Kinetica.cs:59
string Url
URL for Kinetica Server (including "http:" and port) as a string
Definition: Kinetica.cs:147
RawKineticaResponse SubmitRequestRawBytes(Uri url, byte[] requestBytes)
Submit pre-encoded request bytes directly to a specific URL without HA failover.
Definition: Kinetica.cs:992
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Definition: Kinetica.cs:645
Class for writing data from any specific objects
int HostManagerPort
Host manager port number
Definition: Kinetica.cs:99
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
Definition: HAFailover.cs:270
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
static string GetApiVersion()
API Version
Definition: Kinetica.cs:142
IList< Uri >? GetCurrentWorkerUrls()
Gets the current worker URLs for multi-head operations.
Definition: Kinetica.cs:490
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:152
string OauthToken
Optional: OauthToken for user
Definition: Kinetica.cs:64
IList< ClusterAddressInfo > GetHARingInfo()
Gets the list of all cluster addresses in the HA ring.
Definition: Kinetica.cs:202
TimeSpan PooledConnectionIdleTimeout
Idle timeout for pooled HTTP connections.
Definition: Kinetica.cs:135
Decoder for Avro binary format