Kinetica C# API  Version 6.0.1.0
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Properties Pages
Kinetica.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using Newtonsoft.Json;
3 using System;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
6 using System.IO;
7 using System.Linq;
8 using System.Net;
9 using System.Text;
10 
34 
35 namespace kinetica
36 {
40  public partial class Kinetica
41  {
45  public const int END_OF_SET = -9999;
46 
50  public class Options
51  {
55  public string Username { get; set; }
56 
60  public string Password { get; set; }
61 
65  public bool UseSnappy { get; set; } = false;
66 
70  public int ThreadCount { get; set; } = 1;
71  }
72 
77  public static string GetApiVersion() { return API_VERSION; }
78 
82  public string Url { get; private set; }
83 
87  public string Username { get; private set; } = null;
88 
92  private string Password { get; set; } = null;
93 
97  private string Authorization { get; set; } = null;
98 
102  public bool UseSnappy { get; set; } = false;
103 
107  public int ThreadCount { get; set; } = 1;
108 
109  // private string authorization;
110  private volatile System.Collections.Concurrent.ConcurrentDictionary<string, KineticaType> knownTypes = new ConcurrentDictionary<string, KineticaType>();
111 
112  // private type label to type ID lookup table
113  private Dictionary<string, string> typeNameLookup = new Dictionary<string, string>();
114 
115  // private object class type to KineticaType lookup table
116  private Dictionary<Type, KineticaType> kineticaTypeLookup = new Dictionary<Type, KineticaType>();
117 
123  public Kinetica( string url, Options options = null )
124  {
125  Url = url;
126  if ( null != options ) // If caller specified options
127  {
128  Username = options.Username;
129  Password = options.Password;
130 
131  // Handle authorization
132  if ( ( Username != null && ( Username.Length > 0 ) ) || ( Password != null && ( Password.Length > 0 ) ) )
133  {
134  Authorization = ( "Basic " +
135  Convert.ToBase64String( Encoding.GetEncoding( "ISO-8859-1" ).GetBytes( Username + ":" + Password ) ) );
136  }
137 
138 
139  UseSnappy = options.UseSnappy;
140  ThreadCount = options.ThreadCount;
141  // TODO: executor?
142  }
143  }
144 
145 
152  public void AddTableType( string table_name, Type obj_type )
153  {
154  try
155  {
156  // Get the type from the table
157  KineticaType ktype = KineticaType.fromTable( this, table_name );
158  if ( ktype.getTypeID() == null )
159  throw new KineticaException( $"Could not get type ID for table '{table_name}'" );
160  this.knownTypes.TryAdd( ktype.getTypeID(), ktype );
161 
162  // Save a mapping of the object to the KineticaType
163  if ( obj_type != null )
164  this.SetKineticaSourceClassToTypeMapping( obj_type, ktype );
165 
166  } catch ( KineticaException ex )
167  {
168  throw new KineticaException( "Error creating type from table", ex );
169  }
170  } // end AddTableType
171 
178  public void SetKineticaSourceClassToTypeMapping( Type objectType, KineticaType kineticaType )
179  {
180  this.kineticaTypeLookup.Add( objectType, kineticaType );
181  return;
182  } // end SetKineticaSourceClassToTypeMapping
183 
184 
185 
194  public void DecodeRawBinaryDataUsingSchemaString<T>( string schema_string,
195  IList<byte[]> records_binary,
196  IList<T> records ) where T : new()
197  {
198  // Create a KineticaType object based on the schema string
199  KineticaType ktype = new KineticaType( "", schema_string, null );
200 
201  // Using the KineticaType object, decode all the records from avro binary encoding
202  foreach ( var bin_record in records_binary )
203  {
204  T obj = AvroDecode<T>( bin_record, ktype );
205  records.Add( obj );
206  }
207  } // DecodeRawBinaryDataUsingSchemaString
208 
218  public void DecodeRawBinaryDataUsingSchemaString<T>( IList<string> schema_strings,
219  IList<IList<byte[]>> lists_records_binary,
220  IList<IList<T>> record_lists ) where T : new()
221  {
222  // Check that the list of schemas and list of binary encode data match in length
223  if ( schema_strings.Count != lists_records_binary.Count )
224  throw new KineticaException( "List of schemas and list of binary encoded data do not match in count." );
225 
226  // Using the KineticaType object, decode all the records from avro binary encoding
227  for ( int i = 0; i < schema_strings.Count; ++i )
228  {
229  // Create a KineticaType object based on the schema string
230  KineticaType ktype = new KineticaType( "", schema_strings[ i ], null );
231 
232  // Get the binary encoded data for this list
233  IList<byte[]> records_binary = lists_records_binary[ i ];
234 
235  // Create a container to put the decoded records
236  IList<T> records = new List<T>();
237 
238  // The inner list actually contains the binary data
239  foreach ( var bin_record in records_binary )
240  {
241  T obj = AvroDecode<T>( bin_record, ktype );
242  records.Add( obj );
243  }
244  // Add the records into the outgoing list
245  record_lists.Add( records );
246  }
247  } // DecodeRawBinaryDataUsingSchemaString
248 
249 
258  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
259  IList<byte[]> records_binary,
260  IList<T> records ) where T : new()
261  {
262  // Make sure that the length of the type IDs and records are the same
263  if ( type_ids.Count != records_binary.Count )
264  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
265 
266  // Decode all the records
267  for ( int i = 0; i < records_binary.Count; ++i )
268  {
269  // Per object, use the respective type ID to create the appropriate KineticaType
270  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
271 
272  // Using the KineticaType object, decode the record.
273  T obj = AvroDecode<T>( records_binary[ i ], ktype );
274  records.Add( obj );
275  }
276  } // DecodeRawBinaryDataUsingTypeIDs
277 
278 
287  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
288  IList<IList<byte[]>> lists_records_binary,
289  IList<IList<T>> record_lists ) where T : new()
290  {
291  // Make sure that the length of the type IDs and records are the same
292  if ( type_ids.Count != lists_records_binary.Count )
293  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
294 
295  // Decode all the records
296  for ( int i = 0; i < lists_records_binary.Count; ++i )
297  {
298  // Per object, use the respective type ID to create the appropriate KineticaType
299  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
300 
301  // Get the binary encoded data for this list
302  IList<byte[]> records_binary = lists_records_binary[ i ];
303 
304  // Create a container to put the decoded records
305  IList<T> records = new List<T>();
306 
307  // The inner list actually contains the binary data
308  foreach ( var bin_record in records_binary )
309  {
310  // Using the KineticaType object, decode the record.
311  T obj = AvroDecode<T>( bin_record, ktype );
312  records.Add( obj );
313  }
314  // Add the records into the outgoing list
315  record_lists.Add( records );
316  }
317  } // DecodeRawBinaryDataUsingTypeIDs
318 
319 
329  internal TResponse SubmitRequest<TResponse>( Uri url, object request, bool enableCompression = false, bool avroEncoding = true ) where TResponse : new()
330  {
331  // Get the bytes to send, encoded in the requested way
332  byte[] requestBytes;
333  if ( avroEncoding )
334  {
335  requestBytes = AvroEncode( request );
336  }
337  else // JSON
338  {
339  string str = JsonConvert.SerializeObject(request);
340  requestBytes = Encoding.UTF8.GetBytes( str );
341  }
342 
343  // Send request, and receive response
344  KineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding, false);
345 
346  // Decode response payload
347  if ( avroEncoding )
348  {
349  return AvroDecode<TResponse>( kineticaResponse.data );
350  }
351  else // JSON
352  {
353  kineticaResponse.data_str = kineticaResponse.data_str.Replace( "\\U", "\\u" );
354  return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
355  }
356  } // end SubmitRequest( URL )
357 
358 
368  private TResponse SubmitRequest<TResponse>(string endpoint, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
369  {
370  // Get the bytes to send, encoded in the requested way
371  byte[] requestBytes;
372  if (avroEncoding)
373  {
374  requestBytes = AvroEncode(request);
375  }
376  else // JSON
377  {
378  string str = JsonConvert.SerializeObject(request);
379  requestBytes = Encoding.UTF8.GetBytes(str);
380  }
381 
382  // Send request, and receive response
383  KineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
384 
385  // Decode response payload
386  if (avroEncoding)
387  {
388  return AvroDecode<TResponse>(kineticaResponse.data);
389  }
390  else // JSON
391  {
392  kineticaResponse.data_str = kineticaResponse.data_str.Replace("\\U", "\\u");
393  return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
394  }
395  } // end SubmitRequest( endpoint )
396 
397 
398 
409  private KineticaResponse SubmitRequestRaw(string url, byte[] requestBytes, bool enableCompression, bool avroEncoding, bool only_endpoint_given = true)
410  {
411  try
412  {
413  if ( only_endpoint_given )
414  url = (Url + url);
415  var request = (HttpWebRequest)WebRequest.Create( url );
416  request.Method = "POST";
417  //request.UseDefaultCredentials = true;
418  request.ContentType = avroEncoding ? "application/octet-stream" : "application/json";
419  request.ContentLength = requestBytes.Length;
420 
421  // Handle the authorization
422  if ( this.Authorization != null )
423  {
424  request.Headers.Add( "Authorization", Authorization );
425  }
426 
427 
428  // Write the binary request data
429  using ( var dataStream = request.GetRequestStream())
430  {
431  dataStream.Write(requestBytes, 0, requestBytes.Length);
432  }
433 
434  // Send to the server and await a response
435  using (var response = (HttpWebResponse)request.GetResponse())
436  {
437  // Parse the response
438  if (response.StatusCode == HttpStatusCode.OK)
439  {
440  using (var responseStream = response.GetResponseStream())
441  {
442  if (avroEncoding)
443  {
444  return AvroDecode<KineticaResponse>(responseStream);
445  }
446  else // JSON
447  {
448  using (StreamReader reader = new StreamReader(responseStream, Encoding.UTF8))
449  {
450  var responseString = reader.ReadToEnd();
451  return JsonConvert.DeserializeObject<KineticaResponse>(responseString);
452  }
453  }
454  }
455  }
456  }
457  }
458  catch (System.Net.WebException ex)
459  {
460  // Skip trying parsing the message if not a protocol error
461  if ( ex.Status != WebExceptionStatus.ProtocolError )
462  throw new KineticaException( ex.ToString(), ex );
463 
464  // Get the error message from the server response
465  var response = ex.Response;
466  var responseStream = response.GetResponseStream();
467  string responseString;
468  KineticaResponse serverResponse;
469  // Decode the response packet
470  if (avroEncoding)
471  {
472  serverResponse = AvroDecode<KineticaResponse>(responseStream);
473  }
474  else // JSON
475  {
476  using (StreamReader reader = new StreamReader(responseStream, Encoding.UTF8))
477  {
478  responseString = reader.ReadToEnd();
479  serverResponse = JsonConvert.DeserializeObject<KineticaResponse>(responseString);
480  }
481  }
482  // Throw the error message found within the response packet
483  throw new KineticaException( serverResponse.message );
484  }
485  catch (Exception ex)
486  {
487  throw new KineticaException(ex.ToString(), ex);
488  }
489 
490  return null;
491  }
492 
493  private void SetDecoderIfMissing(string typeId, string label, string schemaString, IDictionary<string, IList<string>> properties)
494  {
495  // If the table is a collection, it does not have a proper type so ignore it
496 
497  if (typeId == "<collection>")
498  {
499  return;
500  }
501 
502  knownTypes.GetOrAdd(typeId, (s) =>
503  {
504  return new KineticaType(label, schemaString, properties);
505  });
506  typeNameLookup[label] = typeId;
507  }
508 
509 
515  private KineticaType GetType(string typeName)
516  {
517  KineticaType type = null;
518  string typeId;
519  if (typeNameLookup.TryGetValue(typeName, out typeId))
520  {
521  knownTypes.TryGetValue(typeId, out type);
522  }
523 
524  return type;
525  }
526 
527 
533  private KineticaType lookupKineticaType( Type objectType )
534  {
535  if ( !this.kineticaTypeLookup.ContainsKey( objectType ) )
536  return null; // none found
537 
538  return this.kineticaTypeLookup[ objectType ];
539  } // lookupKineticaType()
540 
541 
547  internal byte[] AvroEncode(object obj)
548  {
549  // Create a stream that will allow us to view the underlying memory
550  using ( var ms = new MemoryStream())
551  {
552  // Write the object to the memory stream
553  // If obj is an ISpecificRecord, this is more efficient
554  if ( obj is Avro.Specific.ISpecificRecord)
555  {
556  var schema = (obj as Avro.Specific.ISpecificRecord).Schema;
557  Avro.Specific.SpecificDefaultWriter writer = new Avro.Specific.SpecificDefaultWriter(schema);
558  writer.Write(schema, obj, new BinaryEncoder(ms));
559  }
560  else // Not an ISpecificRecord - this way is less efficient
561  {
562  // Get the KineticaType associated with the object to be encoded
563  Type obj_type = obj.GetType();
564  KineticaType ktype = lookupKineticaType( obj_type );
565  if ( ktype == null )
566  {
567  throw new KineticaException( "No known KineticaType associated with the given object. " +
568  "Need a known KineticaType to encode the object." );
569  }
570 
571  // Make a copy of the object to send as a GenericRecord, then write that to the memory stream
572  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
573  var recordToSend = MakeGenericRecord( obj, ktype );
574  var writer = new Avro.Generic.DefaultWriter(schema);
575  writer.Write(schema, recordToSend, new BinaryEncoder(ms));
576  }
577 
578  // Get the memory from the stream
579  return ms.ToArray();
580  }
581  } // end AvroEncode
582 
590  private Avro.Generic.GenericRecord MakeGenericRecord( object obj, KineticaType ktype )
591  {
592  // Get the schema
593  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
594 
595  // Create a new GenericRecord for this schema
596  var recordToSend = new Avro.Generic.GenericRecord(schema);
597 
598  // Copy each field from obj to recordToSend
599  foreach ( var field in schema.Fields)
600  {
601  var property = obj.GetType()
602  .GetProperties()
603  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
604 
605  if (property == null) continue;
606 
607  recordToSend.Add(field.Name, property.GetValue(obj, null));
608  }
609 
610  // Return the newly created object
611  return recordToSend;
612  }
613 
621  private T AvroDecode<T>(byte[] bytes, KineticaType ktype = null) where T : new()
622  {
623  // Get the schema
624  var schema = KineticaData.SchemaFromType( typeof(T), ktype );
625 
626  // Create a stream to read the binary data
627  using (var ms = new MemoryStream(bytes))
628  {
629  // Create a new object to return
630  T obj = new T();
631  if (obj is Avro.Specific.ISpecificRecord)
632  {
633  var reader = new Avro.Specific.SpecificDefaultReader(schema, schema);
634  reader.Read(obj, new BinaryDecoder(ms));
635  }
636  else
637  {
638  // Not ISpecificRecord, so first read into a new GenericRecord
639  var reader = new Avro.Generic.DefaultReader(schema, schema);
640  Avro.Generic.GenericRecord recordToReceive = new Avro.Generic.GenericRecord(schema);
641  reader.Read(recordToReceive, new BinaryDecoder(ms));
642 
643  // Now, copy all the fields from the GenericRecord to obj
644  foreach (var field in schema.Fields)
645  {
646  var property = obj.GetType()
647  .GetProperties()
648  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
649 
650  if (property == null) continue;
651 
652  object val;
653  // Try to get the property
654  if (recordToReceive.TryGetValue(field.Name, out val))
655  {
656  // If successful, write the property to obj
657  property.SetValue(obj, val);
658  }
659  } // end foreach
660  } // end if-else
661 
662  // Return the new object
663  return obj;
664  } // end using
665  } // end AvroDecode<T>
666 
667 
674  private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord, new()
675  {
676  // T obj = new T(); // Activator.CreateInstance<T>();
677  var schema = KineticaData.SchemaFromType( typeof(T), null );
678  var reader = new Avro.Specific.SpecificReader<T>(schema, schema);
679  return reader.Read(default(T), new BinaryDecoder(stream));
680  }
681  /*
682  private T AvroDecode<T>(string str) where T : new()
683  {
684  return AvroDecode<T>(Encoding.UTF8.GetBytes(str));
685  }
686  */
687  } // end class Kinetica
688 } // end namespace kinetica
689 
690 
int ThreadCount
Thread Count
Definition: Kinetica.cs:70
Kinetica(string url, Options options=null)
API Constructor
Definition: Kinetica.cs:123
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:65
const int END_OF_SET
No Limit
Definition: Kinetica.cs:45
string Url
URL for Kinetica Server (including &quot;http:&quot; and port)
Definition: Kinetica.cs:82
static string GetApiVersion()
API Version
Definition: Kinetica.cs:77
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:102
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
Definition: Kinetica.cs:258
const string API_VERSION
Connection Options
Definition: Kinetica.cs:50
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects)...
Definition: Kinetica.cs:194
string Password
Optional: Password for user
Definition: Kinetica.cs:60
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:87
void SetKineticaSourceClassToTypeMapping(Type objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Definition: Kinetica.cs:178
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:55
int ThreadCount
Thread Count
Definition: Kinetica.cs:107
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Definition: Kinetica.cs:152
API to talk to Kinetica Database
Definition: Kinetica.cs:40