4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
82 public string Url {
get;
private set; }
87 public string Username {
get;
private set; } = null;
92 private string Password {
get; set; } = null;
97 private string Authorization {
get; set; } = null;
110 private volatile System.Collections.Concurrent.ConcurrentDictionary<string,
KineticaType> knownTypes =
new ConcurrentDictionary<string, KineticaType>();
113 private Dictionary<string, string> typeNameLookup =
new Dictionary<string, string>();
116 private Dictionary<Type, KineticaType> kineticaTypeLookup =
new Dictionary<Type, KineticaType>();
126 if ( null != options )
129 Password = options.Password;
132 if ( (
Username != null && (
Username.Length > 0 ) ) || ( Password != null && ( Password.Length > 0 ) ) )
134 Authorization = (
"Basic " +
135 Convert.ToBase64String( Encoding.GetEncoding(
"ISO-8859-1" ).GetBytes(
Username +
":" + Password ) ) );
157 KineticaType ktype = KineticaType.fromTable(
this, table_name );
159 throw new KineticaException( $
"Could not get type ID for table '{table_name}'" );
160 this.knownTypes.TryAdd( ktype.getTypeID(), ktype );
163 if ( obj_type != null )
164 this.SetKineticaSourceClassToTypeMapping( obj_type, ktype );
180 this.kineticaTypeLookup.Add( objectType, kineticaType );
195 IList<byte[]> records_binary,
196 IList<T> records ) where T :
new()
202 foreach ( var bin_record
in records_binary )
204 T obj = AvroDecode<T>( bin_record, ktype );
219 IList<IList<byte[]>> lists_records_binary,
220 IList<IList<T>> record_lists ) where T :
new()
223 if ( schema_strings.Count != lists_records_binary.Count )
224 throw new KineticaException(
"List of schemas and list of binary encoded data do not match in count." );
227 for (
int i = 0; i < schema_strings.Count; ++i )
233 IList<byte[]> records_binary = lists_records_binary[ i ];
236 IList<T> records =
new List<T>();
239 foreach ( var bin_record
in records_binary )
241 T obj = AvroDecode<T>( bin_record, ktype );
245 record_lists.Add( records );
259 IList<byte[]> records_binary,
260 IList<T> records ) where T :
new()
263 if ( type_ids.Count != records_binary.Count )
264 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
267 for (
int i = 0; i < records_binary.Count; ++i )
270 KineticaType ktype = KineticaType.fromTypeID(
this, type_ids[ i ] );
273 T obj = AvroDecode<T>( records_binary[ i ], ktype );
288 IList<IList<byte[]>> lists_records_binary,
289 IList<IList<T>> record_lists ) where T :
new()
292 if ( type_ids.Count != lists_records_binary.Count )
293 throw new KineticaException(
"Unequal numbers of type IDs and binary encoded data objects provided." );
296 for (
int i = 0; i < lists_records_binary.Count; ++i )
299 KineticaType ktype = KineticaType.fromTypeID(
this, type_ids[ i ] );
302 IList<byte[]> records_binary = lists_records_binary[ i ];
305 IList<T> records =
new List<T>();
308 foreach ( var bin_record
in records_binary )
311 T obj = AvroDecode<T>( bin_record, ktype );
315 record_lists.Add( records );
329 internal TResponse SubmitRequest<TResponse>( Uri url,
object request,
bool enableCompression =
false,
bool avroEncoding = true ) where TResponse :
new()
335 requestBytes = AvroEncode( request );
339 string str = JsonConvert.SerializeObject(request);
340 requestBytes = Encoding.UTF8.GetBytes( str );
344 KineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding,
false);
349 return AvroDecode<TResponse>( kineticaResponse.data );
353 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u" );
354 return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
368 private TResponse SubmitRequest<TResponse>(
string endpoint,
object request,
bool enableCompression =
false,
bool avroEncoding =
true) where TResponse :
new()
374 requestBytes = AvroEncode(request);
378 string str = JsonConvert.SerializeObject(request);
379 requestBytes = Encoding.UTF8.GetBytes(str);
383 KineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
388 return AvroDecode<TResponse>(kineticaResponse.data);
392 kineticaResponse.data_str = kineticaResponse.data_str.Replace(
"\\U",
"\\u");
393 return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
409 private KineticaResponse SubmitRequestRaw(
string url, byte[] requestBytes,
bool enableCompression,
bool avroEncoding,
bool only_endpoint_given =
true)
413 if ( only_endpoint_given )
415 var request = (HttpWebRequest)WebRequest.Create( url );
416 request.Method =
"POST";
418 request.ContentType = avroEncoding ?
"application/octet-stream" :
"application/json";
419 request.ContentLength = requestBytes.Length;
422 if ( this.Authorization != null )
424 request.Headers.Add(
"Authorization", Authorization );
429 using ( var dataStream = request.GetRequestStream())
431 dataStream.Write(requestBytes, 0, requestBytes.Length);
435 using (var response = (HttpWebResponse)request.GetResponse())
438 if (response.StatusCode == HttpStatusCode.OK)
440 using (var responseStream = response.GetResponseStream())
444 return AvroDecode<KineticaResponse>(responseStream);
448 using (StreamReader reader =
new StreamReader(responseStream, Encoding.UTF8))
450 var responseString = reader.ReadToEnd();
451 return JsonConvert.DeserializeObject<KineticaResponse>(responseString);
458 catch (System.Net.WebException ex)
461 if ( ex.Status != WebExceptionStatus.ProtocolError )
462 throw new KineticaException( ex.ToString(), ex );
465 var response = ex.Response;
466 var responseStream = response.GetResponseStream();
467 string responseString;
468 KineticaResponse serverResponse;
472 serverResponse = AvroDecode<KineticaResponse>(responseStream);
476 using (StreamReader reader =
new StreamReader(responseStream, Encoding.UTF8))
478 responseString = reader.ReadToEnd();
479 serverResponse = JsonConvert.DeserializeObject<KineticaResponse>(responseString);
483 throw new KineticaException( serverResponse.message );
487 throw new KineticaException(ex.ToString(), ex);
493 private void SetDecoderIfMissing(
string typeId,
string label,
string schemaString, IDictionary<
string, IList<string>> properties)
497 if (typeId ==
"<collection>")
502 knownTypes.GetOrAdd(typeId, (s) =>
504 return new KineticaType(label, schemaString, properties);
506 typeNameLookup[label] = typeId;
515 private KineticaType GetType(
string typeName)
517 KineticaType type = null;
519 if (typeNameLookup.TryGetValue(typeName, out typeId))
521 knownTypes.TryGetValue(typeId, out type);
533 private KineticaType lookupKineticaType( Type objectType )
535 if ( !this.kineticaTypeLookup.ContainsKey( objectType ) )
538 return this.kineticaTypeLookup[ objectType ];
547 internal byte[] AvroEncode(
object obj)
550 using ( var ms =
new MemoryStream())
554 if ( obj is Avro.Specific.ISpecificRecord)
556 var schema = (obj as Avro.Specific.ISpecificRecord).Schema;
557 Avro.Specific.SpecificDefaultWriter writer =
new Avro.Specific.SpecificDefaultWriter(schema);
558 writer.Write(schema, obj,
new BinaryEncoder(ms));
563 Type obj_type = obj.GetType();
564 KineticaType ktype = lookupKineticaType( obj_type );
567 throw new KineticaException(
"No known KineticaType associated with the given object. " +
568 "Need a known KineticaType to encode the object." );
572 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
573 var recordToSend = MakeGenericRecord( obj, ktype );
574 var writer =
new Avro.Generic.DefaultWriter(schema);
575 writer.Write(schema, recordToSend,
new BinaryEncoder(ms));
590 private Avro.Generic.GenericRecord MakeGenericRecord(
object obj, KineticaType ktype )
593 var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
596 var recordToSend =
new Avro.Generic.GenericRecord(schema);
599 foreach ( var field
in schema.Fields)
601 var
property = obj.GetType()
603 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
605 if (property == null)
continue;
607 recordToSend.Add(field.Name, property.GetValue(obj, null));
621 private T AvroDecode<T>(byte[] bytes, KineticaType ktype = null) where T :
new()
624 var schema = KineticaData.SchemaFromType( typeof(T), ktype );
627 using (var ms =
new MemoryStream(bytes))
631 if (obj is Avro.Specific.ISpecificRecord)
633 var reader =
new Avro.Specific.SpecificDefaultReader(schema, schema);
634 reader.Read(obj,
new BinaryDecoder(ms));
639 var reader =
new Avro.Generic.DefaultReader(schema, schema);
640 Avro.Generic.GenericRecord recordToReceive =
new Avro.Generic.GenericRecord(schema);
641 reader.Read(recordToReceive,
new BinaryDecoder(ms));
644 foreach (var field
in schema.Fields)
646 var
property = obj.GetType()
648 .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
650 if (property == null)
continue;
654 if (recordToReceive.TryGetValue(field.Name, out val))
657 property.SetValue(obj, val);
674 private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord,
new()
677 var schema = KineticaData.SchemaFromType( typeof(T), null );
678 var reader =
new Avro.Specific.SpecificReader<T>(schema, schema);
679 return reader.Read(
default(T),
new BinaryDecoder(stream));
int ThreadCount
Thread Count
Kinetica(string url, Options options=null)
API Constructor
const int END_OF_SET
No Limit
string Url
URL for Kinetica Server (including "http:" and port)
static string GetApiVersion()
API Version
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects)...
string Password
Optional: Password for user
string Username
Optional: User Name for Kinetica security
void SetKineticaSourceClassToTypeMapping(Type objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
string Username
Optional: User Name for Kinetica security
int ThreadCount
Thread Count
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
API to talk to Kinetica Database