Kinetica   C#   API  Version 7.2.3.0
KineticaRecord.cs
Go to the documentation of this file.
1 using Avro;
2 using Avro.IO;
3 using System;
4 using System.Collections.Generic;
5 using System.IO;
6 
7 
8 
9 namespace kinetica
10 {
14  public class KineticaRecord : Avro.Generic.GenericRecord
15  {
16  public KineticaRecord( RecordSchema schema ) : base( schema ) {}
17 
22  public string ContentsToString()
23  {
24  System.Text.StringBuilder sb = new System.Text.StringBuilder();
25  sb.Append( "Contents: " );
26  sb.Append( "{ " );
27  object value;
28  string value_string;
29  foreach ( Avro.Field field in this.Schema.Fields )
30  //foreach ( KeyValuePair<string, object> kv in this.contents )
31  {
32  // Append the field name
33  sb.Append( field.Name );
34  sb.Append( ": " );
35 
36  // Get the field value (handle nulls)
37  this.TryGetValue( field.Name, out value );
38  if ( value != null )
39  value_string = $"{value}";
40  else
41  value_string = "<null>";
42 
43  // Append the field value
44  sb.Append( value_string );
45  sb.Append( ", " );
46  }
47  sb.Remove( (sb.Length - 2), 2 ); // remove the trailing comma and space
48  sb.Append( " }" );
49  return sb.ToString();
50  } // end ContentsToString
51 
52 
59  public static IList<KineticaRecord> DecodeDynamicTableRecords( string dynamic_table_schema_string, byte[] encoded_data )
60  {
61  // Get a record schema from the schema string
62  Schema dynamic_table_schema;
63  try
64  {
65  dynamic_table_schema = Avro.Schema.Parse( dynamic_table_schema_string );
66  }
67  catch ( Exception ex )
68  {
69  throw new KineticaException( ex.ToString() );
70  }
71 
72  // The container for the decoded data (put into distinct records)
73  IList<KineticaRecord> records = new List<KineticaRecord>();
74 
75  // Decode the dynamic table schema to extract the column names and types.
76  // Then, decode each individual record.
77  using ( var ms = new MemoryStream( encoded_data ) )
78  {
79  // Read the table schema into a new GenericRecord
80  // ----------------------------------------------
81  var reader = new Avro.Generic.DefaultReader(dynamic_table_schema, dynamic_table_schema);
82  BinaryDecoder decoder = new BinaryDecoder( ms );
83  Avro.Generic.GenericRecord obj = (Avro.Generic.GenericRecord) reader.Read( null, dynamic_table_schema, dynamic_table_schema, decoder );
84 
85  // Extract the column names from the encoded data
86  object column_headers_0 = new object();
87  Object[] column_headers = null;
88  if ( obj.TryGetValue( "column_headers", out column_headers_0 ) ) // try to get the data out
89  {
90  column_headers = ( Object[] ) column_headers_0;
91  }
92 
93  // Extract the column types from the encoded data
94  object column_types_0 = new object();
95  Object[] column_types = null;
96  if ( obj.TryGetValue( "column_datatypes", out column_types_0 ) ) // try to get the data out
97  {
98  column_types = ( Object[] ) column_types_0;
99  }
100 
101  // Find out how many columns are returned
102  int num_columns = column_headers.Length;
103 
104  // Extract the column data from the encoded data (ignore the headers and types)
105  // and create a list with only the record data
106  Object[][] encoded_column_data = new Object[ num_columns ][];
107 
108  for ( int i = 0; i < num_columns; ++i )
109  {
110  // Get the column name (e.g. the first column is titled "column_1")
111  string column_name = $"column_{i+1}";
112 
113  // Get the column data out
114  object column_data_0 = new object();
115  Object[] column_data = null;
116  if ( obj.TryGetValue( column_name, out column_data_0 ) ) // try to get the data out
117  {
118  column_data = ( Object[] ) column_data_0;
119  }
120 
121  // Save this column's data in the 2D array declared above
122  encoded_column_data[i] = column_data;
123  } // done separating the column data from the headers and type
124 
125 
126  // Find out how many values per column are returned
127  int num_records = encoded_column_data[0].Length;
128 
129  // Make sure that all the column data are of the same length
130  foreach ( Object[] l in encoded_column_data )
131  {
132  if ( l.Length != num_records )
133  throw new KineticaException( "Dynamic table has uneven column data lengths" );
134  }
135 
136  // Based on the column headers and types, create a KineticaType
137  KineticaType dynamic_record_type = KineticaType.fromDynamicSchema( dynamic_table_schema_string, column_headers, column_types );
138 
139  // Using the dynamic record type, create a RecordSchema object
140  // ( off which we'll create the records)
141  Avro.RecordSchema record_schema = (Avro.RecordSchema) dynamic_record_type.getSchema();
142 
143  // Create the records by decoding the binary encoded data
144  // (column-major data into row-major data)
145  for ( int record_idx = 0; record_idx < num_records; ++record_idx )
146  {
147  // Create a GenericRecord object based on the KineticaType
148  KineticaRecord record = new KineticaRecord( record_schema );
149 
150  // Go through each column, decode the next value and put it into the record
151  for ( int column_idx = 0; column_idx < num_columns; ++column_idx )
152  {
153  // Get the value to be put
154  var val = encoded_column_data[ column_idx ][ record_idx ];
155 
156  // Get the property of the record into which the value need to saved
157  var field = record_schema.Fields[ column_idx ];
158 
159  // Set the value
160  record.Add( field.Name, val );
161  } // end inner for loop
162 
163  // Save the record
164  records.Add( record );
165  } // end outer for loop
166  } // end decoding block
167 
168  return records;
169  } // end DecodeDynamicTableRecords
170 
171 
172 
173  } // end class KineticaRecord
174 } // end namespace kinetica
Class for record schemas
Definition: RecordSchema.cs:31
Schema Schema
Field type's schema
Definition: Field.cs:75
Class for fields defined in a record
Definition: Field.cs:30
The default implementation for the generic reader.
Base class for all schema types
Definition: Schema.cs:29
Convenience class for using Avro.Generic.GenericRecord objects.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
KineticaRecord(RecordSchema schema)
The default type used by GenericReader and GenericWriter for RecordSchema.
static KineticaType fromDynamicSchema(string dynamicTableSchemaString, Object[] columnHeaders, Object[] columnTypes)
Create a KineticaType object based on information provided in a dynamic schema.
string ContentsToString()
Convert the contents of the record to a string.
static IList< KineticaRecord > DecodeDynamicTableRecords(string dynamic_table_schema_string, byte[] encoded_data)
Decodes binary encoded data of a dynamically created table returned by the server.
static Schema Parse(string json)
Parses a given JSON string to create a new schema object
Definition: Schema.cs:141
bool TryGetValue(string fieldName, out object result)
Decoder for Avro binary format