Kinetica C# API  Version 6.2.0.1
Kinetica.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using Newtonsoft.Json;
3 using System;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
6 using System.IO;
7 using System.Linq;
8 using System.Net;
9 using System.Text;
10 
34 
35 namespace kinetica
36 {
40  public partial class Kinetica
41  {
45  public const int END_OF_SET = -9999;
46 
/// <summary>
/// Connection Options
/// </summary>
public class Options
{
    /// <summary>
    /// Optional: User Name for Kinetica security
    /// </summary>
    public string Username { get; set; }

    /// <summary>
    /// Optional: Password for user
    /// </summary>
    public string Password { get; set; }

    /// <summary>
    /// Use Snappy (false unless set by the caller)
    /// </summary>
    public bool UseSnappy { get; set; }

    /// <summary>
    /// Thread Count (1 unless set by the caller)
    /// </summary>
    public int ThreadCount { get; set; } = 1;
}
72 
/// <summary>
/// API Version
/// </summary>
/// <returns>The value of the API_VERSION constant.</returns>
public static string GetApiVersion() => API_VERSION;
78 
/// <summary>
/// URL for Kinetica Server (including "http:" and port) as a string
/// </summary>
public string Url { get; private set; }

/// <summary>
/// URL for Kinetica Server (including "http:" and port)
/// </summary>
public Uri URL { get; private set; }

/// <summary>
/// User name copied from the connection options; null when none was supplied
/// </summary>
public string Username { get; private set; }

/// <summary>
/// Password copied from the connection options; null when none was supplied
/// </summary>
private string Password { get; set; }

/// <summary>
/// Value sent in the HTTP "Authorization" header; null means no header is sent
/// </summary>
private string Authorization { get; set; }

/// <summary>
/// Use Snappy
/// </summary>
public bool UseSnappy { get; set; }

/// <summary>
/// Thread Count
/// </summary>
public int ThreadCount { get; set; } = 1;

// type ID -> KineticaType lookup table
private volatile ConcurrentDictionary<string, KineticaType> knownTypes = new ConcurrentDictionary<string, KineticaType>();

// type label -> type ID lookup table
private Dictionary<string, string> typeNameLookup = new Dictionary<string, string>();

// object class type -> KineticaType lookup table
private Dictionary<Type, KineticaType> kineticaTypeLookup = new Dictionary<Type, KineticaType>();
122 
/// <summary>
/// API Constructor
/// </summary>
/// <param name="url_str">URL of the Kinetica server; stored as given in
/// <see cref="Url"/> and parsed into <see cref="URL"/>.</param>
/// <param name="options">Optional connection options; when null, the
/// default property values are kept and no authorization is set up.</param>
public Kinetica( string url_str, Options options = null )
{
    Url = url_str;
    URL = new Uri( url_str );

    // Nothing else to configure when the caller supplied no options
    if ( options == null )
        return;

    Username = options.Username;
    Password = options.Password;

    // Build an HTTP basic-auth header if either credential is non-empty
    bool hasCredentials = !string.IsNullOrEmpty( Username ) || !string.IsNullOrEmpty( Password );
    if ( hasCredentials )
    {
        byte[] credentialBytes = Encoding.GetEncoding( "ISO-8859-1" ).GetBytes( Username + ":" + Password );
        Authorization = "Basic " + Convert.ToBase64String( credentialBytes );
    }

    UseSnappy = options.UseSnappy;
    ThreadCount = options.ThreadCount;
    // TODO: executor?
}
150 
151 
/// <summary>
/// Given a table name, look up its record type and remember it so records
/// of that type can be encoded.
/// </summary>
/// <param name="table_name">Name of the table whose type is fetched via
/// <c>KineticaType.fromTable</c>.</param>
/// <param name="obj_type">Optional class to associate with the table's
/// type; no association is saved when null.</param>
/// <exception cref="KineticaException">Thrown when the type has no ID, or
/// wrapping any KineticaException raised while registering the type.</exception>
public void AddTableType( string table_name, Type obj_type )
{
    try
    {
        // Fetch the table's type and make sure it carries an ID
        KineticaType tableType = KineticaType.fromTable( this, table_name );
        string typeId = tableType.getTypeID();
        if ( typeId == null )
            throw new KineticaException( $"Could not get type ID for table '{table_name}'" );

        // Remember the type under its ID (an existing entry is left untouched)
        this.knownTypes.TryAdd( typeId, tableType );

        // Associate the caller's class with the type, if one was given
        if ( obj_type != null )
            this.SetKineticaSourceClassToTypeMapping( obj_type, tableType );
    }
    catch ( KineticaException ex )
    {
        throw new KineticaException( "Error creating type from table", ex );
    }
}  // end AddTableType
177 
/// <summary>
/// Saves an object class type to a KineticaType association.
/// </summary>
/// <remarks>
/// Registering the same class again replaces the previous association
/// instead of throwing, so a mapping can be refreshed (for example by
/// calling <c>AddTableType</c> twice for the same class).
/// </remarks>
/// <param name="objectType">The class to associate; must not be null.</param>
/// <param name="kineticaType">The KineticaType to associate with the class.</param>
public void SetKineticaSourceClassToTypeMapping( Type objectType, KineticaType kineticaType )
{
    // The indexer adds or overwrites; Dictionary.Add would throw an
    // ArgumentException on a key that is already present.
    this.kineticaTypeLookup[ objectType ] = kineticaType;
}  // end SetKineticaSourceClassToTypeMapping
189 
190 
191 
/// <summary>
/// Given a KineticaType object for a certain record type, decode binary
/// data into distinct records (objects).
/// </summary>
/// <typeparam name="T">The type of the decoded records.</typeparam>
/// <param name="record_type">The type used to decode every record.</param>
/// <param name="records_binary">The avro binary encoded records.</param>
/// <param name="records">The list that receives the decoded records, in
/// the same order as <paramref name="records_binary"/>.</param>
public void DecodeRawBinaryDataUsingRecordType<T>( KineticaType record_type,
                                                   IList<byte[]> records_binary,
                                                   IList<T> records ) where T : new()
{
    // Using the KineticaType object, decode all the records from avro binary encoding
    foreach ( var bin_record in records_binary )
    {
        T obj = AvroDecode<T>( bin_record, record_type );
        records.Add( obj );
    }
}  // DecodeRawBinaryDataUsingRecordType
211 
212 
/// <summary>
/// Given a schema string for a certain record type, decode binary data
/// into distinct records (objects).
/// </summary>
/// <typeparam name="T">The type of the decoded records.</typeparam>
/// <param name="schema_string">Schema string from which a KineticaType
/// (with an empty label and no properties) is built.</param>
/// <param name="records_binary">The avro binary encoded records.</param>
/// <param name="records">The list that receives the decoded records.</param>
public void DecodeRawBinaryDataUsingSchemaString<T>( string schema_string,
                                                     IList<byte[]> records_binary,
                                                     IList<T> records ) where T : new()
{
    // One type, built from the schema, is shared by every record
    var schemaType = new KineticaType( "", schema_string, null );

    foreach ( byte[] encoded in records_binary )
        records.Add( AvroDecode<T>( encoded, schemaType ) );
}  // DecodeRawBinaryDataUsingSchemaString
235 
/// <summary>
/// Given a list of schema strings and a matching list of binary record
/// lists, decode each record list with its own schema.
/// </summary>
/// <typeparam name="T">The type of the decoded records.</typeparam>
/// <param name="schema_strings">One schema string per record list.</param>
/// <param name="lists_records_binary">The avro binary encoded record
/// lists; must have as many entries as <paramref name="schema_strings"/>.</param>
/// <param name="record_lists">Receives one decoded list per input list,
/// in input order.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingSchemaString<T>( IList<string> schema_strings,
                                                     IList<IList<byte[]>> lists_records_binary,
                                                     IList<IList<T>> record_lists ) where T : new()
{
    // The schemas and the encoded lists are paired up by position
    if ( schema_strings.Count != lists_records_binary.Count )
        throw new KineticaException( "List of schemas and list of binary encoded data do not match in count." );

    for ( int listIndex = 0; listIndex < schema_strings.Count; ++listIndex )
    {
        // Decode this list with the single-schema overload, then hand it out
        IList<T> decoded = new List<T>();
        DecodeRawBinaryDataUsingSchemaString<T>( schema_strings[ listIndex ],
                                                 lists_records_binary[ listIndex ],
                                                 decoded );
        record_lists.Add( decoded );
    }
}  // DecodeRawBinaryDataUsingSchemaString
275 
276 
/// <summary>
/// Given IDs of record types registered with Kinetica, decode binary data
/// into distinct records (objects), using one type ID per record.
/// </summary>
/// <typeparam name="T">The type of the decoded records.</typeparam>
/// <param name="type_ids">The type ID of each record, by position.</param>
/// <param name="records_binary">The avro binary encoded records; must
/// have as many entries as <paramref name="type_ids"/>.</param>
/// <param name="records">The list that receives the decoded records.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
                                                IList<byte[]> records_binary,
                                                IList<T> records ) where T : new()
{
    int recordCount = records_binary.Count;
    if ( type_ids.Count != recordCount )
        throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );

    for ( int idx = 0; idx < recordCount; idx++ )
    {
        // Each record is decoded with the type looked up from its own type ID
        KineticaType recordType = KineticaType.fromTypeID( this, type_ids[ idx ] );
        records.Add( AvroDecode<T>( records_binary[ idx ], recordType ) );
    }
}  // DecodeRawBinaryDataUsingTypeIDs
304 
305 
/// <summary>
/// Given IDs of record types registered with Kinetica, decode lists of
/// binary data into lists of records, using one type ID per list.
/// </summary>
/// <typeparam name="T">The type of the decoded records.</typeparam>
/// <param name="type_ids">The type ID of each record list, by position.</param>
/// <param name="lists_records_binary">The avro binary encoded record
/// lists; must have as many entries as <paramref name="type_ids"/>.</param>
/// <param name="record_lists">Receives one decoded list per input list,
/// in input order.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
                                                IList<IList<byte[]>> lists_records_binary,
                                                IList<IList<T>> record_lists ) where T : new()
{
    int listCount = lists_records_binary.Count;
    if ( type_ids.Count != listCount )
        throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );

    for ( int idx = 0; idx < listCount; idx++ )
    {
        // All records of one inner list share the type named by that list's ID
        KineticaType listType = KineticaType.fromTypeID( this, type_ids[ idx ] );
        IList<byte[]> encodedRecords = lists_records_binary[ idx ];

        IList<T> decodedRecords = new List<T>();
        foreach ( byte[] encoded in encodedRecords )
            decodedRecords.Add( AvroDecode<T>( encoded, listType ) );

        record_lists.Add( decodedRecords );
    }
}  // DecodeRawBinaryDataUsingTypeIDs
345 
346 
/// <summary>
/// Send a request object to a complete URL and decode the response.
/// </summary>
/// <typeparam name="TResponse">The type of the decoded response.</typeparam>
/// <param name="url">The full URL to post to; it is used as given, without
/// prepending the server URL.</param>
/// <param name="request">The request object to encode and send.</param>
/// <param name="enableCompression">Passed through to SubmitRequestRaw.</param>
/// <param name="avroEncoding">True to exchange avro binary; false to
/// exchange JSON.</param>
/// <returns>The response payload decoded as <typeparamref name="TResponse"/>.</returns>
internal TResponse SubmitRequest<TResponse>( Uri url, object request, bool enableCompression = false, bool avroEncoding = true ) where TResponse : new()
{
    // Encode the request in the requested wire format
    byte[] payload = avroEncoding
        ? AvroEncode( request )
        : Encoding.UTF8.GetBytes( JsonConvert.SerializeObject( request ) );

    // The URL is already complete, hence only_endpoint_given = false
    RawKineticaResponse rawResponse = SubmitRequestRaw( url.ToString(), payload, enableCompression, avroEncoding, false );

    if ( avroEncoding )
        return AvroDecode<TResponse>( rawResponse.data );

    // JSON: rewrite "\U" escapes as "\u" before deserializing
    // NOTE(review): presumably because the JSON parser only accepts lowercase \u escapes - confirm
    rawResponse.data_str = rawResponse.data_str.Replace( "\\U", "\\u" );
    return JsonConvert.DeserializeObject<TResponse>( rawResponse.data_str );
}  // end SubmitRequest( URL )
384 
385 
/// <summary>
/// Send a request object to an endpoint of the configured server and
/// decode the response.
/// </summary>
/// <typeparam name="TResponse">The type of the decoded response.</typeparam>
/// <param name="endpoint">The endpoint; SubmitRequestRaw appends it to
/// the server URL.</param>
/// <param name="request">The request object to encode and send.</param>
/// <param name="enableCompression">Passed through to SubmitRequestRaw.</param>
/// <param name="avroEncoding">True to exchange avro binary; false to
/// exchange JSON.</param>
/// <returns>The response payload decoded as <typeparamref name="TResponse"/>.</returns>
private TResponse SubmitRequest<TResponse>(string endpoint, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
{
    // Encode the request in the requested wire format
    byte[] payload = avroEncoding
        ? AvroEncode(request)
        : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));

    // Send request, and receive the raw response wrapper
    RawKineticaResponse rawResponse = SubmitRequestRaw(endpoint, payload, enableCompression, avroEncoding);

    if (avroEncoding)
        return AvroDecode<TResponse>(rawResponse.data);

    // JSON: rewrite "\U" escapes as "\u" before deserializing
    // NOTE(review): presumably because the JSON parser only accepts lowercase \u escapes - confirm
    rawResponse.data_str = rawResponse.data_str.Replace("\\U", "\\u");
    return JsonConvert.DeserializeObject<TResponse>(rawResponse.data_str);
}  // end SubmitRequest( endpoint )
423 
424 
425 
/// <summary>
/// POST already-encoded request bytes to the server and return the raw
/// response wrapper.
/// </summary>
/// <param name="url">An endpoint (appended to <see cref="Url"/>) when
/// <paramref name="only_endpoint_given"/> is true; otherwise a complete URL.</param>
/// <param name="requestBytes">The encoded request body.</param>
/// <param name="enableCompression">Not used by this method.</param>
/// <param name="avroEncoding">True when the body is avro binary
/// ("application/octet-stream"); false when it is JSON ("application/json").
/// The response is decoded the same way.</param>
/// <param name="only_endpoint_given">Whether <paramref name="url"/> is only
/// an endpoint.</param>
/// <returns>The decoded response wrapper, or null when the server answers
/// with a non-error status other than 200 OK.</returns>
/// <exception cref="KineticaException">Thrown for any failure; for HTTP
/// protocol errors the message is the one reported by the server when the
/// error body can be decoded.</exception>
private RawKineticaResponse SubmitRequestRaw(string url, byte[] requestBytes, bool enableCompression, bool avroEncoding, bool only_endpoint_given = true)
{
    try
    {
        if ( only_endpoint_given )
            url = (Url + url);
        var request = (HttpWebRequest)WebRequest.Create( url );
        request.Method = "POST";
        request.ContentType = avroEncoding ? "application/octet-stream" : "application/json";
        request.ContentLength = requestBytes.Length;

        // Handle the authorization
        if ( this.Authorization != null )
        {
            request.Headers.Add( "Authorization", Authorization );
        }

        // Write the binary request data
        using ( var dataStream = request.GetRequestStream() )
        {
            dataStream.Write( requestBytes, 0, requestBytes.Length );
        }

        // Send to the server and await a response
        using ( var response = (HttpWebResponse)request.GetResponse() )
        {
            if ( response.StatusCode == HttpStatusCode.OK )
            {
                using ( var responseStream = response.GetResponseStream() )
                {
                    return ReadRawResponse( responseStream, avroEncoding );
                }
            }
        }
    }
    catch (System.Net.WebException ex)
    {
        // Skip trying parsing the message if not a protocol error
        if ( ex.Status != WebExceptionStatus.ProtocolError )
            throw new KineticaException( ex.ToString(), ex );

        // Get the error message from the server response.  The response and
        // its stream are disposed here so the connection is released.
        RawKineticaResponse serverResponse;
        try
        {
            using ( var response = ex.Response )
            using ( var responseStream = response.GetResponseStream() )
            {
                serverResponse = ReadRawResponse( responseStream, avroEncoding );
            }
        }
        catch (Exception)
        {
            // The error body is not a decodable Kinetica response; report
            // the original HTTP failure rather than the decoding failure.
            throw new KineticaException( ex.ToString(), ex );
        }

        // An empty body can decode to null; fall back to the HTTP failure
        if ( serverResponse == null )
            throw new KineticaException( ex.ToString(), ex );

        // Throw the error message found within the response packet
        throw new KineticaException( serverResponse.message );
    }
    catch (Exception ex)
    {
        throw new KineticaException( ex.ToString(), ex );
    }

    return null;
}

