Kinetica C# API  Version 6.2.0.1
KineticaRecord.cs
Go to the documentation of this file.
1 using Avro;
2 using Avro.IO;
3 using System;
4 using System.Collections.Generic;
5 using System.IO;
6 
7 
8 
9 namespace kinetica
10 {
14  public class KineticaRecord : Avro.Generic.GenericRecord
15  {
16  public KineticaRecord( RecordSchema schema ) : base( schema ) {}
17 
22  public string ContentsToString()
23  {
24  System.Text.StringBuilder sb = new System.Text.StringBuilder();
25  sb.Append( "Contents: " );
26  sb.Append( "{ " );
27  object value;
28  string value_string;
29  foreach ( Avro.Field field in this.Schema.Fields )
30  //foreach ( KeyValuePair<string, object> kv in this.contents )
31  {
32  // Append the field name
33  sb.Append( field.Name );
34  sb.Append( ": " );
35 
36  // Get the field value (handle nulls)
37  this.TryGetValue( field.Name, out value );
38  if ( value != null )
39  value_string = $"{value}";
40  else
41  value_string = "<null>";
42 
43  // Append the field value
44  sb.Append( value_string );
45  sb.Append( ", " );
46  }
47  sb.Remove( (sb.Length - 2), 2 ); // remove the trailing comma and space
48  sb.Append( " }" );
49  return sb.ToString();
50  } // end ContentsToString
51 
52 
59  public static IList<KineticaRecord> DecodeDynamicTableRecords( string dynamic_table_schema_string, byte[] encoded_data )
60  {
61  // Get a record schema from the schema string
62  Schema dynamic_table_schema;
63  try
64  {
65  dynamic_table_schema = Avro.Schema.Parse( dynamic_table_schema_string );
66  }
67  catch ( Exception ex )
68  {
69  throw new KineticaException( ex.ToString() );
70  }
71 
72  // The container for the decoded data (put into distinct records)
73  IList<KineticaRecord> records = new List<KineticaRecord>();
74 
75  // Decode the dynamic table schema to extract the column names and types.
76  // Then, decode each individual record.
77  using ( var ms = new MemoryStream( encoded_data ) )
78  {
79  // Read the table schema into a new GenericRecord
80  // ----------------------------------------------
81  var reader = new Avro.Generic.DefaultReader(dynamic_table_schema, dynamic_table_schema);
82  BinaryDecoder decoder = new BinaryDecoder( ms );
83  Avro.Generic.GenericRecord obj = (Avro.Generic.GenericRecord) reader.Read( null, dynamic_table_schema, dynamic_table_schema, decoder );
84 
85  // Extract the column names from the encoded data
86  object column_headers_0 = new object();
87  Object[] column_headers = null;
88  if ( obj.TryGetValue( "column_headers", out column_headers_0 ) ) // try to get the data out
89  {
90  column_headers = ( Object[] ) column_headers_0;
91  }
92 
93  // Extract the column types from the encoded data
94  object column_types_0 = new object();
95  Object[] column_types = null;
96  if ( obj.TryGetValue( "column_datatypes", out column_types_0 ) ) // try to get the data out
97  {
98  column_types = ( Object[] ) column_types_0;
99  }
100 
101  // Find out how many columns are returned
102  int num_columns = column_headers.Length;
103 
104  // Extract the column data from the encoded data (ignore the headers and types)
105  // and create a list with only the record data
106  Object[][] encoded_column_data = new Object[ num_columns ][];
107 
108  for ( int i = 0; i < num_columns; ++i )
109  {
110  // Get the column name (e.g. the first column is titled "column_1")
111  string column_name = $"column_{i+1}";
112 
113  // Get the column data out
114  object column_data_0 = new object();
115  Object[] column_data = null;
116  if ( obj.TryGetValue( column_name, out column_data_0 ) ) // try to get the data out
117  {
118  column_data = ( Object[] ) column_data_0;
119  }
120 
121  // Save this column's data in the 2D array declared above
122  encoded_column_data[i] = column_data;
123  } // done separating the column data from the headers and type
124 
125 
126  // Find out how many values per column are returned
127  int num_records = encoded_column_data[0].Length;
128 
129  // Make sure that all the column data are of the same length
130  foreach ( Object[] l in encoded_column_data )
131  {
132  if ( l.Length != num_records )
133  throw new KineticaException( "Dynamic table has uneven column data lengths" );
134  }
135 
136  // Based on the column headers and types, create a KineticaType
137  KineticaType dynamic_record_type = KineticaType.fromDynamicSchema( dynamic_table_schema_string, column_headers, column_types );
138 
139  // Using the dynamic record type, create a RecordSchema object
140  // ( off which we'll create the records)
141  Avro.RecordSchema record_schema = (Avro.RecordSchema) dynamic_record_type.getSchema();
142 
143  // Create the records by decoding the binary encoded data
144  // (column-major data into row-major data)
145  for ( int record_idx = 0; record_idx < num_records; ++record_idx )
146  {
147  // Create a GenericRecord object based on the KineticaType
148  KineticaRecord record = new KineticaRecord( record_schema );
149 
150  // Go through each column, decode the next value and put it into the record
151  for ( int column_idx = 0; column_idx < num_columns; ++column_idx )
152  {
153  // Get the value to be put
154  var val = encoded_column_data[ column_idx ][ record_idx ];
155 
156  // Get the property of the record into which the value need to saved
157  var field = record_schema.Fields[ column_idx ];
158 
159  // Set the value
160  record.Add( field.Name, val );
161  } // end inner for loop
162 
163  // Save the record
164  records.Add( record );
165  } // end outer for loop
166  } // end decoding block
167 
168  return records;
169  } // end DecodeDynamicTableRecords
170 
171 
172 
173  } // end class KineticaRecord
174 } // end namespace kinetica
Convenience class for using Avro.Generic.GenericRecord objects.
KineticaRecord(RecordSchema schema)
string ContentsToString()
Convert the contents of the record to a string.
static KineticaType fromDynamicSchema(string dynamic_table_schema_string, Object[] column_headers, Object[] column_types)
Create a KineticaType object based on information provided in a dynamic schema.
static IList< KineticaRecord > DecodeDynamicTableRecords(string dynamic_table_schema_string, byte[] encoded_data)
Decodes binary encoded data of a dynamically created table returned by the server.