4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
82 public string Url {
get;
private set; }
87 public Uri
URL {
get;
private set; }
92 public string Username {
get;
private set; } = null;
97 private string Password {
get;
set; } = null;
102 private string Authorization {
get;
set; } = null;
115 private volatile System.Collections.Concurrent.ConcurrentDictionary<string,
KineticaType> knownTypes =
new ConcurrentDictionary<string, KineticaType>();
118 private Dictionary<string, string> typeNameLookup =
new Dictionary<string, string>();
121 private Dictionary<Type, KineticaType> kineticaTypeLookup =
new Dictionary<Type, KineticaType>();
131 URL =
new Uri( url_str );
132 if ( null != options )
140 Authorization = (
"Basic " +
141 Convert.ToBase64String( Encoding.GetEncoding(
"ISO-8859-1" ).GetBytes(
Username +
":" +
Password ) ) );
165 throw new KineticaException( $
"Could not get type ID for table '{table_name}'" );
166 this.knownTypes.TryAdd( ktype.
getTypeID(), ktype );
169 if ( obj_type != null )
186 this.kineticaTypeLookup.Add( objectType, kineticaType );
201 IList<byte[]> records_binary,
202 IList<T> records ) where T :
new()
205 foreach ( var bin_record
in records_binary )
207 T obj = AvroDecode<T>( bin_record, record_type );
222 IList<byte[]> records_binary,
223 IList<T> records ) where T :
new()
229 foreach ( var bin_record
in records_binary )
231 T obj = AvroDecode<T>( bin_record, ktype );
246 IList<IList<byte[]>> lists_records_binary,
247 IList<IList<T>> record_lists ) where T :
new()
250 if ( schema_strings.Count != lists_records_binary.Count )
251 throw new KineticaException(
"List of schemas and list of binary encoded data do not match in count." );
254 for (
int i = 0; i < schema_strings.Count; ++i )
260 IList<byte[]> records_binary = lists_records_binary[ i ];
263 IList<T> records =
new List<T>();
266 foreach ( var bin_record
in records_binary )
268 T obj = AvroDecode<T>( bin_record, ktype );
272 record_lists.Add( records );
286 IList<byte[]> records_binary,
287 IList<T> records ) where T :
new()
290 if ( type_ids.Count != records_binary.Count )
291 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
294 for (
int i = 0; i < records_binary.Count; ++i )
300 T obj = AvroDecode<T>( records_binary[ i ], ktype );
315 IList<IList<byte[]>> lists_records_binary,
316 IList<IList<T>> record_lists ) where T :
new()
319 if ( type_ids.Count != lists_records_binary.Count )
320 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
323 for (
int i = 0; i < lists_records_binary.Count; ++i )
329 IList<byte[]> records_binary = lists_records_binary[ i ];
332 IList<T> records =
new List<T>();
335 foreach ( var bin_record
in records_binary )
338 T obj = AvroDecode<T>( bin_record, ktype );
342 record_lists.Add( records );
356 internal TResponse SubmitRequest<TResponse>( Uri url,
object request,
bool enableCompression =
false,
bool avroEncoding = true ) where TResponse :
new()
362 requestBytes = AvroEncode( request );
366 string str = JsonConvert.SerializeObject(request);
367 requestBytes = Encoding.UTF8.GetBytes( str );
371 RawKineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding,
false);
376 return AvroDecode<TResponse>( kineticaResponse.
data );
380 kineticaResponse.
data_str = kineticaResponse.
data_str.Replace(
"\\U",
"\\u" );
381 return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.
data_str );
395 private TResponse SubmitRequest<TResponse>(
string endpoint,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse :
new()
401 requestBytes = AvroEncode(request);
405 string str = JsonConvert.SerializeObject(request);
406 requestBytes = Encoding.UTF8.GetBytes(str);
410 RawKineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
415 return AvroDecode<TResponse>(kineticaResponse.
data);
419 kineticaResponse.
data_str = kineticaResponse.
data_str.Replace(
"\\U",
"\\u");
420 return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.
data_str);
436 private RawKineticaResponse SubmitRequestRaw(
string url, byte[] requestBytes,
bool enableCompression,
bool avroEncoding,
bool only_endpoint_given =
true)
440 if ( only_endpoint_given )
442 var request = (HttpWebRequest)WebRequest.Create( url );
443 request.Method =
"POST";
445 request.ContentType = avroEncoding ?
"application/octet-stream" :
"application/json";
446 request.ContentLength = requestBytes.Length;
449 if ( this.Authorization != null )
451 request.Headers.Add(
"Authorization", Authorization );
456 using ( var dataStream = request.GetRequestStream())
458 dataStream.Write(requestBytes, 0, requestBytes.Length);
462 using (var response = (HttpWebResponse)request.GetResponse())
465 if (response.StatusCode == HttpStatusCode.OK)
467 using (var responseStream = response.GetResponseStream())
471 return AvroDecode<RawKineticaResponse>(responseStream);
475 using (StreamReader reader =
new StreamReader(responseStream, Encoding.UTF8))
477 var responseString = reader.ReadToEnd();
485 catch (
System.Net.WebException ex)
488 if ( ex.Status != WebExceptionStatus.ProtocolError )
492 var response = ex.Response;
493 var responseStream = response.GetResponseStream();
494 string responseString;
499 serverResponse = AvroDecode<RawKineticaResponse>(responseStream);
503 using (StreamReader reader =
new StreamReader(responseStream, Encoding.UTF8))
505 responseString = reader.ReadToEnd();
520 private void SetDecoderIfMissing(
string typeId,
string label,
string schemaString, IDictionary<
string, IList<string>> properties)
524 if (typeId ==
"<collection>")
529 knownTypes.GetOrAdd(typeId, (s) =>
531 return new KineticaType(label, schemaString, properties);
533 typeNameLookup[label] = typeId;
546 if (typeNameLookup.TryGetValue(typeName, out typeId))
548 knownTypes.TryGetValue(typeId, out type);
560 private KineticaType lookupKineticaType( Type objectType )
562 if ( !this.kineticaTypeLookup.ContainsKey( objectType ) )
565 return this.kineticaTypeLookup[ objectType ];
574 internal byte[] AvroEncode(
object obj)
577 using ( var ms =
new MemoryStream())
581 if ( obj is
Avro.Specific.ISpecificRecord)
583 var schema = (obj as
Avro.Specific.ISpecificRecord).Schema;
584 Avro.Specific.SpecificDefaultWriter writer =
new Avro.Specific.SpecificDefaultWriter(schema);
585 writer.Write(schema, obj,
new BinaryEncoder(ms));
590 Type obj_type = obj.GetType();
594 throw new KineticaException(
"No known KineticaType associated with the given object. " +
595 "Need a known KineticaType to encode the object." );
600 var recordToSend = MakeGenericRecord( obj, ktype );
601 var writer =
new Avro.Generic.DefaultWriter(schema);
602 writer.Write(schema, recordToSend,
new BinaryEncoder(ms));
617 private Avro.Generic.GenericRecord MakeGenericRecord(
object obj,
KineticaType ktype )
623 var recordToSend =
new Avro.Generic.GenericRecord(schema);
626 foreach ( var field
in schema.Fields)
628 var
property = obj.GetType()
630 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
632 if (property == null)
continue;
634 recordToSend.Add(field.Name, property.GetValue(obj, null));
648 private T AvroDecode<T>(byte[] bytes,
KineticaType ktype = null) where T :
new()
654 using (var ms =
new MemoryStream(bytes))
658 if (obj is
Avro.Specific.ISpecificRecord)
660 var reader =
new Avro.Specific.SpecificDefaultReader(schema, schema);
661 reader.Read(obj,
new BinaryDecoder(ms));
666 var reader =
new Avro.Generic.DefaultReader(schema, schema);
667 Avro.Generic.GenericRecord recordToReceive =
new Avro.Generic.GenericRecord(schema);
668 reader.Read(recordToReceive,
new BinaryDecoder(ms));
671 foreach (var field
in schema.Fields)
673 var
property = obj.GetType()
675 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
677 if (property == null)
continue;
681 if (recordToReceive.TryGetValue(field.Name, out val))
684 property.SetValue(obj, val);
701 private T AvroDecode<T>(Stream stream) where T :
Avro.Specific.ISpecificRecord,
new()
705 var reader =
new Avro.Specific.SpecificReader<T>(schema, schema);
706 return reader.Read(
default(T),
new BinaryDecoder(stream));
int ThreadCount
Thread Count
const int END_OF_SET
No Limit
string Url
URL for Kinetica Server (including "http:" and port) as a string
Uri URL
URL for Kinetica Server (including "http:" and port)
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (objects).
static string GetApiVersion()
API Version
static RecordSchema SchemaFromType(System.Type t, KineticaType ktype=null)
Create an Avro Schema from a System.Type and a KineticaType.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of record types registered with Kinetica, decode binary data into distinct records (objects).
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects)...
string Password
Optional: Password for user
void SetKineticaSourceClassToTypeMapping(Type objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Kinetica(string url_str, Options options=null)
API Constructor
string Username
Optional: User Name for Kinetica security
KineticaData - class to help with Avro Encoding for Kinetica
A set of parameters for the raw wrapper for Kinetica responses.
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates.
API to talk to Kinetica Database