/// <summary>
/// Decode a response stream into a raw response wrapper, as avro binary
/// or as UTF-8 JSON text.
/// </summary>
private RawKineticaResponse ReadRawResponse(Stream responseStream, bool avroEncoding)
{
    if ( avroEncoding )
        return AvroDecode<RawKineticaResponse>( responseStream );

    using ( StreamReader reader = new StreamReader( responseStream, Encoding.UTF8 ) )
    {
        var responseString = reader.ReadToEnd();
        return JsonConvert.DeserializeObject<RawKineticaResponse>( responseString );
    }
}
519 
/// <summary>
/// Remember a type under its ID (unless one is already known for that ID)
/// and record which type ID the label refers to.
/// </summary>
/// <param name="typeId">The type ID; "&lt;collection&gt;" is ignored.</param>
/// <param name="label">The type label.</param>
/// <param name="schemaString">The schema string of the type.</param>
/// <param name="properties">The properties passed to the KineticaType.</param>
private void SetDecoderIfMissing(string typeId, string label, string schemaString, IDictionary<string, IList<string>> properties)
{
    // If the table is a collection, it does not have a proper type so ignore it
    if (typeId != "<collection>")
    {
        // The KineticaType is only constructed when the ID is not known yet
        knownTypes.GetOrAdd(typeId, _ => new KineticaType(label, schemaString, properties));

        // The label always points at the most recently seen type ID
        typeNameLookup[label] = typeId;
    }
}
535 
536 
/// <summary>
/// Look up a known type by its label.
/// </summary>
/// <param name="typeName">The type label.</param>
/// <returns>The matching KineticaType, or null when the label or its type
/// ID is not known.</returns>
private KineticaType GetType(string typeName)
{
    // Resolve the label to a type ID first
    string typeId;
    if (!typeNameLookup.TryGetValue(typeName, out typeId))
        return null;

    // Then the type ID to the type; 'found' stays null on a miss
    KineticaType found;
    knownTypes.TryGetValue(typeId, out found);
    return found;
}
553 
554 
/// <summary>
/// Find the KineticaType associated with an object class type.
/// </summary>
/// <param name="objectType">The class to look up.</param>
/// <returns>The associated KineticaType, or null when none is saved.</returns>
private KineticaType lookupKineticaType( Type objectType )
{
    // A single lookup; 'associated' is left null when nothing is found
    KineticaType associated;
    this.kineticaTypeLookup.TryGetValue( objectType, out associated );
    return associated;
}  // lookupKineticaType()
567 
568 
/// <summary>
/// Encode an object into avro binary.
/// </summary>
/// <param name="obj">The object to encode: either an ISpecificRecord, or
/// an object whose class has a saved KineticaType association.</param>
/// <returns>The avro binary encoding of the object.</returns>
/// <exception cref="KineticaException">Thrown when the object is not an
/// ISpecificRecord and no KineticaType is associated with its class.</exception>
internal byte[] AvroEncode(object obj)
{
    using ( var buffer = new MemoryStream() )
    {
        var specificRecord = obj as Avro.Specific.ISpecificRecord;
        if ( specificRecord != null )
        {
            // Specific records carry their own schema - the efficient path
            var specificSchema = specificRecord.Schema;
            var specificWriter = new Avro.Specific.SpecificDefaultWriter( specificSchema );
            specificWriter.Write( specificSchema, obj, new BinaryEncoder( buffer ) );
        }
        else
        {
            // Any other object needs a KineticaType associated with its class
            KineticaType ktype = lookupKineticaType( obj.GetType() );
            if ( ktype == null )
            {
                throw new KineticaException( "No known KineticaType associated with the given object.  " +
                                             "Need a known KineticaType to encode the object." );
            }

            // Copy the object into a GenericRecord and write that instead
            var genericSchema = KineticaData.SchemaFromType( obj.GetType(), ktype );
            var genericRecord = MakeGenericRecord( obj, ktype );
            var genericWriter = new Avro.Generic.DefaultWriter( genericSchema );
            genericWriter.Write( genericSchema, genericRecord, new BinaryEncoder( buffer ) );
        }

        // Hand back everything written to the stream
        return buffer.ToArray();
    }
}  // end AvroEncode
609 
/// <summary>
/// Copy an object into a new GenericRecord whose schema is derived from
/// the object's class and the given KineticaType.
/// </summary>
/// <param name="obj">The object whose property values are copied.</param>
/// <param name="ktype">The KineticaType used to build the schema.</param>
/// <returns>A record holding the value of every schema field for which
/// the object has a property of the same name (ignoring case).</returns>
private Avro.Generic.GenericRecord MakeGenericRecord( object obj, KineticaType ktype )
{
    var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
    var record = new Avro.Generic.GenericRecord( schema );

    // The object's properties do not change while the fields are walked
    var candidates = obj.GetType().GetProperties();

    foreach ( var field in schema.Fields )
    {
        // Match the field to the first property with the same lowercased name
        string wanted = field.Name.ToLowerInvariant();
        var match = candidates.FirstOrDefault( prop => prop.Name.ToLowerInvariant() == wanted );

        // Fields without a matching property are left out of the record
        if ( match != null )
            record.Add( field.Name, match.GetValue( obj, null ) );
    }

    return record;
}
640 
/// <summary>
/// Decode avro binary data into a new object.
/// </summary>
/// <typeparam name="T">The type of object to create and fill.</typeparam>
/// <param name="bytes">The avro binary encoded object.</param>
/// <param name="ktype">Optional KineticaType used, together with
/// <typeparamref name="T"/>, to build the schema.</param>
/// <returns>The decoded object.</returns>
private T AvroDecode<T>(byte[] bytes, KineticaType ktype = null) where T : new()
{
    var schema = KineticaData.SchemaFromType( typeof(T), ktype );

    using (var input = new MemoryStream(bytes))
    {
        T result = new T();

        if (result is Avro.Specific.ISpecificRecord)
        {
            // Specific records are filled in directly by the avro reader
            var specificReader = new Avro.Specific.SpecificDefaultReader(schema, schema);
            specificReader.Read(result, new BinaryDecoder(input));
            return result;
        }

        // Anything else is read into a GenericRecord first...
        var genericReader = new Avro.Generic.DefaultReader(schema, schema);
        var decoded = new Avro.Generic.GenericRecord(schema);
        genericReader.Read(decoded, new BinaryDecoder(input));

        // ...and then copied, field by field, onto matching properties
        var candidates = result.GetType().GetProperties();
        foreach (var field in schema.Fields)
        {
            string wanted = field.Name.ToLowerInvariant();
            var match = candidates.FirstOrDefault(prop => prop.Name.ToLowerInvariant() == wanted);

            // Skip fields with no matching property or no decoded value
            object fieldValue;
            if (match != null && decoded.TryGetValue(field.Name, out fieldValue))
                match.SetValue(result, fieldValue);
        }

        return result;
    }  // end using
}  // end AvroDecode<T>
693 
694 
/// <summary>
/// Decode an avro binary encoded specific record from a stream.
/// </summary>
/// <typeparam name="T">The ISpecificRecord type to read.</typeparam>
/// <param name="stream">The stream holding the avro binary data.</param>
/// <returns>The record produced by the avro reader.</returns>
private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord, new()
{
    // The same schema serves as both writer and reader schema
    var recordSchema = KineticaData.SchemaFromType( typeof(T), null );
    var decoder = new BinaryDecoder(stream);
    return new Avro.Specific.SpecificReader<T>(recordSchema, recordSchema).Read(default(T), decoder);
}
708  /*
709  private T AvroDecode<T>(string str) where T : new()
710  {
711  return AvroDecode<T>(Encoding.UTF8.GetBytes(str));
712  }
713  */
714  } // end class Kinetica
715 } // end namespace kinetica
716 
717 
int ThreadCount
Thread Count
Definition: Kinetica.cs:70
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:65
const int END_OF_SET
No Limit
Definition: Kinetica.cs:45
string Url
URL for Kinetica Server (including "http:" and port) as a string
Definition: Kinetica.cs:82
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:87
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
Definition: Kinetica.cs:200
static string GetApiVersion()
API Version
Definition: Kinetica.cs:77
static RecordSchema SchemaFromType(System.Type t, KineticaType ktype=null)
Create an Avro Schema from a System.Type and a KineticaType.
Definition: KineticaData.cs:79
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
Definition: Kinetica.cs:285
const string API_VERSION
API version string (returned by GetApiVersion())
class Options
Connection Options
Definition: Kinetica.cs:50
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects)...
Definition: Kinetica.cs:221
string Password
Optional: Password for user
Definition: Kinetica.cs:60
void SetKineticaSourceClassToTypeMapping(Type objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Definition: Kinetica.cs:184
Kinetica(string url_str, Options options=null)
API Constructor
Definition: Kinetica.cs:128
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:55
KineticaData - class to help with Avro Encoding for Kinetica
Definition: KineticaData.cs:14
A set of parameters for the raw wrapper for Kinetica responses.
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Definition: Kinetica.cs:158
class Kinetica
API to talk to Kinetica Database
Definition: Kinetica.cs:40