4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
55 public string Username {
get;
set; } =
string.Empty;
60 public string Password {
get;
set; } =
string.Empty;
86 public string Url {
get;
private set; }
91 public Uri
URL {
get;
private set; }
96 public string?
Username {
get;
private set; } =
null;
101 private string? Password {
get;
set; } =
null;
106 private string? OauthToken {
get;
set; } =
null;
111 private string? Authorization {
get;
set; } =
null;
124 private volatile System.Collections.Concurrent.ConcurrentDictionary<string,
KineticaType> knownTypes =
new();
127 private Dictionary<string, string> typeNameLookup = [];
130 private Dictionary<Type, KineticaType> kineticaTypeLookup = [];
140 URL =
new Uri( url_str );
141 if (
null != options )
144 Password = options.Password;
145 OauthToken = options.OauthToken;
148 Authorization = CreateAuthorizationHeader();
156 internal string? CreateAuthorizationHeader() {
157 string? authorization =
null;
159 if( OauthToken !=
null && OauthToken.Length > 0 ) {
160 authorization =
"Bearer " + OauthToken;
162 else if ( (
Username !=
null && (
Username.Length > 0 ) ) || ( Password !=
null && ( Password.Length > 0 ) ) )
164 authorization = (
"Basic " +
165 Convert.ToBase64String( Encoding.GetEncoding(
"ISO-8859-1" ).GetBytes(
Username +
":" + Password ) ) );
168 return authorization;
184 throw new KineticaException( $
"Could not get type ID for table '{table_name}'" );
185 this.knownTypes.TryAdd( ktype.
getTypeID(), ktype );
188 if ( obj_type !=
null )
205 if ( objectType !=
null )
206 this.kineticaTypeLookup.Add( objectType, kineticaType );
221 IList<
byte[]> records_binary,
222 IList<T> records ) where T :
new()
225 foreach ( var bin_record
in records_binary )
227 T obj = AvroDecode<T>( bin_record, record_type );
242 IList<
byte[]> records_binary,
243 IList<T> records ) where T :
new()
249 foreach ( var bin_record
in records_binary )
251 T obj = AvroDecode<T>( bin_record, ktype );
266 IList<IList<
byte[]>> lists_records_binary,
267 IList<IList<T>> record_lists ) where T :
new()
270 if ( schema_strings.Count != lists_records_binary.Count )
271 throw new KineticaException(
"List of schemas and list of binary encoded data do not match in count." );
274 for (
int i = 0; i < schema_strings.Count; ++i )
277 KineticaType ktype =
new(
"", schema_strings[ i ], null );
280 IList<byte[]> records_binary = lists_records_binary[ i ];
283 IList<T> records = [];
286 foreach ( var bin_record
in records_binary )
288 T obj = AvroDecode<T>( bin_record, ktype );
292 record_lists.Add( records );
306 IList<
byte[]> records_binary,
307 IList<T> records ) where T :
new()
310 if ( type_ids.Count != records_binary.Count )
311 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
314 for (
int i = 0; i < records_binary.Count; ++i )
320 T obj = AvroDecode<T>( records_binary[ i ], ktype );
335 IList<IList<
byte[]>> lists_records_binary,
336 IList<IList<T>> record_lists ) where T :
new()
339 if ( type_ids.Count != lists_records_binary.Count )
340 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
343 for (
int i = 0; i < lists_records_binary.Count; ++i )
349 IList<byte[]> records_binary = lists_records_binary[ i ];
352 IList<T> records = [];
355 foreach ( var bin_record
in records_binary )
358 T obj = AvroDecode<T>( bin_record, ktype );
362 record_lists.Add( records );
376 internal TResponse SubmitRequest<TResponse>( Uri url,
object request,
bool enableCompression =
false,
bool avroEncoding =
true ) where TResponse : new()
382 requestBytes = AvroEncode( request );
386 string str = JsonConvert.SerializeObject(request);
387 requestBytes = Encoding.UTF8.GetBytes( str );
391 RawKineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding,
false);
396 return AvroDecode<TResponse>( kineticaResponse.data );
400 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u" );
401 return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
415 private TResponse SubmitRequest<TResponse>(
string endpoint,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse : new()
421 requestBytes = AvroEncode(request);
425 string str = JsonConvert.SerializeObject(request);
426 requestBytes = Encoding.UTF8.GetBytes(str);
430 RawKineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
435 return AvroDecode<TResponse>(kineticaResponse.data);
439 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u");
440 return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
456 private RawKineticaResponse? SubmitRequestRaw(
string url,
byte[] requestBytes,
bool enableCompression,
bool avroEncoding,
bool only_endpoint_given =
true)
460 if ( only_endpoint_given )
462 var request = (HttpWebRequest)WebRequest.Create( url );
463 request.Method =
"POST";
465 request.ContentType = avroEncoding ?
"application/octet-stream" :
"application/json";
466 request.ContentLength = requestBytes.Length;
469 if ( this.Authorization !=
null )
471 request.Headers.Add(
"Authorization", Authorization );
476 using ( var dataStream = request.GetRequestStream())
478 dataStream.Write(requestBytes, 0, requestBytes.Length);
482 using (var response = (HttpWebResponse)request.GetResponse())
485 if (response.StatusCode == HttpStatusCode.OK)
487 using (var responseStream = response.GetResponseStream())
491 return AvroDecode<RawKineticaResponse>(responseStream);
495 using (StreamReader reader =
new(responseStream, Encoding.UTF8))
497 var responseString = reader.ReadToEnd();
498 return JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
505 catch (System.Net.WebException ex)
508 if ( ex.Status != WebExceptionStatus.ProtocolError )
509 throw new KineticaException( ex.ToString(), ex );
512 var response = ex.Response;
513 var responseStream = response.GetResponseStream();
514 string responseString;
515 RawKineticaResponse serverResponse;
519 serverResponse = AvroDecode<RawKineticaResponse>(responseStream);
523 using (StreamReader reader =
new(responseStream, Encoding.UTF8))
525 responseString = reader.ReadToEnd();
526 serverResponse = JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
530 throw new KineticaException( serverResponse.message );
534 throw new KineticaException(ex.ToString(), ex);
540 private void SetDecoderIfMissing(
string typeId,
string label,
string schemaString, IDictionary<
string, IList<string>> properties)
544 if (typeId ==
"<collection>")
549 knownTypes.GetOrAdd(typeId, (s) =>
551 return new KineticaType(label, schemaString, properties);
553 typeNameLookup[label] = typeId;
562 private KineticaType? GetType(
string typeName)
564 KineticaType? type =
null;
565 if (typeNameLookup.TryGetValue(typeName, out
string? typeId))
567 knownTypes.TryGetValue(typeId, out type);
579 private KineticaType? LookupKineticaType( Type objectType )
581 if (!kineticaTypeLookup.TryGetValue(objectType, out KineticaType? value))
593 internal byte[] AvroEncode(
object obj)
596 using ( var ms =
new MemoryStream())
600 if ( obj is Avro.Specific.ISpecificRecord)
602 var schema = (obj as Avro.Specific.ISpecificRecord).Schema;
603 Avro.Specific.SpecificDefaultWriter writer =
new(schema);
604 writer.Write(schema, obj,
new BinaryEncoder(ms));
609 Type obj_type = obj.GetType();
610 KineticaType? ktype = LookupKineticaType( obj_type );
613 throw new KineticaException(
"No known KineticaType associated with the given object. " +
614 "Need a known KineticaType to encode the object." );
618 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
619 var recordToSend = MakeGenericRecord( obj, ktype );
620 var writer =
new Avro.Generic.DefaultWriter(schema);
621 writer.Write(schema, recordToSend,
new BinaryEncoder(ms));
636 private Avro.Generic.GenericRecord MakeGenericRecord(
object obj, KineticaType ktype )
639 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
642 var recordToSend =
new Avro.Generic.GenericRecord(schema);
645 foreach ( var field
in schema.Fields)
647 var
property = obj.GetType()
649 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
651 if (property ==
null)
continue;
653 recordToSend.Add(field.Name, property.GetValue(obj,
null));
667 private T AvroDecode<T>(
byte[] bytes, KineticaType? ktype =
null) where T : new()
670 var schema = KineticaData.SchemaFromType( typeof(T), ktype );
673 using (var ms =
new MemoryStream(bytes))
677 if (obj is Avro.Specific.ISpecificRecord)
679 var reader =
new Avro.Specific.SpecificDefaultReader(schema, schema);
680 reader.Read(obj,
new BinaryDecoder(ms));
685 var reader =
new Avro.Generic.DefaultReader(schema, schema);
686 Avro.Generic.GenericRecord recordToReceive =
new(schema);
687 reader.Read(recordToReceive,
new BinaryDecoder(ms));
690 foreach (var field
in schema.Fields)
692 var
property = obj.GetType()
694 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
696 if (property ==
null)
continue;
699 if (recordToReceive.TryGetValue(field.Name, out
object val))
702 property.SetValue(obj, val);
719 private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord,
new()
722 var schema = KineticaData.SchemaFromType( typeof(T),
null );
723 var reader =
new Avro.Specific.SpecificReader<T>(schema, schema);
724 return reader.Read(
default,
new BinaryDecoder(stream));
int ThreadCount
Thread Count
const int END_OF_SET
No Limit
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
string OauthToken
Optional: OauthToken for user
string Url
URL for Kinetica Server (including "http:" and port) as a string
Uri URL
URL for Kinetica Server (including "http:" and port)
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
static string GetApiVersion()
API Version
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
string? Username
Optional: User Name for Kinetica security
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
string Password
Optional: Password for user
string Username
Optional: User Name for Kinetica security
Kinetica(string url_str, Options? options=null)
API Constructor
int ThreadCount
Thread Count
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
API to talk to Kinetica Database