4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
40 public partial class Kinetica
55 public string Username {
get;
set; } =
string.Empty;
60 public string Password {
get;
set; } =
string.Empty;
65 public string OauthToken {
get;
set; } =
string.Empty;
69 public bool UseSnappy {
get;
set; } =
false;
86 public string Url {
get;
private set; }
91 public Uri
URL {
get;
private set; }
96 public string?
Username {
get;
private set; } =
null;
101 private string? Password {
get;
set; } =
null;
106 private string? OauthToken {
get;
set; } =
null;
111 private string? Authorization {
get;
set; } =
null;
116 public bool UseSnappy {
get;
set; } =
false;
124 private volatile System.Collections.Concurrent.ConcurrentDictionary<string, KineticaType> knownTypes =
new();
127 private Dictionary<string, string> typeNameLookup = [];
130 private Dictionary<Type, KineticaType> kineticaTypeLookup = [];
140 URL =
new Uri( url_str );
141 if (
null != options )
144 Password = options.Password;
145 OauthToken = options.OauthToken;
148 Authorization = CreateAuthorizationHeader();
156 internal string? CreateAuthorizationHeader() {
157 string? authorization =
null;
159 if( OauthToken !=
null && OauthToken.Length > 0 ) {
160 authorization =
"Bearer " + OauthToken;
162 else if ( (
Username !=
null && (
Username.Length > 0 ) ) || ( Password !=
null && ( Password.Length > 0 ) ) )
164 authorization = (
"Basic " +
165 Convert.ToBase64String( Encoding.GetEncoding(
"ISO-8859-1" ).GetBytes(
Username +
":" + Password ) ) );
168 return authorization;
184 throw new KineticaException( $
"Could not get type ID for table '{table_name}'" );
185 this.knownTypes.TryAdd( ktype.
getTypeID(), ktype );
188 if ( obj_type !=
null )
205 if ( objectType !=
null )
206 this.kineticaTypeLookup.Add( objectType, kineticaType );
221 IList<
byte[]> records_binary,
222 IList<T> records ) where T :
new()
225 foreach ( var bin_record
in records_binary )
227 T obj = AvroDecode<T>( bin_record, record_type );
242 IList<
byte[]> records_binary,
243 IList<T> records ) where T :
new()
249 foreach ( var bin_record
in records_binary )
251 T obj = AvroDecode<T>( bin_record, ktype );
266 IList<IList<
byte[]>> lists_records_binary,
267 IList<IList<T>> record_lists ) where T :
new()
270 if ( schema_strings.Count != lists_records_binary.Count )
271 throw new KineticaException(
"List of schemas and list of binary encoded data do not match in count." );
274 for (
int i = 0; i < schema_strings.Count; ++i )
277 KineticaType ktype =
new(
"", schema_strings[ i ], null );
280 IList<byte[]> records_binary = lists_records_binary[ i ];
283 IList<T> records = [];
286 foreach ( var bin_record
in records_binary )
288 T obj = AvroDecode<T>( bin_record, ktype );
292 record_lists.Add( records );
306 IList<
byte[]> records_binary,
307 IList<T> records ) where T :
new()
310 if ( type_ids.Count != records_binary.Count )
311 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
314 for (
int i = 0; i < records_binary.Count; ++i )
320 T obj = AvroDecode<T>( records_binary[ i ], ktype );
335 IList<IList<
byte[]>> lists_records_binary,
336 IList<IList<T>> record_lists ) where T :
new()
339 if ( type_ids.Count != lists_records_binary.Count )
340 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
343 for (
int i = 0; i < lists_records_binary.Count; ++i )
349 IList<byte[]> records_binary = lists_records_binary[ i ];
352 IList<T> records = [];
355 foreach ( var bin_record
in records_binary )
358 T obj = AvroDecode<T>( bin_record, ktype );
362 record_lists.Add( records );
376 internal TResponse SubmitRequest<TResponse>( Uri url,
object request,
bool enableCompression =
false,
bool avroEncoding =
true ) where TResponse : new()
382 requestBytes = AvroEncode( request );
386 string str = JsonConvert.SerializeObject(request);
387 requestBytes = Encoding.UTF8.GetBytes( str );
391 RawKineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding,
false);
396 return AvroDecode<TResponse>( kineticaResponse.data );
400 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u" );
401 return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
415 private TResponse SubmitRequest<TResponse>(
string endpoint,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse : new()
421 requestBytes = AvroEncode(request);
425 string str = JsonConvert.SerializeObject(request);
426 requestBytes = Encoding.UTF8.GetBytes(str);
430 RawKineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
435 return AvroDecode<TResponse>(kineticaResponse.data);
439 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u");
440 return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
456 private RawKineticaResponse? SubmitRequestRaw(
string url,
byte[] requestBytes,
bool enableCompression,
bool avroEncoding,
bool only_endpoint_given =
true)
460 if ( only_endpoint_given )
462 var request = (HttpWebRequest)WebRequest.Create( url );
463 request.Method =
"POST";
465 request.ContentType = avroEncoding ?
"application/octet-stream" :
"application/json";
466 request.ContentLength = requestBytes.Length;
469 if ( this.Authorization !=
null )
471 request.Headers.Add(
"Authorization", Authorization );
476 using ( var dataStream = request.GetRequestStream())
478 dataStream.Write(requestBytes, 0, requestBytes.Length);
482 using (var response = (HttpWebResponse)request.GetResponse())
485 if (response.StatusCode == HttpStatusCode.OK)
487 using (var responseStream = response.GetResponseStream())
491 return AvroDecode<RawKineticaResponse>(responseStream);
495 using (StreamReader reader =
new(responseStream, Encoding.UTF8))
497 var responseString = reader.ReadToEnd();
498 return JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
505 catch (System.Net.WebException ex)
508 if ( ex.Status != WebExceptionStatus.ProtocolError )
509 throw new KineticaException( ex.ToString(), ex );
512 var response = ex.Response;
513 var responseStream = response.GetResponseStream();
514 string responseString;
515 RawKineticaResponse serverResponse;
519 serverResponse = AvroDecode<RawKineticaResponse>(responseStream);
523 using (StreamReader reader =
new(responseStream, Encoding.UTF8))
525 responseString = reader.ReadToEnd();
526 serverResponse = JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
530 throw new KineticaException( serverResponse.message );
534 throw new KineticaException(ex.ToString(), ex);
540 private void SetDecoderIfMissing(
string typeId,
string label,
string schemaString, IDictionary<
string, IList<string>> properties)
544 if (typeId ==
"<collection>")
549 knownTypes.GetOrAdd(typeId, (s) =>
551 return new KineticaType(label, schemaString, properties);
553 typeNameLookup[label] = typeId;
562 private KineticaType? GetType(
string typeName)
564 KineticaType? type =
null;
565 if (typeNameLookup.TryGetValue(typeName, out
string? typeId))
567 knownTypes.TryGetValue(typeId, out type);
579 private KineticaType? LookupKineticaType( Type objectType )
581 if (!kineticaTypeLookup.TryGetValue(objectType, out KineticaType? value))
593 internal byte[] AvroEncode(
object obj)
596 using ( var ms =
new MemoryStream())
609 Type obj_type = obj.GetType();
610 KineticaType? ktype = LookupKineticaType( obj_type );
613 throw new KineticaException(
"No known KineticaType associated with the given object. " +
614 "Need a known KineticaType to encode the object." );
618 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
619 var recordToSend = MakeGenericRecord( obj, ktype );
639 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
645 foreach ( var field
in schema.Fields)
647 var
property = obj.GetType()
649 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
651 if (property ==
null)
continue;
653 recordToSend.
Add(field.Name, property.GetValue(obj,
null));
667 private T AvroDecode<T>(
byte[] bytes, KineticaType? ktype =
null) where T : new()
670 var schema = KineticaData.SchemaFromType( typeof(T), ktype );
673 using (var ms =
new MemoryStream(bytes))
690 foreach (var field
in schema.Fields)
692 var
property = obj.GetType()
694 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
696 if (property ==
null)
continue;
699 if (recordToReceive.TryGetValue(field.Name, out
object val))
702 property.SetValue(obj, val);
722 var schema = KineticaData.SchemaFromType( typeof(T),
null );
Reader wrapper class for reading data and storing into specific classes
void Add(string fieldName, object fieldValue)
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
The default implementation for the generic reader.
T Read(T reuse, Decoder dec)
Generic read function
string OauthToken
Optional: OauthToken for user
int ThreadCount
Thread Count
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
static string GetApiVersion()
API Version
A General purpose writer for serializing objects into a Stream using Avro.
Interface class for generated classes
Reader class for reading data and storing into specific classes
string Password
Optional: Password for user
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
int ThreadCount
Thread Count
string Username
Optional: User Name for Kinetica security
The default type used by GenericReader and GenericWriter for RecordSchema.
string Url
URL for Kinetica Server (including "http:" and port) as a string
const int END_OF_SET
No Limit
virtual void Write(Schema schema, object value, Encoder encoder)
Examines the schema and dispatches the actual work to one of the other methods of this class.
object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
Uri URL
URL for Kinetica Server (including "http:" and port)
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
string? Username
Optional: User Name for Kinetica security
Kinetica(string url_str, Options? options=null)
API Constructor
Class for writing data from any specific objects
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Decoder for Avro binary format