5 using System.Net.Http.Headers;
40 private bool _disposed =
false;
54 public string Username {
get;
set; } =
string.Empty;
59 public string Password {
get;
set; } =
string.Empty;
147 public string Url {
get;
private set; }
152 public Uri
URL {
get;
private set; }
157 public string?
Username {
get;
private set; } =
null;
162 private string? Password {
get;
set; } =
null;
167 private string? OauthToken {
get;
set; } =
null;
172 private string? Authorization {
get;
set; } =
null;
204 return _haFailoverManager?.
GetHostAddresses() ??
new List<ClusterAddressInfo>();
216 private volatile System.Collections.Concurrent.ConcurrentDictionary<string,
KineticaType> knownTypes =
new();
219 private Dictionary<string, string> typeNameLookup = [];
227 private Dictionary<Type, KineticaType> kineticaTypeLookup = [];
236 : this(new List<string> { url_str }, transport, options)
248 if (urls ==
null || urls.Count == 0)
252 Url = urls[0].TrimEnd(
'/');
256 _transport = transport;
262 Password = options.Password;
263 OauthToken = options.OauthToken;
268 Authorization = CreateAuthorizationHeader();
271 if (urls.Count > 1 || !options.DisableFailover)
276 DisableAutoDiscovery = options.DisableAutoDiscovery,
277 HostManagerPort = options.HostManagerPort,
278 FailoverOrder = options.HAFailoverOrder
281 if (!
string.IsNullOrEmpty(options.HostnameRegex))
283 _haFailoverManager.
HostnameRegex =
new System.Text.RegularExpressions.Regex(options.HostnameRegex);
286 var uriList = urls.Select(u =>
new Uri(u.TrimEnd(
'/'))).ToList();
293 var uriList = urls.Select(u =>
new Uri(u.TrimEnd(
'/'))).ToList();
304 : this(new List<string> { url_str }, options)
315 if (urls ==
null || urls.Count == 0)
319 Url = urls[0].TrimEnd(
'/');
323 var timeout = options?.Timeout > 0
324 ? TimeSpan.FromMilliseconds(options.Timeout)
325 : TimeSpan.FromSeconds(30);
329 options?.PooledConnectionLifetime,
330 options?.PooledConnectionIdleTimeout);
332 if (
null != options )
335 Password = options.Password;
336 OauthToken = options.OauthToken;
339 Authorization = CreateAuthorizationHeader();
345 if (urls.Count > 1 || !options.DisableAutoDiscovery)
350 DisableAutoDiscovery = options.DisableAutoDiscovery,
351 HostManagerPort = options.HostManagerPort,
352 FailoverOrder = options.HAFailoverOrder
355 if (!
string.IsNullOrEmpty(options.HostnameRegex))
357 _haFailoverManager.
HostnameRegex =
new System.Text.RegularExpressions.Regex(options.HostnameRegex);
361 var uriList = urls.Select(u =>
new Uri(u.TrimEnd(
'/'))).ToList();
364 InitializeWithRetry(uriList, options);
367 var currentUrl = _haFailoverManager.
GetUrl();
368 if (currentUrl !=
null)
370 Url = currentUrl.ToString().TrimEnd(
'/');
380 DisableAutoDiscovery =
true 382 var uriList = urls.Select(u =>
new Uri(u.TrimEnd(
'/'))).ToList();
393 private void InitializeWithRetry(IList<Uri> uriList, Options options)
395 if (_haFailoverManager ==
null)
396 throw new InvalidOperationException(
"HAFailoverManager not initialized");
399 int attemptNumber = 0;
400 int baseTimeoutMs = options.ServerConnectionTimeout > 0 ? options.ServerConnectionTimeout : 60000;
401 int maxTotalTimeMs = options.InitialConnectionAttemptTimeout;
402 Exception? lastException =
null;
407 int currentTimeout = baseTimeoutMs * (1 << Math.Min(attemptNumber - 1, 5));
420 if (IsNonRetriableInitializationError(ex))
431 throw new KineticaException($
"Failed to initialize connection: {ex.Message}", ex);
436 var elapsed = (
DateTime.UtcNow - startTime).TotalMilliseconds;
437 if (maxTotalTimeMs <= 0 || elapsed >= maxTotalTimeMs)
448 throw new KineticaException($
"Failed to initialize connection after {attemptNumber} attempts: {ex.Message}", ex);
453 int waitTime = Math.Min(currentTimeout, (
int)(maxTotalTimeMs - elapsed));
456 Thread.Sleep(Math.Min(waitTime, 5000));
465 private static bool IsNonRetriableInitializationError(Exception ex)
468 if (ex.Message.Contains(
"hostname", StringComparison.OrdinalIgnoreCase) &&
469 ex.Message.Contains(
"regex", StringComparison.OrdinalIgnoreCase))
475 if (ex.Message.Contains(
"Unauthorized", StringComparison.OrdinalIgnoreCase) ||
476 ex.Message.Contains(
"401", StringComparison.OrdinalIgnoreCase) ||
477 ex.Message.Contains(
"credentials", StringComparison.OrdinalIgnoreCase))
493 if (clusterInfo?.WorkerRankUrls !=
null && clusterInfo.WorkerRankUrls.Count > 0)
502 if (workers.Count > 0)
504 return workers.ToList();
539 if (_haFailoverManager ==
null)
546 if (clusterInfo ==
null)
551 clusterInfo.SystemProperties = systemProps;
555 if (workers.Count > 0)
557 clusterInfo.WorkerRankUrls = workers.ToList();
568 internal string? CreateAuthorizationHeader() {
569 string? authorization =
null;
571 if( OauthToken !=
null && OauthToken.Length > 0 ) {
572 authorization =
"Bearer " + OauthToken;
574 else if ( (
Username !=
null && (
Username.Length > 0 ) ) || ( Password !=
null && ( Password.Length > 0 ) ) )
576 authorization = (
"Basic " +
577 Convert.ToBase64String( Encoding.GetEncoding(
"ISO-8859-1" ).GetBytes(
Username +
":" + Password ) ) );
580 return authorization;
589 GC.SuppressFinalize(
this);
596 protected virtual void Dispose(
bool disposing)
603 if (_transport is IDisposable disposable)
605 disposable.Dispose();
626 throw new KineticaException( $
"Could not get type ID for table '{table_name}'" );
627 this.knownTypes.TryAdd( ktype.
getTypeID(), ktype );
630 if ( obj_type !=
null )
647 if ( objectType !=
null )
648 this.kineticaTypeLookup[objectType] = kineticaType;
663 IList<
byte[]> records_binary,
664 IList<T> records ) where T :
new()
667 foreach ( var bin_record
in records_binary )
669 T obj = AvroDecode<T>( bin_record, record_type );
684 IList<
byte[]> records_binary,
685 IList<T> records ) where T :
new()
691 foreach ( var bin_record
in records_binary )
693 T obj = AvroDecode<T>( bin_record, ktype );
708 IList<IList<
byte[]>> lists_records_binary,
709 IList<IList<T>> record_lists ) where T :
new()
712 if ( schema_strings.Count != lists_records_binary.Count )
713 throw new KineticaException(
"List of schemas and list of binary encoded data do not match in count." );
716 for (
int i = 0; i < schema_strings.Count; ++i )
719 KineticaType ktype =
new(
"", schema_strings[ i ], null );
722 IList<byte[]> records_binary = lists_records_binary[ i ];
725 IList<T> records = [];
728 foreach ( var bin_record
in records_binary )
730 T obj = AvroDecode<T>( bin_record, ktype );
734 record_lists.Add( records );
748 IList<
byte[]> records_binary,
749 IList<T> records ) where T :
new()
752 if ( type_ids.Count != records_binary.Count )
753 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
756 for (
int i = 0; i < records_binary.Count; ++i )
762 T obj = AvroDecode<T>( records_binary[ i ], ktype );
777 IList<IList<
byte[]>> lists_records_binary,
778 IList<IList<T>> record_lists ) where T :
new()
781 if ( type_ids.Count != lists_records_binary.Count )
782 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
785 for (
int i = 0; i < lists_records_binary.Count; ++i )
791 IList<byte[]> records_binary = lists_records_binary[ i ];
794 IList<T> records = [];
797 foreach ( var bin_record
in records_binary )
800 T obj = AvroDecode<T>( bin_record, ktype );
804 record_lists.Add( records );
808 #region Request Submission API (Matches Rust gpudb.rs design) 822 private TResponse SubmitRequest<TResponse>(
string endpoint,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse : new()
825 byte[] requestBytes = avroEncoding
826 ? AvroEncode(request)
827 : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
830 if (_haFailoverManager ==
null || _haFailoverManager.
HARingSize <= 1)
832 string fullUrl =
Url + endpoint;
833 RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(fullUrl, requestBytes, enableCompression, avroEncoding);
834 return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
838 var currentUrl = _haFailoverManager.
GetUrl();
839 if (currentUrl ==
null)
844 var originalUrl = currentUrl;
852 string fullUrl = currentUrl.ToString().TrimEnd(
'/') + endpoint;
853 RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(fullUrl, requestBytes, enableCompression, avroEncoding);
854 return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
863 Url = currentUrl.ToString().TrimEnd(
'/');
869 throw new KineticaException($
"Connection failed and HA failover unsuccessful: {ex.Message}", ex);
893 private async System.Threading.Tasks.Task<TResponse> SubmitRequestAsync<TResponse>(
896 bool enableCompression =
false,
897 bool avroEncoding =
true,
898 System.Threading.CancellationToken cancellationToken =
default)
899 where TResponse :
new()
902 byte[] requestBytes = avroEncoding
903 ? AvroEncode(request)
904 : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
907 if (_haFailoverManager ==
null || _haFailoverManager.
HARingSize <= 1)
909 string fullUrl =
Url + endpoint;
911 fullUrl, requestBytes, enableCompression, avroEncoding, cancellationToken);
912 return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
916 var currentUrl = _haFailoverManager.
GetUrl();
917 if (currentUrl ==
null)
922 var originalUrl = currentUrl;
930 string fullUrl = currentUrl.ToString().TrimEnd(
'/') + endpoint;
932 fullUrl, requestBytes, enableCompression, avroEncoding, cancellationToken);
933 return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
942 Url = currentUrl.ToString().TrimEnd(
'/');
948 throw new KineticaException($
"Connection failed and HA failover unsuccessful: {ex.Message}", ex);
971 public TResponse
SubmitRequestRaw<TResponse>(Uri url,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse : new()
974 byte[] requestBytes = avroEncoding
975 ? AvroEncode(request)
976 : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));
978 RawKineticaResponse kineticaResponse = SubmitRequestToUrlInternal(url.ToString(), requestBytes, enableCompression, avroEncoding);
979 return DecodeResponse<TResponse>(kineticaResponse, avroEncoding);
994 return SubmitRequestToUrlInternal(url.ToString(), requestBytes,
UseSnappy,
true);
999 #region Internal HTTP Helpers 1004 private TResponse DecodeResponse<TResponse>(
RawKineticaResponse kineticaResponse,
bool avroEncoding) where TResponse :
new()
1008 return AvroDecode<TResponse>(kineticaResponse.data);
1012 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u");
1013 return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
1029 using var client =
new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
1030 using var response = client.GetAsync(url).Result;
1032 if (response.IsSuccessStatusCode)
1034 string responseText = response.Content.ReadAsStringAsync().Result;
1035 return responseText.Contains(
"Kinetica is running!");
1054 private RawKineticaResponse SubmitRequestToUrlInternal(
string url,
byte[] requestBytes,
bool enableCompression,
bool avroEncoding)
1062 if (enableCompression && avroEncoding)
1065 bodyBytes = Snappy.CompressToArray(requestBytes);
1066 contentType =
"application/x-snappy";
1070 bodyBytes = requestBytes;
1071 contentType = avroEncoding ?
"application/octet-stream" :
"application/json";
1075 var responseBytes = _transport.Post(
1080 System.Threading.CancellationToken.None);
1085 return AvroDecode<RawKineticaResponse>(responseBytes);
1089 var responseString = Encoding.UTF8.GetString(responseBytes);
1090 responseString = responseString.Replace(
"\\U",
"\\u");
1104 serverResponse = AvroDecode<RawKineticaResponse>(tex.
Body);
1108 var responseString = Encoding.UTF8.GetString(tex.
Body);
1113 serverResponse?.message ?? $
"Server returned HTTP {tex.StatusCode}",
1127 catch (HttpRequestException ex)
1131 catch (TaskCanceledException ex)
1135 catch (OperationCanceledException ex)
1143 catch (Exception ex)
1159 private async Task<RawKineticaResponse> SubmitRequestToUrlInternalAsync(
1161 byte[] requestBytes,
1162 bool enableCompression,
1164 System.Threading.CancellationToken cancellationToken =
default)
1172 if (enableCompression && avroEncoding)
1175 bodyBytes = Snappy.CompressToArray(requestBytes);
1176 contentType =
"application/x-snappy";
1180 bodyBytes = requestBytes;
1181 contentType = avroEncoding ?
"application/octet-stream" :
"application/json";
1185 var responseBytes = await _transport
1186 .PostAsync(url, bodyBytes, contentType, Authorization, cancellationToken)
1187 .ConfigureAwait(
false);
1192 return AvroDecode<RawKineticaResponse>(responseBytes);
1196 var responseString = Encoding.UTF8.GetString(responseBytes);
1197 responseString = responseString.Replace(
"\\U",
"\\u");
1211 serverResponse = AvroDecode<RawKineticaResponse>(tex.
Body);
1215 var responseString = Encoding.UTF8.GetString(tex.
Body);
1220 serverResponse?.message ?? $
"Server returned HTTP {tex.StatusCode}",
1234 catch (HttpRequestException ex)
1238 catch (TaskCanceledException ex)
1242 catch (OperationCanceledException ex)
1250 catch (Exception ex)
1263 return ex is System.Net.WebException webEx && webEx.Status != WebExceptionStatus.ProtocolError ||
1264 ex is System.Net.Sockets.SocketException ||
1265 ex is IOException ||
1266 ex is System.Net.Http.HttpRequestException ||
1267 ex is TaskCanceledException ||
1268 (ex is
KineticaException kex && kex.Message.Contains(
"connection", StringComparison.OrdinalIgnoreCase));
1278 internal Uri? ForceHAFailover(Uri currentUrl,
int currentSwitchCount)
1280 if (_haFailoverManager ==
null || _haFailoverManager.
HARingSize <= 1)
1288 Url = newUrl.ToString().TrimEnd(
'/');
1305 #region Type Registration and Encoding 1307 private void SetDecoderIfMissing(
string typeId,
string label,
string schemaString, IDictionary<
string, IList<string>> properties)
1311 if (typeId ==
"<collection>")
1316 knownTypes.GetOrAdd(typeId, (s) =>
1318 return new KineticaType(label, schemaString, properties);
1320 typeNameLookup[label] = typeId;
1332 if (typeNameLookup.TryGetValue(typeName, out
string? typeId))
1334 knownTypes.TryGetValue(typeId, out type);
1348 if (!kineticaTypeLookup.TryGetValue(objectType, out
KineticaType? value))
1360 internal byte[] AvroEncode(
object obj)
1363 using ( var ms =
new MemoryStream())
1376 Type obj_type = obj.GetType();
1378 if ( ktype ==
null )
1380 throw new KineticaException(
"No known KineticaType associated with the given object. " +
1381 "Need a known KineticaType to encode the object." );
1386 var recordToSend = MakeGenericRecord( obj, ktype );
1392 return ms.ToArray();
1412 foreach ( var field
in schema.Fields)
1414 var
property = obj.GetType()
1416 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
1418 if (property ==
null)
continue;
1420 recordToSend.
Add(field.Name, property.GetValue(obj,
null));
1424 return recordToSend;
1434 internal T AvroDecode<T>(
byte[] bytes,
KineticaType? ktype =
null) where T : new()
1440 using (var ms =
new MemoryStream(bytes))
1457 foreach (var field
in schema.Fields)
1459 var
property = obj.GetType()
1461 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
1463 if (property ==
null)
continue;
1466 if (recordToReceive.TryGetValue(field.Name, out
object val))
1469 property.SetValue(obj, val);
Reader wrapper class for reading data and storing into specific classes
ClusterAddressInfo? GetClusterInfo()
Gets the active cluster's information.
int StatusCode
HTTP status code from the server response.
A list of worker URLs to use for multi-head ingest.
byte [] Body
Raw response body bytes (may contain Avro-encoded error message).
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
string? HostnameRegex
Optional: Regex pattern to filter URLs by hostname/IP
void Add(string fieldName, object fieldValue)
bool UseSnappy
Use Snappy compression for requests
Kinetica(string url_str, Options? options=null)
API Constructor
int Timeout
Request timeout in milliseconds (0 = infinite)
IList< int >? GetCurrentRoutingTable()
Gets the current routing table for multi-head operations.
bool DisableFailover
Whether failover is disabled
TResponse SubmitRequestRaw< TResponse >(Uri url, object request, bool enableCompression=false, bool avroEncoding=true)
Submit a request directly to a specific URL without HA failover.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
int InitialConnectionAttemptTimeout
Initial connection attempt timeout in milliseconds.
TimeSpan PooledConnectionLifetime
Maximum lifetime of pooled HTTP connections.
int HARingSize
Gets the number of clusters in the HA ring.
Kinetica(IList< string > urls, Options? options=null)
API Constructor with multiple URLs for HA failover support.
IList< ClusterAddressInfo > GetHostAddresses()
Gets all cluster addresses.
The default implementation for the generic reader.
T Read(T reuse, Decoder dec)
Generic read function
static bool IsConnectionError(Exception ex)
Checks if an exception is a connection error that warrants HA failover.
KineticaData - class to help with Avro Encoding for Kinetica
HAFailoverManager? HAManager
Gets the HA failover manager instance.
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
string? Username
Optional: User Name for Kinetica security
bool DisableFailover
Whether to disable failover upon failures
string Username
Optional: User Name for Kinetica security
A General purpose writer for serializing objects into a Stream using Avro.
Interface class for generated classes
int ThreadCount
Thread Count
Reader class for reading data and storing into specific classes
bool DisableAutoDiscovery
Whether auto-discovery is disabled
void Initialize(IList< Uri > urls, Kinetica? kinetica=null)
Initializes the manager with a list of URLs.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
int ServerConnectionTimeout
Server connection timeout in milliseconds.
int ThreadCount
Thread Count
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
ShowSystemPropertiesResponse showSystemProperties(ShowSystemPropertiesRequest request_)
Returns server configuration and version related information to the caller.
const int DefaultHostManagerPort
Thrown by HttpClientTransport when the server responds with a non-2xx status code.
The default type used by GenericReader and GenericWriter for RecordSchema.
Uri? GetUrl()
Gets the current active URL.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const int END_OF_SET
No Limit
virtual void Write(Schema schema, object value, Encoder encoder)
Examines the schema and dispatches the actual work to one of the other methods of this class.
object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
IList< Uri > WorkerRankUrls
List of worker rank URLs
bool DisableAutoDiscovery
Whether to disable automatic discovery of clusters and worker ranks
Contains address information for a Kinetica cluster.
int HARingSize
Gets the HA ring size.
string PrimaryUrl
URL of the primary cluster in the HA environment
static ? RecordSchema SchemaFromType(System.Type t, KineticaType? ktype=null)
Create an Avro Schema from a System.Type and a KineticaType.
ClusterAddressInfo? GetCurrentClusterInfo()
Gets the current active cluster information.
virtual void Dispose(bool disposing)
Disposes managed and unmanaged resources.
bool RefreshClusterInfo()
Refreshes cluster information after a failover.
Abstraction over the raw HTTP POST layer.
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
IList< int > rank
Array of ranks indexed by the shard number.
HAFailoverOrder
High availability failover order options.
bool IsKineticaRunning(Uri url)
Checks if Kinetica is running at the given URL.
IDictionary< string, string > property_map
A map of server configuration parameters and version information.
IHttpTransport implementation backed by HttpClient
Uri SwitchUrl(Uri oldUrl, int oldNumClusterSwitches, Func< Uri, bool >? isKineticaRunning=null)
Switches to the next available cluster URL for HA failover.
Manages high availability failover for Kinetica connections.
void Dispose()
Disposes the Kinetica client and releases HTTP resources.
Regex? HostnameRegex
Optional hostname regex for filtering URLs
Immutable collection of metadata about a Kinetica type.
string Password
Optional: Password for user
string Url
URL for Kinetica Server (including "http:" and port) as a string
RawKineticaResponse SubmitRequestRawBytes(Uri url, byte[] requestBytes)
Submit pre-encoded request bytes directly to a specific URL without HA failover.
AdminShowShardsResponse adminShowShards(AdminShowShardsRequest request_)
Show the mapping of shards to the corresponding rank and tom.
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Class for writing data from any specific objects
int HostManagerPort
Host manager port number
int NumClusterSwitches
Gets the number of times the client has switched to a different cluster.
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
static string GetApiVersion()
API Version
IList< Uri >? GetCurrentWorkerUrls()
Gets the current worker URLs for multi-head operations.
Uri URL
URL for Kinetica Server (including "http:" and port)
string OauthToken
Optional: OauthToken for user
IList< ClusterAddressInfo > GetHARingInfo()
Gets the list of all cluster addresses in the HA ring.
TimeSpan PooledConnectionIdleTimeout
Idle timeout for pooled HTTP connections.
Decoder for Avro binary format