Kinetica   C#   API  Version 7.2.3.0
Kinetica.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using Newtonsoft.Json;
3 using System;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
6 using System.IO;
7 using System.Linq;
8 using System.Net;
9 using System.Text;
10 
34 
35 namespace kinetica
36 {
40  public partial class Kinetica
41  {
/// <summary>
/// No Limit
/// </summary>
45  public const int END_OF_SET = -9999;
46 
/// <summary>
/// Connection Options
/// </summary>
public class Options
{
    /// <summary>
    /// Optional: User Name for Kinetica security
    /// </summary>
    public string Username { get; set; } = "";

    /// <summary>
    /// Optional: Password for user
    /// </summary>
    public string Password { get; set; } = "";

    /// <summary>
    /// Optional: OauthToken for user
    /// </summary>
    public string OauthToken { get; set; } = "";

    /// <summary>
    /// Use Snappy
    /// </summary>
    public bool UseSnappy { get; set; }

    /// <summary>
    /// Thread Count
    /// </summary>
    public int ThreadCount { get; set; } = 1;
}
76 
/// <summary>
/// API Version
/// </summary>
/// <returns>The value of the <c>API_VERSION</c> constant.</returns>
public static string GetApiVersion() => API_VERSION;
82 
/// <summary>
/// URL for Kinetica Server (including "http:" and port) as a string
/// </summary>
public string Url { get; private set; }

/// <summary>
/// URL for Kinetica Server (including "http:" and port)
/// </summary>
public Uri URL { get; private set; }

/// <summary>
/// Optional: User Name for Kinetica security
/// </summary>
public string? Username { get; private set; }

// Password copied from the connection options; null when no options were given
private string? Password { get; set; }

// OAuth token copied from the connection options; null when no options were given
private string? OauthToken { get; set; }

// Value sent as the HTTP "Authorization" header; null means no header is added
private string? Authorization { get; set; }

/// <summary>
/// Use Snappy
/// </summary>
public bool UseSnappy { get; set; }

/// <summary>
/// Thread Count
/// </summary>
public int ThreadCount { get; set; } = 1;

// private type ID to KineticaType lookup table
private volatile ConcurrentDictionary<string, KineticaType> knownTypes = new ConcurrentDictionary<string, KineticaType>();

// private type label to type ID lookup table
private Dictionary<string, string> typeNameLookup = new();

// private object class type to KineticaType lookup table
private Dictionary<Type, KineticaType> kineticaTypeLookup = new();
131 
/// <summary>
/// API Constructor
/// </summary>
/// <param name="url_str">URL of the Kinetica server; stored as given in
/// <see cref="Url"/> and parsed into <see cref="URL"/>.</param>
/// <param name="options">Optional connection options. When null, the
/// credentials stay unset and the remaining settings keep their defaults.</param>
public Kinetica( string url_str, Options? options = null )
{
    Url = url_str;
    URL = new Uri( url_str );

    // Nothing else to configure unless the caller specified options
    if ( options is null )
        return;

    Username = options.Username;
    Password = options.Password;
    OauthToken = options.OauthToken;

    // The header is derived from the credentials assigned just above
    Authorization = CreateAuthorizationHeader();

    UseSnappy = options.UseSnappy;
    ThreadCount = options.ThreadCount;
    // TODO: executor?
}
155 
/// <summary>
/// Builds the value for the HTTP "Authorization" header from the stored
/// credentials.
/// </summary>
/// <returns>
/// A "Bearer" value when an OAuth token is set; otherwise a "Basic" value
/// when a user name or a password is set; otherwise null.
/// </returns>
internal string? CreateAuthorizationHeader()
{
    // An OAuth token takes precedence over user name / password
    if ( !string.IsNullOrEmpty( OauthToken ) )
        return "Bearer " + OauthToken;

    // No credentials at all: no header
    if ( string.IsNullOrEmpty( Username ) && string.IsNullOrEmpty( Password ) )
        return null;

    byte[] credentialBytes = Encoding.GetEncoding( "ISO-8859-1" ).GetBytes( Username + ":" + Password );
    return "Basic " + Convert.ToBase64String( credentialBytes );
}
170 
/// <summary>
/// Given a table name, add its record type to the known types, and
/// optionally associate that type with an object class.
/// </summary>
/// <param name="table_name">Name of the table whose type is looked up.</param>
/// <param name="obj_type">Object class to associate with the table's type;
/// no association is saved when null.</param>
/// <exception cref="KineticaException">Thrown when a KineticaException is
/// raised while obtaining or registering the type, including when the type
/// has no ID.</exception>
public void AddTableType( string table_name, Type obj_type )
{
    try
    {
        // Get the type from the table
        KineticaType tableType = KineticaType.fromTable( this, table_name );
        if ( tableType.getTypeID() == null )
        {
            throw new KineticaException( $"Could not get type ID for table '{table_name}'" );
        }

        // Register the type under its ID (an existing entry is left as is)
        knownTypes.TryAdd( tableType.getTypeID(), tableType );

        // Save a mapping of the object to the KineticaType
        if ( obj_type != null )
        {
            SetKineticaSourceClassToTypeMapping( obj_type, tableType );
        }
    }
    catch ( KineticaException cause )
    {
        throw new KineticaException( "Error creating type from table", cause );
    }
} // end AddTableType
196 
/// <summary>
/// Saves an object class type to a KineticaType association.
/// </summary>
/// <param name="objectType">The object class; nothing is saved when null.</param>
/// <param name="kineticaType">The KineticaType to associate with the class.</param>
public void SetKineticaSourceClassToTypeMapping( Type? objectType, KineticaType kineticaType )
{
    if ( objectType == null )
        return;

    // Indexer assignment instead of Add(): registering the same class a second
    // time replaces the association rather than throwing ArgumentException.
    this.kineticaTypeLookup[ objectType ] = kineticaType;
} // end SetKineticaSourceClassToTypeMapping
209 
210 
211 
/// <summary>
/// Given a KineticaType object for a certain record type, decode binary
/// data into distinct records (objects).
/// </summary>
/// <typeparam name="T">The type of the records.</typeparam>
/// <param name="record_type">The type used to decode every record.</param>
/// <param name="records_binary">The binary encoded records.</param>
/// <param name="records">The list to which the decoded records are appended.</param>
public void DecodeRawBinaryDataUsingRecordType<T>( KineticaType record_type,
                                                   IList<byte[]> records_binary,
                                                   IList<T> records ) where T : new()
{
    // Using the KineticaType object, decode all the records from avro binary encoding
    foreach ( var bin_record in records_binary )
    {
        T obj = AvroDecode<T>( bin_record, record_type );
        records.Add( obj );
    }
} // DecodeRawBinaryDataUsingRecordType
231 
232 
/// <summary>
/// Given a schema string for a certain record type, decode binary data into
/// distinct records (objects).
/// </summary>
/// <typeparam name="T">The type of the records.</typeparam>
/// <param name="schema_string">The schema used to build the decoding type.</param>
/// <param name="records_binary">The binary encoded records.</param>
/// <param name="records">The list to which the decoded records are appended.</param>
public void DecodeRawBinaryDataUsingSchemaString<T>( string schema_string,
                                                     IList<byte[]> records_binary,
                                                     IList<T> records ) where T : new()
{
    // One KineticaType, built from the schema string, serves every record
    var schemaType = new KineticaType( "", schema_string, null );

    // Decode each avro binary encoded record and append it to the output
    foreach ( byte[] encoded in records_binary )
    {
        records.Add( AvroDecode<T>( encoded, schemaType ) );
    }
} // DecodeRawBinaryDataUsingSchemaString
255 
/// <summary>
/// Given a list of schema strings and a matching list of binary encoded
/// record lists, decode each inner list with the schema at the same index.
/// </summary>
/// <typeparam name="T">The type of the records.</typeparam>
/// <param name="schema_strings">One schema string per inner list.</param>
/// <param name="lists_records_binary">The lists of binary encoded records.</param>
/// <param name="record_lists">The list to which one decoded list is appended
/// per inner list.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingSchemaString<T>( IList<string> schema_strings,
                                                     IList<IList<byte[]>> lists_records_binary,
                                                     IList<IList<T>> record_lists ) where T : new()
{
    // The schemas and the binary encoded lists are paired by index
    if ( schema_strings.Count != lists_records_binary.Count )
    {
        throw new KineticaException( "List of schemas and list of binary encoded data do not match in count." );
    }

    for ( int listIndex = 0; listIndex < schema_strings.Count; ++listIndex )
    {
        // Create a KineticaType object based on this list's schema string
        var schemaType = new KineticaType( "", schema_strings[ listIndex ], null );

        // Decode the whole inner list before publishing it to the caller
        List<T> decoded = lists_records_binary[ listIndex ]
            .Select( encoded => AvroDecode<T>( encoded, schemaType ) )
            .ToList();

        record_lists.Add( decoded );
    }
} // DecodeRawBinaryDataUsingSchemaString
295 
296 
/// <summary>
/// Given IDs of record types registered with Kinetica, decode binary data
/// into distinct records (objects), using the type ID at the same index for
/// each record.
/// </summary>
/// <typeparam name="T">The type of the records.</typeparam>
/// <param name="type_ids">One type ID per binary encoded record.</param>
/// <param name="records_binary">The binary encoded records.</param>
/// <param name="records">The list to which the decoded records are appended.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
                                                IList<byte[]> records_binary,
                                                IList<T> records ) where T : new()
{
    // Type IDs and records are paired by index, so the counts must agree
    if ( type_ids.Count != records_binary.Count )
    {
        throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
    }

    for ( int index = 0; index < records_binary.Count; ++index )
    {
        // Resolve the KineticaType for this record from its own type ID
        KineticaType recordType = KineticaType.fromTypeID( this, type_ids[ index ] );

        // Decode the record with that type and append it to the output
        records.Add( AvroDecode<T>( records_binary[ index ], recordType ) );
    }
} // DecodeRawBinaryDataUsingTypeIDs
324 
325 
/// <summary>
/// Given IDs of record types registered with Kinetica and a matching list of
/// binary encoded record lists, decode each inner list with the type ID at
/// the same index.
/// </summary>
/// <typeparam name="T">The type of the records.</typeparam>
/// <param name="type_ids">One type ID per inner list.</param>
/// <param name="lists_records_binary">The lists of binary encoded records.</param>
/// <param name="record_lists">The list to which one decoded list is appended
/// per inner list.</param>
/// <exception cref="KineticaException">Thrown when the two input lists
/// differ in length.</exception>
public void DecodeRawBinaryDataUsingTypeIDs<T>( IList<string> type_ids,
                                                IList<IList<byte[]>> lists_records_binary,
                                                IList<IList<T>> record_lists ) where T : new()
{
    // Type IDs and record lists are paired by index, so the counts must agree
    if ( type_ids.Count != lists_records_binary.Count )
    {
        throw new KineticaException( "Unequal numbers of type IDs and binary encoded data objects provided." );
    }

    for ( int listIndex = 0; listIndex < lists_records_binary.Count; ++listIndex )
    {
        // Resolve the KineticaType shared by every record of this inner list
        KineticaType listType = KineticaType.fromTypeID( this, type_ids[ listIndex ] );

        // Decode the whole inner list before publishing it to the caller
        List<T> decoded = lists_records_binary[ listIndex ]
            .Select( encoded => AvroDecode<T>( encoded, listType ) )
            .ToList();

        record_lists.Add( decoded );
    }
} // DecodeRawBinaryDataUsingTypeIDs
365 
366 
/// <summary>
/// Encodes a request, posts it to the given URL, and decodes the payload of
/// the server's response.
/// </summary>
/// <typeparam name="TResponse">The type of the decoded response.</typeparam>
/// <param name="url">The full URL to post to (used as is).</param>
/// <param name="request">The request object to encode.</param>
/// <param name="enableCompression">Passed through to the raw submission.</param>
/// <param name="avroEncoding">True to encode/decode with Avro binary,
/// false to use JSON.</param>
/// <returns>The decoded response payload.</returns>
/// <exception cref="KineticaException">Thrown when the raw submission fails
/// or yields no response object.</exception>
internal TResponse SubmitRequest<TResponse>( Uri url, object request, bool enableCompression = false, bool avroEncoding = true ) where TResponse : new()
{
    // Get the bytes to send, encoded in the requested way
    byte[] requestBytes = avroEncoding
        ? AvroEncode( request )
        : Encoding.UTF8.GetBytes( JsonConvert.SerializeObject( request ) );

    // Send request, and receive response
    RawKineticaResponse? kineticaResponse = SubmitRequestRaw( url.ToString(), requestBytes, enableCompression, avroEncoding, false );

    // The raw call returns null when the HTTP status is not 200 OK; report
    // that explicitly rather than failing with a NullReferenceException below.
    if ( kineticaResponse == null )
        throw new KineticaException( $"No response payload was returned for request to '{url}'." );

    // Decode response payload
    if ( avroEncoding )
        return AvroDecode<TResponse>( kineticaResponse.data );

    // JSON: rewrite "\U" escapes as "\u" before handing the text to the parser
    kineticaResponse.data_str = kineticaResponse.data_str.Replace( "\\U", "\\u" );
    return JsonConvert.DeserializeObject<TResponse>( kineticaResponse.data_str );
} // end SubmitRequest( URL )
404 
405 
/// <summary>
/// Encodes a request, posts it to the given endpoint of the configured
/// server, and decodes the payload of the server's response.
/// </summary>
/// <typeparam name="TResponse">The type of the decoded response.</typeparam>
/// <param name="endpoint">The endpoint; the raw submission appends it to
/// <see cref="Url"/>.</param>
/// <param name="request">The request object to encode.</param>
/// <param name="enableCompression">Passed through to the raw submission.</param>
/// <param name="avroEncoding">True to encode/decode with Avro binary,
/// false to use JSON.</param>
/// <returns>The decoded response payload.</returns>
/// <exception cref="KineticaException">Thrown when the raw submission fails
/// or yields no response object.</exception>
private TResponse SubmitRequest<TResponse>(string endpoint, object request, bool enableCompression = false, bool avroEncoding = true) where TResponse : new()
{
    // Get the bytes to send, encoded in the requested way
    byte[] requestBytes = avroEncoding
        ? AvroEncode(request)
        : Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request));

    // Send request, and receive response
    RawKineticaResponse? kineticaResponse = SubmitRequestRaw(endpoint, requestBytes, enableCompression, avroEncoding);

    // The raw call returns null when the HTTP status is not 200 OK; report
    // that explicitly rather than failing with a NullReferenceException below.
    if (kineticaResponse == null)
        throw new KineticaException($"No response payload was returned for endpoint '{endpoint}'.");

    // Decode response payload
    if (avroEncoding)
        return AvroDecode<TResponse>(kineticaResponse.data);

    // JSON: rewrite "\U" escapes as "\u" before handing the text to the parser
    kineticaResponse.data_str = kineticaResponse.data_str.Replace("\\U", "\\u");
    return JsonConvert.DeserializeObject<TResponse>(kineticaResponse.data_str);
} // end SubmitRequest( endpoint )
443 
444 
445 
/// <summary>
/// Posts already-encoded request bytes to the server and decodes the
/// response envelope.
/// </summary>
/// <param name="url">The full URL, or just the endpoint when
/// <paramref name="only_endpoint_given"/> is true.</param>
/// <param name="requestBytes">The encoded request body.</param>
/// <param name="enableCompression">Not used by this method.</param>
/// <param name="avroEncoding">True when the body is Avro binary
/// ("application/octet-stream"), false for JSON ("application/json"); also
/// selects how the response is decoded.</param>
/// <param name="only_endpoint_given">When true, <paramref name="url"/> is
/// appended to <see cref="Url"/>.</param>
/// <returns>The decoded response when the HTTP status is 200 OK; null for
/// any other status that does not raise a WebException.</returns>
/// <exception cref="KineticaException">Thrown for every failure; for protocol
/// errors carrying a decodable body, the message is taken from that body.</exception>
private RawKineticaResponse? SubmitRequestRaw(string url, byte[] requestBytes, bool enableCompression, bool avroEncoding, bool only_endpoint_given = true)
{
    // Decodes a response body according to the encoding used for the request
    RawKineticaResponse? DecodePayload( Stream payload )
    {
        if ( avroEncoding )
            return AvroDecode<RawKineticaResponse>( payload );

        using ( StreamReader reader = new( payload, Encoding.UTF8 ) )
        {
            return JsonConvert.DeserializeObject<RawKineticaResponse>( reader.ReadToEnd() );
        }
    }

    try
    {
        if ( only_endpoint_given )
            url = ( Url + url );

        var request = (HttpWebRequest)WebRequest.Create( url );
        request.Method = "POST";
        request.ContentType = avroEncoding ? "application/octet-stream" : "application/json";
        request.ContentLength = requestBytes.Length;

        // Handle the authorization
        if ( this.Authorization != null )
        {
            request.Headers.Add( "Authorization", Authorization );
        }

        // Write the binary request data
        using ( var dataStream = request.GetRequestStream() )
        {
            dataStream.Write( requestBytes, 0, requestBytes.Length );
        }

        // Send to the server and await a response
        using ( var response = (HttpWebResponse)request.GetResponse() )
        {
            // Only a 200 OK response carries a payload this method decodes
            if ( response.StatusCode != HttpStatusCode.OK )
                return null;

            using ( var responseStream = response.GetResponseStream() )
            {
                return DecodePayload( responseStream );
            }
        }
    }
    catch ( WebException ex )
    {
        // Skip trying parsing the message if not a protocol error (or if
        // there is no response object to read the message from)
        WebResponse? errorResponse = ex.Response;
        if ( ex.Status != WebExceptionStatus.ProtocolError || errorResponse == null )
            throw new KineticaException( ex.ToString(), ex );

        // Get the error message from the server response; dispose the
        // response and its stream so the connection is released
        RawKineticaResponse? serverResponse;
        try
        {
            using ( errorResponse )
            using ( var errorStream = errorResponse.GetResponseStream() )
            {
                serverResponse = DecodePayload( errorStream );
            }
        }
        catch ( Exception )
        {
            // The error body could not be decoded as a server response;
            // report the original HTTP failure instead
            throw new KineticaException( ex.ToString(), ex );
        }

        if ( serverResponse == null )
            throw new KineticaException( ex.ToString(), ex );

        // Throw the error message found within the response packet
        throw new KineticaException( serverResponse.message );
    }
    catch ( Exception ex )
    {
        throw new KineticaException( ex.ToString(), ex );
    }
}
539 
/// <summary>
/// Registers a KineticaType under the given type ID if that ID is not yet
/// known, and records the label-to-type-ID mapping.
/// </summary>
/// <param name="typeId">The type ID; "&lt;collection&gt;" is ignored.</param>
/// <param name="label">The type label.</param>
/// <param name="schemaString">The schema used to build the KineticaType.</param>
/// <param name="properties">The properties used to build the KineticaType.</param>
private void SetDecoderIfMissing(string typeId, string label, string schemaString, IDictionary<string, IList<string>> properties)
{
    // If the table is a collection, it does not have a proper type so ignore it
    if (typeId != "<collection>")
    {
        // The factory only runs when the type ID has no entry yet
        knownTypes.GetOrAdd(typeId, _ => new KineticaType(label, schemaString, properties));

        // The label mapping is always (re)written
        typeNameLookup[label] = typeId;
    }
}
555 
556 
/// <summary>
/// Retrieves a KineticaType by its type label.
/// </summary>
/// <param name="typeName">The type label to look up.</param>
/// <returns>The matching KineticaType, or null when the label or its type
/// ID is not known.</returns>
private KineticaType? GetType(string typeName)
{
    // First hop: label -> type ID
    if (!typeNameLookup.TryGetValue(typeName, out string? typeId))
        return null;

    // Second hop: type ID -> KineticaType
    return knownTypes.TryGetValue(typeId, out KineticaType? found) ? found : null;
}
572 
573 
/// <summary>
/// Looks up the KineticaType associated with an object class.
/// </summary>
/// <param name="objectType">The object class to look up.</param>
/// <returns>The associated KineticaType, or null when none is found.</returns>
private KineticaType? LookupKineticaType( Type objectType )
{
    return kineticaTypeLookup.TryGetValue( objectType, out KineticaType? associated )
        ? associated
        : null; // none found
} // LookupKineticaType()
586 
587 
/// <summary>
/// Encodes an object into Avro binary.
/// </summary>
/// <param name="obj">The object to encode: either an ISpecificRecord, or an
/// object whose class has an associated KineticaType.</param>
/// <returns>The encoded bytes.</returns>
/// <exception cref="KineticaException">Thrown when the object is not an
/// ISpecificRecord and no KineticaType is associated with its class.</exception>
internal byte[] AvroEncode(object obj)
{
    // Everything is written into an in-memory buffer
    using var buffer = new MemoryStream();

    if ( obj is Avro.Specific.ISpecificRecord specificRecord )
    {
        // An ISpecificRecord carries its own schema - this is more efficient
        var schema = specificRecord.Schema;
        new Avro.Specific.SpecificDefaultWriter( schema ).Write( schema, obj, new BinaryEncoder( buffer ) );
    }
    else // Not an ISpecificRecord - this way is less efficient
    {
        // Get the KineticaType associated with the object to be encoded
        KineticaType? ktype = LookupKineticaType( obj.GetType() );
        if ( ktype == null )
        {
            throw new KineticaException( "No known KineticaType associated with the given object. " +
                                         "Need a known KineticaType to encode the object." );
        }

        // Copy the object into a GenericRecord, then write that to the buffer
        var schema = KineticaData.SchemaFromType( obj.GetType(), ktype );
        var genericCopy = MakeGenericRecord( obj, ktype );
        new Avro.Generic.DefaultWriter( schema ).Write( schema, genericCopy, new BinaryEncoder( buffer ) );
    }

    // Hand back a copy of the buffer's contents
    return buffer.ToArray();
} // end AvroEncode
628 
/// <summary>
/// Copies an object's property values into a new Avro GenericRecord built
/// for the object's schema.
/// </summary>
/// <param name="obj">The object whose properties are copied.</param>
/// <param name="ktype">The KineticaType used to obtain the schema.</param>
/// <returns>The populated GenericRecord. Schema fields with no matching
/// property (names compared case-insensitively) are not added.</returns>
private Avro.Generic.GenericRecord MakeGenericRecord( object obj, KineticaType ktype )
{
    Type objectType = obj.GetType();

    // Get the schema
    var schema = KineticaData.SchemaFromType( objectType, ktype );

    // Create a new GenericRecord for this schema
    var recordToSend = new Avro.Generic.GenericRecord( schema );

    // Reflect over the properties once, instead of once per schema field.
    // TryAdd keeps the first property for each lowercased name, which is the
    // property a first-match scan would have selected.
    var propertiesByName = new Dictionary<string, System.Reflection.PropertyInfo>();
    foreach ( var prop in objectType.GetProperties() )
    {
        propertiesByName.TryAdd( prop.Name.ToLowerInvariant(), prop );
    }

    // Copy each field from obj to recordToSend
    foreach ( var field in schema.Fields )
    {
        if ( !propertiesByName.TryGetValue( field.Name.ToLowerInvariant(), out var property ) )
            continue;

        recordToSend.Add( field.Name, property.GetValue( obj, null ) );
    }

    // Return the newly created object
    return recordToSend;
}
659 
/// <summary>
/// Decodes Avro binary data into a new object of type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of object to create and populate.</typeparam>
/// <param name="bytes">The Avro binary encoded data.</param>
/// <param name="ktype">Optional KineticaType used to obtain the schema.</param>
/// <returns>The populated object. When T is not an ISpecificRecord, only
/// properties whose names match a schema field (case-insensitively) are set.</returns>
private T AvroDecode<T>(byte[] bytes, KineticaType? ktype = null) where T : new()
{
    // Get the schema
    var schema = KineticaData.SchemaFromType( typeof(T), ktype );

    // Create a stream to read the binary data
    using var ms = new MemoryStream( bytes );

    // Create a new object to return
    T obj = new();
    if ( obj is Avro.Specific.ISpecificRecord )
    {
        var specificReader = new Avro.Specific.SpecificDefaultReader( schema, schema );
        specificReader.Read( obj, new BinaryDecoder( ms ) );
        return obj;
    }

    // Not ISpecificRecord, so first read into a new GenericRecord
    var reader = new Avro.Generic.DefaultReader( schema, schema );
    Avro.Generic.GenericRecord recordToReceive = new( schema );
    reader.Read( recordToReceive, new BinaryDecoder( ms ) );

    // Reflect over the properties once, instead of once per schema field.
    // TryAdd keeps the first property for each lowercased name, which is the
    // property a first-match scan would have selected.
    var propertiesByName = new Dictionary<string, System.Reflection.PropertyInfo>();
    foreach ( var prop in obj.GetType().GetProperties() )
    {
        propertiesByName.TryAdd( prop.Name.ToLowerInvariant(), prop );
    }

    // Now, copy all the fields from the GenericRecord to obj
    foreach ( var field in schema.Fields )
    {
        if ( !propertiesByName.TryGetValue( field.Name.ToLowerInvariant(), out var property ) )
            continue;

        // Only fields actually present in the record are written to obj
        if ( recordToReceive.TryGetValue( field.Name, out object val ) )
        {
            property.SetValue( obj, val );
        }
    }

    // Return the new object
    return obj;
} // end AvroDecode<T>
711 
712 
/// <summary>
/// Decodes an Avro binary stream into an ISpecificRecord of type
/// <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The specific record type to read.</typeparam>
/// <param name="stream">The stream holding the Avro binary data.</param>
/// <returns>The record produced by the reader.</returns>
private T AvroDecode<T>(Stream stream) where T : Avro.Specific.ISpecificRecord, new()
{
    // The same schema serves as both writer and reader schema
    var recordSchema = KineticaData.SchemaFromType( typeof(T), null );
    var specificReader = new Avro.Specific.SpecificReader<T>( recordSchema, recordSchema );

    // No instance is supplied for reuse
    var decoder = new BinaryDecoder( stream );
    return specificReader.Read( default, decoder );
}
726  /*
727  private T AvroDecode<T>(string str) where T : new()
728  {
729  return AvroDecode<T>(Encoding.UTF8.GetBytes(str));
730  }
731  */
732  } // end class Kinetica
733 } // end namespace kinetica
734 
735 
Reader wrapper class for reading data and storing into specific classes
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:116
void Add(string fieldName, object fieldValue)
void SetKineticaSourceClassToTypeMapping(Type? objectType, KineticaType kineticaType)
Saves an object class type to a KineticaType association.
Definition: Kinetica.cs:203
The default implementation for the generic reader.
T Read(T reuse, Decoder dec)
Generic read function
string OauthToken
Optional: OauthToken for user
Definition: Kinetica.cs:65
int ThreadCount
Thread Count
Definition: Kinetica.cs:121
void DecodeRawBinaryDataUsingRecordType< T >(KineticaType record_type, IList< byte[]> records_binary, IList< T > records)
Given a KineticaType object for a certain record type, decode binary data into distinct records (obje...
Definition: Kinetica.cs:220
const string API_VERSION
static string GetApiVersion()
API Version
Definition: Kinetica.cs:81
A General purpose writer for serializing objects into a Stream using Avro.
Interface class for generated classes
Reader class for reading data and storing into specific classes
string Password
Optional: Password for user
Definition: Kinetica.cs:60
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
void DecodeRawBinaryDataUsingTypeIDs< T >(IList< string > type_ids, IList< byte[]> records_binary, IList< T > records)
Given IDs of records types registered with Kinetica, decode binary data into distinct records (object...
Definition: Kinetica.cs:305
int ThreadCount
Thread Count
Definition: Kinetica.cs:74
string Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:55
Write leaf values.
The default type used by GenericReader and GenericWriter for RecordSchema.
string Url
URL for Kinetica Server (including "http:" and port) as a string
Definition: Kinetica.cs:86
const int END_OF_SET
No Limit
Definition: Kinetica.cs:45
bool UseSnappy
Use Snappy
Definition: Kinetica.cs:69
virtual void Write(Schema schema, object value, Encoder encoder)
Examines the schema and dispatches the actual work to one of the other methods of this class.
object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
Uri URL
URL for Kinetica Server (including "http:" and port)
Definition: Kinetica.cs:91
Connection Options
Definition: Kinetica.cs:50
void DecodeRawBinaryDataUsingSchemaString< T >(string schema_string, IList< byte[]> records_binary, IList< T > records)
Given a schema string for a certain record type, decode binary data into distinct records (objects).
Definition: Kinetica.cs:241
string? Username
Optional: User Name for Kinetica security
Definition: Kinetica.cs:96
Kinetica(string url_str, Options? options=null)
API Constructor
Definition: Kinetica.cs:137
Class for writing data from any specific objects
void AddTableType(string table_name, Type obj_type)
Given a table name, add its record type to enable proper encoding of records for insertion or updates...
Definition: Kinetica.cs:177
Decoder for Avro binary format