Kinetica   C#   API  Version 7.2.3.0
Kinetica.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using Newtonsoft.Json;
3 using System;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
6 using System.IO;
7 using System.Linq;
8 using System.Net;
9 using System.Text;
10 
34 
35 namespace kinetica
36 {
40  public partial class Kinetica
41  {
45  public const int END_OF_SET = -9999;
46 
50  public class Options
51  {
55  public string Username { get; set; } = string.Empty;
56 
60  public string Password { get; set; } = string.Empty;
61 
65  public string OauthToken { get; set; } = string.Empty;
69  public bool UseSnappy { get; set; } = false;
70 
74  public int ThreadCount { get; set; } = 1;
75  }
76 
81  public static string GetApiVersion() { return API_VERSION; }
82 
86  public string Url { get; private set; }
87 
91  public Uri URL { get; private set; }
92 
96  public string? Username { get; private set; } = null;
97 
101  private string? Password { get; set; } = null;
102 
106  private string? OauthToken { get; set; } = null;
107 
111  private string? Authorization { get; set; } = null;
112 
116  public bool UseSnappy { get; set; } = false;
117 
121  public int ThreadCount { get; set; } = 1;
122 
123  // private string authorization;
124  private volatile System.Collections.Concurrent.ConcurrentDictionary<string, KineticaType> knownTypes = new();
125 
126  // private type label to type ID lookup table
127  private Dictionary<string, string> typeNameLookup = [];
128 
129  // private object class type to KineticaType lookup table
130  private Dictionary<Type, KineticaType> kineticaTypeLookup = [];
131 
137  public Kinetica( string url_str, Options? options = null )
138  {
139  Url = url_str;
140  URL = new Uri( url_str );
141  if ( null != options ) // If caller specified options
142  {
143  Username = options.Username;
144  Password = options.Password;
145  OauthToken = options.OauthToken;
146 
147  // Handle authorization
148  Authorization = CreateAuthorizationHeader();
149 
150  UseSnappy = options.UseSnappy;
151  ThreadCount = options.ThreadCount;
152  // TODO: executor?
153  }
154  }
155 
156  internal string? CreateAuthorizationHeader() {
157  string? authorization = null;
158  // Handle authorization
159  if( OauthToken != null && OauthToken.Length > 0 ) {
160  authorization = "Bearer " + OauthToken;
161  }
162  else if ( ( Username != null && ( Username.Length > 0 ) ) || ( Password != null && ( Password.Length > 0 ) ) )
163  {
164  authorization = ( "Basic " +
165  Convert.ToBase64String( Encoding.GetEncoding( "ISO-8859-1" ).GetBytes( Username + ":" + Password ) ) );
166  }
167 
168  return authorization;
169  }
170 
177  public void AddTableType( string table_name, Type obj_type )
178  {
179  try
180  {
181  // Get the type from the table
182  KineticaType ktype = KineticaType.fromTable( this, table_name );
183  if ( ktype.getTypeID() == null )
184  throw new KineticaException( $"Could not get type ID for table '{table_name}'" );
185  this.knownTypes.TryAdd( ktype.getTypeID(), ktype );
186 
187  // Save a mapping of the object to the KineticaType
188  if ( obj_type != null )
189  this.SetKineticaSourceClassToTypeMapping( obj_type, ktype );
190 
191  } catch ( KineticaException ex )
192  {
193  throw new KineticaException( "Error creating type from table", ex );
194  }
195  } // end AddTableType
196 
203  public void SetKineticaSourceClassToTypeMapping( Type? objectType, KineticaType kineticaType )
204  {
205  if ( objectType != null )
206  this.kineticaTypeLookup.Add( objectType, kineticaType );
207  return;
208  } // end SetKineticaSourceClassToTypeMapping
209 
210 
211 
221  IList<byte[]> records_binary,
222  IList<T> records ) where T : new()
223  {
224  // Using the KineticaType object, decode all the records from avro binary encoding
225  foreach ( var bin_record in records_binary )
226  {
227  T obj = AvroDecode<T>( bin_record, record_type );
228  records.Add( obj );
229  }
230  } // DecodeRawBinaryDataUsingRecordType
231 
232 
241  public void DecodeRawBinaryDataUsingSchemaString<T>( string schema_string,
242  IList<byte[]> records_binary,
243  IList<T> records ) where T : new()
244  {
245  // Create a KineticaType object based on the schema string
246  KineticaType ktype = new("", schema_string, null);
247 
248  // Using the KineticaType object, decode all the records from avro binary encoding
249  foreach ( var bin_record in records_binary )
250  {
251  T obj = AvroDecode<T>( bin_record, ktype );
252  records.Add( obj );
253  }
254  } // DecodeRawBinaryDataUsingSchemaString
255 
265  public void DecodeRawBinaryDataUsingSchemaString<T>( IList<string> schema_strings,
266  IList<IList<byte[]>> lists_records_binary,
267  IList<IList<T>> record_lists ) where T : new()
268  {
269  // Check that the list of schemas and list of binary encode data match in length
270  if ( schema_strings.Count != lists_records_binary.Count )
271  throw new KineticaException( "List of schemas and list of binary encoded data do not match in count." );
272 
273  // Using the KineticaType object, decode all the records from avro binary encoding
274  for ( int i = 0; i < schema_strings.Count; ++i )
275  {
276  // Create a KineticaType object based on the schema string
277  KineticaType ktype = new( "", schema_strings[ i ], null );
278 
279  // Get the binary encoded data for this list
280  IList<byte[]> records_binary = lists_records_binary[ i ];
281 
282  // Create a container to put the decoded records
283  IList<T> records = [];
284 
285  // The inner list actually contains the binary data
286  foreach ( var bin_record in records_binary )
287  {
288  T obj = AvroDecode<T>( bin_record, ktype );
289  records.Add( obj );
290  }
291  // Add the records into the outgoing list
292  record_lists.Add( records );
293  }
294  } // DecodeRawBinaryDataUsingSchemaString
295 
296 
305  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
306  IList<byte[]> records_binary,
307  IList<T> records ) where T : new()
308  {
309  // Make sure that the length of the type IDs and records are the same
310  if ( type_ids.Count != records_binary.Count )
311  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
312 
313  // Decode all the records
314  for ( int i = 0; i < records_binary.Count; ++i )
315  {
316  // Per object, use the respective type ID to create the appropriate KineticaType
317  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
318 
319  // Using the KineticaType object, decode the record.
320  T obj = AvroDecode<T>( records_binary[ i ], ktype );
321  records.Add( obj );
322  }
323  } // DecodeRawBinaryDataUsingTypeIDs
324 
325 
334  public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
335  IList<IList<byte[]>> lists_records_binary,
336  IList<IList<T>> record_lists ) where T : new()
337  {
338  // Make sure that the length of the type IDs and records are the same
339  if ( type_ids.Count != lists_records_binary.Count )
340  throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
341 
342  // Decode all the records
343  for ( int i = 0; i < lists_records_binary.Count; ++i )
344  {
345  // Per object, use the respective type ID to create the appropriate KineticaType
346  KineticaType ktype = KineticaType.fromTypeID( this, type_ids[ i ] );
347 
348  // Get the binary encoded data for this list
349  IList<byte[]> records_binary = lists_records_binary[ i ];
350 
351  // Create a container to put the decoded records
352  IList<T> records = [];
353 
354  // The inner list actually contains the binary data
355  foreach ( var bin_record in records_binary )
356  {
357  // Using the KineticaType object, decode the record.
358  T obj = AvroDecode<T>( bin_record, ktype );
359  records.Add( obj );
360  }
361  // Add the records into the outgoing list
362  record_lists.Add( records );
363  }
364  } // DecodeRawBinaryDataUsingTypeIDs
365 
366 
376  internal TResponse SubmitRequest<TResponse>( Uri url, object request, bool enableCompression = false, bool avroEncoding = true ) where TResponse : new()
377  {
378  // Get the bytes to send, encoded in the requested way
379  byte[] requestBytes;
380  if ( avroEncoding )
381  {
382  requestBytes = AvroEncode( request );
383  }
384  else // JSON
385  {
386  string str = JsonConvert.SerializeObject(request);
387  requestBytes = Encoding.UTF8.GetBytes( str );
388  }
389 
390  // Send request, and receive response
391  RawKineticaResponse kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding, false);
392 
393  // Decode response payload
394  if ( avroEncoding )
395  {
396  return AvroDecode<TResponse>( kineticaResponse.data );
397  }
398  else // JSON
399  {
400  kineticaResponse.data_str = kineticaResponse.data_str.Replace( "\\U", "\\u" );
401  return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
402  }
403  } // end SubmitRequest( URL )
404 
405 
415  private TResponse SubmitRequest<TResponse>(string endpoint, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
416  {
417  // Get the bytes to send, encoded in the requested way
418  byte[] requestBytes;
419  if (avroEncoding)
420  {
421  requestBytes = AvroEncode(request);
422  }
423  else // JSON
424  {
425  string str = JsonConvert.SerializeObject(request);
426  requestBytes = Encoding.UTF8.GetBytes(str);
427  }
428 
429  // Send request, and receive response
430  RawKineticaResponse kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);
431 
432  // Decode response payload
433  if (avroEncoding)
434  {
435  return AvroDecode<TResponse>(kineticaResponse.data);
436  }
437  else // JSON
438  {
439  kineticaResponse.data_str = kineticaResponse.data_str.Replace("\\U", "\\u");
440  return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
441  }
442  } // end SubmitRequest( endpoint )
443 
444 
445 
456  private RawKineticaResponse? SubmitRequestRaw(string url, byte[] requestBytes, bool enableCompression, bool avroEncoding, bool only_endpoint_given = true)
457  {
458  try
459  {
460  if ( only_endpoint_given )
461  url = (Url + url);
462  var request = (HttpWebRequest)WebRequest.Create( url );
463  request.Method = "POST";
464  //request.UseDefaultCredentials = true;
465  request.ContentType = avroEncoding ? "application/octet-stream" : "application/json";
466  request.ContentLength = requestBytes.Length;
467 
468  // Handle the authorization
469  if ( this.Authorization != null )
470  {
471  request.Headers.Add( "Authorization", Authorization );
472  }
473 
474 
475  // Write the binary request data
476  using ( var dataStream = request.GetRequestStream())
477  {
478  dataStream.Write(requestBytes, 0, requestBytes.Length);
479  }
480 
481  // Send to the server and await a response
482  using (var response = (HttpWebResponse)request.GetResponse())
483  {
484  // Parse the response
485  if (response.StatusCode == HttpStatusCode.OK)
486  {
487  using (var responseStream = response.GetResponseStream())
488  {
489  if (avroEncoding)
490  {
491  return AvroDecode<RawKineticaResponse>(responseStream);
492  }
493  else // JSON
494  {
495  using (StreamReader reader = new(responseStream, Encoding.UTF8))
496  {
497  var responseString = reader.ReadToEnd();
498  return JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
499  }
500  }
501  }
502  }
503  }
504  }
505  catch (System.Net.WebException ex)
506  {
507  // Skip trying parsing the message if not a protocol error
508  if ( ex.Status != WebExceptionStatus.ProtocolError )
509  throw new KineticaException( ex.ToString(), ex );
510 
511  // Get the error message from the server response
512  var response = ex.Response;
513  var responseStream = response.GetResponseStream();
514  string responseString;
515  RawKineticaResponse serverResponse;
516  // Decode the response packet
517  if (avroEncoding)
518  {
519  serverResponse = AvroDecode<RawKineticaResponse>(responseStream);
520  }
521  else // JSON
522  {
523  using (StreamReader reader = new(responseStream, Encoding.UTF8))
524  {
525  responseString = reader.ReadToEnd();
526  serverResponse = JsonConvert.DeserializeObject<RawKineticaResponse>(responseString);
527  }
528  }
529  // Throw the error message found within the response packet
530  throw new KineticaException( serverResponse.message );
531  }
532  catch (Exception ex)
533  {
534  throw new KineticaException(ex.ToString(), ex);
535  }
536 
537  return null;
538  }
539 
540  private void SetDecoderIfMissing(string typeId, string label, string schemaString, IDictionary<string, IList<string>> properties)
541  {
542  // If the table is a collection, it does not have a proper type so ignore it
543 
544  if (typeId == "<collection>")
545  {
546  return;
547  }
548 
549  knownTypes.GetOrAdd(typeId, (s) =>
550  {
551  return new KineticaType(label, schemaString, properties);
552  });
553  typeNameLookup[label] = typeId;
554  }
555 
556 
562  private KineticaType? GetType(string typeName)
563  {
564  KineticaType? type = null;
565  if (typeNameLookup.TryGetValue(typeName, out string? typeId))
566  {
567  knownTypes.TryGetValue(typeId, out type);
568  }
569 
570  return type;
571  }
572 
573 
579  private KineticaType? LookupKineticaType( Type objectType )
580  {
581  if (!kineticaTypeLookup.TryGetValue(objectType, out KineticaType? value))
582  return null; // none found
583 
584  return value;
585  } // LookupKineticaType()
586 
587 
593  internal byte[] AvroEncode(object obj)
594  {
595  // Create a stream that will allow us to view the underlying memory
596  using ( var ms = new MemoryStream())
597  {
598  // Write the object to the memory stream
599  // If obj is an ISpecificRecord, this is more efficient
600  if ( obj is Avro.Specific.ISpecificRecord)
601  {
602  var schema = (obj as Avro.Specific.ISpecificRecord).Schema;
603  Avro.Specific.SpecificDefaultWriter writer = new(schema);
604  writer.Write(schema, obj, new BinaryEncoder(ms));
605  }
606  else // Not an ISpecificRecord - this way is less efficient
607  {
608  // Get the KineticaType associated with the object to be encoded
609  Type obj_type = obj.GetType();
610  KineticaType? ktype = LookupKineticaType( obj_type );
611  if ( ktype == null )
612  {
613  throw new KineticaException( "No known KineticaType associated with the given object. " +
614  "Need a known KineticaType to encode the object." );
615  }
616 
617  // Make a copy of the object to send as a GenericRecord, then write that to the memory stream
618  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
619  var recordToSend = MakeGenericRecord( obj, ktype );
620  var writer = new Avro.Generic.DefaultWriter(schema);
621  writer.Write(schema, recordToSend, new BinaryEncoder(ms));
622  }
623 
624  // Get the memory from the stream
625  return ms.ToArray();
626  }
627  } // end AvroEncode
628 
636  private Avro.Generic.GenericRecord MakeGenericRecord( object obj, KineticaType ktype )
637  {
638  // Get the schema
639  var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
640 
641  // Create a new GenericRecord for this schema
642  var recordToSend = new Avro.Generic.GenericRecord(schema);
643 
644  // Copy each field from obj to recordToSend
645  foreach ( var field in schema.Fields)
646  {
647  var property = obj.GetType()
648  .GetProperties()
649  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
650 
651  if (property == null) continue;
652 
653  recordToSend.Add(field.Name, property.GetValue(obj, null));
654  }
655 
656  // Return the newly created object
657  return recordToSend;
658  }
659 
667  private T AvroDecode<T>(byte[] bytes, KineticaType? ktype = null) where T : new()
668  {
669  // Get the schema
670  var schema = KineticaData.SchemaFromType( typeof(T), ktype );
671 
672  // Create a stream to read the binary data
673  using (var ms = new MemoryStream(bytes))
674  {
675  // Create a new object to return
676  T obj = new();
677  if (obj is Avro.Specific.ISpecificRecord)
678  {
679  var reader = new Avro.Specific.SpecificDefaultReader(schema, schema);
680  reader.Read(obj, new BinaryDecoder(ms));
681  }
682  else
683  {
684  // Not ISpecificRecord, so first read into a new GenericRecord
685  var reader = new Avro.Generic.DefaultReader(schema, schema);
686  Avro.Generic.GenericRecord recordToReceive = new(schema);
687  reader.Read(recordToReceive, new BinaryDecoder(ms));
688 
689  // Now, copy all the fields from the GenericRecord to obj
690  foreach (var field in schema.Fields)
691  {
692  var property = obj.GetType()
693  .GetProperties()
694  .FirstOrDefault(prop => prop.Name.ToLowerInvariant() == field.Name.ToLowerInvariant());
695 
696  if (property == null) continue;
697 
698  // Try to get the property
699  if (recordToReceive.TryGetValue(field.Name, out object val))
700  {
701  // If successful, write the property to obj
702  property.SetValue(obj, val);
703  }
704  } // end foreach
705  } // end if-else
706 
707  // Return the new object
708  return obj;
709  } // end using
710  } // end AvroDecode<T>
711 
712 
719  private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord, new()
720  {
721  // T obj = new T(); // Activator.CreateInstance<T>();
722  var schema = KineticaData.SchemaFromType( typeof(T), null );
723  var reader = new Avro.Specific.SpecificReader<T>(schema, schema);
724  return reader.Read(default, new BinaryDecoder(stream));
725  }
726  /*
727  private T AvroDecode<T>(string str) where T : new()
728  {
729  return AvroDecode<T>(Encoding.UTF8.GetBytes(str));
730  }
731  */
732  } // end class Kinetica
733 } // end namespace kinetica
734 
735 
int ThreadCount
Thread Count
Definition: Kinetica.cs:74
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:69
const int END_OF_SET
No Limit
Definition: Kinetica.cs:45
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Definition: Kinetica.cs:203
string OauthToken
Optional: OauthToken for user
Definition: Kinetica.cs:65
string Url
URL for Kinetica Server (including "http:" and port) as a string
Definition: Kinetica.cs:86
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:91
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
Definition: Kinetica.cs:220
static string GetApiVersion()
API Version
Definition: Kinetica.cs:81
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:116
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
Definition: Kinetica.cs:305
string? Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:96
const string API_VERSION
Connection Options
Definition: Kinetica.cs:50
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
Definition: Kinetica.cs:241
string Password
Optional: Password for user
Definition: Kinetica.cs:60
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:55
Kinetica(string url_str, Options? options=null)
API Constructor
Definition: Kinetica.cs:137
int ThreadCount
Thread Count
Definition: Kinetica.cs:121
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Definition: Kinetica.cs:177
API to talk to Kinetica Database
Definition: Kinetica.cs:40