Kinetica   C#   API  Version 7.2.3.1
GenericRecordEncoder.cs
Go to the documentation of this file.
1 using System;
2 using System.Buffers;
3 using System.Collections.Concurrent;
4 using System.Collections.Generic;
5 using System.Runtime.CompilerServices;
6 using System.Text;
7 using Avro;
8 using kinetica.Records;
9 
10 namespace kinetica;
11 
/// <summary>
/// High-performance Avro encoder for <see cref="GenericRecord"/> instances.
/// </summary>
/// <remarks>
/// The per-field layout (Avro type tag and union branch positions) is computed once
/// in the constructor, so encoding a record is a single pass over its values.
/// Encoders are cached per schema string, and each thread encodes into its own
/// scratch buffer.
/// </remarks>
internal sealed class GenericRecordEncoder
{
    /// <summary>Initial size, in bytes, of each thread's scratch buffer.</summary>
    private const int InitialBufferSize = 4096;

    /// <summary>Free bytes guaranteed before a field is written (covers a union index plus any fixed-size primitive).</summary>
    private const int FieldHeadroom = 32;

    /// <summary>Extra bytes reserved for the length prefix of a string/bytes value.</summary>
    private const int LengthPrefixHeadroom = 10;

    /// <summary>Batches with at least this many records are encoded in parallel.</summary>
    private const int ParallelThreshold = 100;

    private readonly Schema.Type[] _fieldTypes;
    private readonly bool[] _fieldIsUnion;
    private readonly int[] _nullIndex;   // Position of the null branch in the union, or -1 if there is none
    private readonly int[] _valueIndex;  // Position of the branch whose tag is stored in _fieldTypes, or -1
    private readonly int _fieldCount;

    // Per-thread scratch buffer used while encoding
    [ThreadStatic]
    private static byte[]? _threadLocalBuffer;

    // Cache of encoders keyed by schema string
    private static readonly ConcurrentDictionary<string, GenericRecordEncoder> _encoderCache = new();

    /// <summary>
    /// Gets or creates a <see cref="GenericRecordEncoder"/> for the specified KineticaType.
    /// </summary>
    /// <param name="ktype">Type whose schema string identifies the cached encoder.</param>
    /// <returns>The cached encoder for the type's schema, created on first use.</returns>
    public static GenericRecordEncoder GetOrCreate(KineticaType ktype)
    {
        var schemaString = ktype.getSchemaString();
        return _encoderCache.GetOrAdd(schemaString, _ => new GenericRecordEncoder(ktype));
    }

    /// <summary>
    /// Builds the per-field encoding plan from the record schema of <paramref name="ktype"/>.
    /// </summary>
    /// <exception cref="ArgumentException">The type's schema is not a record schema.</exception>
    private GenericRecordEncoder(KineticaType ktype)
    {
        var schema = ktype.getSchema() as RecordSchema
            ?? throw new ArgumentException("Invalid schema: not a record schema");

        _fieldCount = schema.Fields.Count;
        _fieldTypes = new Schema.Type[_fieldCount];
        _fieldIsUnion = new bool[_fieldCount];
        _nullIndex = new int[_fieldCount];
        _valueIndex = new int[_fieldCount];

        for (int i = 0; i < _fieldCount; i++)
        {
            var fieldSchema = schema.Fields[i].Schema;

            if (fieldSchema is not UnionSchema unionSchema)
            {
                _fieldTypes[i] = fieldSchema.Tag;
                _nullIndex[i] = -1;
                _valueIndex[i] = -1;
                continue;
            }

            // Union (nullable) field. Kinetica typically uses ["type", "null"],
            // but both orderings are handled by recording the actual positions.
            _fieldIsUnion[i] = true;

            int nullIdx = -1;
            int valueIdx = -1;

            for (int j = 0; j < unionSchema.Schemas.Count; j++)
            {
                var branch = unionSchema.Schemas[j];
                if (branch.Tag == Schema.Type.Null)
                {
                    nullIdx = j;
                }
                else
                {
                    // Keep the index together with the tag so the union index written
                    // on the wire always matches the type that is actually encoded.
                    valueIdx = j;
                    _fieldTypes[i] = branch.Tag;
                }
            }

            _nullIndex[i] = nullIdx;
            _valueIndex[i] = valueIdx;
        }
    }

    /// <summary>
    /// Encodes a single GenericRecord to Avro binary format.
    /// </summary>
    /// <param name="record">Record whose values are encoded in schema field order.</param>
    /// <returns>A new array containing exactly the encoded bytes.</returns>
    public byte[] Encode(GenericRecord record)
    {
        var values = record.GetValues();
        var buffer = GetBuffer();
        int position = 0;

        for (int i = 0; i < _fieldCount; i++)
        {
            var value = values[i];

            AvroEncoding.EnsureCapacity(ref buffer, position, FieldHeadroom);

            if (_fieldIsUnion[i])
            {
                int nullIdx = _nullIndex[i];
                int valueIdx = _valueIndex[i];

                // The null branch is written for a null value only when the union has one;
                // a union without any value branch can only ever encode its null branch.
                bool writeNullBranch = valueIdx < 0 || (nullIdx >= 0 && value.IsNull);
                if (writeNullBranch)
                {
                    // Union index only; a null carries no payload
                    position = AvroEncoding.WriteVarInt(buffer, position, nullIdx);
                    continue;
                }

                position = AvroEncoding.WriteVarInt(buffer, position, valueIdx);
            }

            // The buffer is passed by reference: a long string/bytes value may force
            // EnsureCapacity to swap in a different array, and the remaining fields and
            // the final copy must use that same array.
            position = EncodeValue(ref buffer, position, value, _fieldTypes[i]);
        }

        // Copy to exact-size result
        var result = new byte[position];
        Buffer.BlockCopy(buffer, 0, result, 0, position);
        return result;
    }

    /// <summary>
    /// Encodes multiple GenericRecords, in parallel for large batches, using thread-local buffers.
    /// </summary>
    /// <param name="records">Records to encode.</param>
    /// <returns>One encoded byte array per record, in input order.</returns>
    public byte[][] EncodeMany(IReadOnlyList<GenericRecord> records)
    {
        var count = records.Count;
        var results = new byte[count][];

        if (count < ParallelThreshold)
        {
            // Sequential for small batches
            for (int i = 0; i < count; i++)
            {
                results[i] = Encode(records[i]);
            }
        }
        else
        {
            // Parallel for large batches; every worker thread encodes into its own
            // thread-static buffer and writes to a distinct result slot.
            System.Threading.Tasks.Parallel.For(0, count, i =>
            {
                results[i] = Encode(records[i]);
            });
        }

        return results;
    }

    /// <summary>
    /// Encodes multiple GenericRecords to a list of byte arrays.
    /// </summary>
    /// <param name="records">Records to encode.</param>
    /// <returns>A list with one encoded byte array per record, in input order.</returns>
    public List<byte[]> EncodeManyAsList(IReadOnlyList<GenericRecord> records)
    {
        return new List<byte[]>(EncodeMany(records));
    }

    /// <summary>
    /// Writes one value of the given Avro type at <paramref name="position"/>.
    /// </summary>
    /// <param name="buffer">
    /// Destination buffer, taken by reference so that any array substituted by
    /// <c>AvroEncoding.EnsureCapacity</c> is visible to the caller.
    /// </param>
    /// <param name="position">Offset at which to start writing.</param>
    /// <param name="value">Value to encode; a missing value falls back to the type's default.</param>
    /// <param name="fieldType">Avro type tag selecting the wire format.</param>
    /// <returns>The offset just past the written bytes.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static int EncodeValue(ref byte[] buffer, int position, RecordValue value, Schema.Type fieldType)
    {
        switch (fieldType)
        {
            case Schema.Type.Int:
                return AvroEncoding.WriteVarInt(buffer, position, value.AsInt() ?? 0);

            case Schema.Type.Long:
                return AvroEncoding.WriteVarLong(buffer, position, value.AsLong() ?? 0);

            case Schema.Type.Float:
                var floatVal = value.AsFloat() ?? 0f;
                AvroEncoding.EnsureCapacity(ref buffer, position, 4);
                var floatBytes = BitConverter.GetBytes(floatVal);
                // Bytes are emitted in little-endian order regardless of host endianness
                if (!BitConverter.IsLittleEndian)
                {
                    Array.Reverse(floatBytes);
                }
                Buffer.BlockCopy(floatBytes, 0, buffer, position, 4);
                return position + 4;

            case Schema.Type.Double:
                var doubleVal = value.AsDouble() ?? 0.0;
                AvroEncoding.EnsureCapacity(ref buffer, position, 8);
                var doubleBytes = BitConverter.GetBytes(doubleVal);
                if (!BitConverter.IsLittleEndian)
                {
                    Array.Reverse(doubleBytes);
                }
                Buffer.BlockCopy(doubleBytes, 0, buffer, position, 8);
                return position + 8;

            case Schema.Type.Boolean:
                buffer[position++] = (byte)(value.AsBool() == true ? 1 : 0);
                return position;

            case Schema.Type.String:
                var strVal = value.AsString() ?? string.Empty;
                var strBytes = Encoding.UTF8.GetBytes(strVal);
                AvroEncoding.EnsureCapacity(ref buffer, position, strBytes.Length + LengthPrefixHeadroom);
                position = AvroEncoding.WriteVarLong(buffer, position, strBytes.Length);
                Buffer.BlockCopy(strBytes, 0, buffer, position, strBytes.Length);
                return position + strBytes.Length;

            case Schema.Type.Bytes:
                var bytesVal = value.AsBytes() ?? Array.Empty<byte>();
                AvroEncoding.EnsureCapacity(ref buffer, position, bytesVal.Length + LengthPrefixHeadroom);
                position = AvroEncoding.WriteVarLong(buffer, position, bytesVal.Length);
                Buffer.BlockCopy(bytesVal, 0, buffer, position, bytesVal.Length);
                return position + bytesVal.Length;

            default:
                // Unsupported types are written as a single zero byte
                // (the encoding of an empty string: length 0, no payload).
                buffer[position++] = 0;
                return position;
        }
    }

    /// <summary>
    /// Returns the calling thread's scratch buffer, allocating it on first use.
    /// </summary>
    private static byte[] GetBuffer()
    {
        return _threadLocalBuffer ??= new byte[InitialBufferSize];
    }
}
string? AsString()
Gets the value as a string.
Definition: RecordValue.cs:156
string getSchemaString()
Class for record schemas
Definition: RecordSchema.cs:31
Base class for all schema types
Definition: Schema.cs:29
Class for union schemas
Definition: UnionSchema.cs:29
High-performance Avro encoder for GenericRecord instances.
long? AsLong()
Gets the value as a 64-bit integer.
Definition: RecordValue.cs:118
Type
Enum for schema types
Definition: Schema.cs:34
static int WriteVarInt(byte[] buffer, int position, int value)
Definition: AvroEncoders.cs:23
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
byte[] Encode(GenericRecord record)
Encodes a single GenericRecord to Avro binary format.
static int WriteVarLong(byte[] buffer, int position, long value)
Definition: AvroEncoders.cs:29
byte[]? AsBytes()
Gets the value as a byte array.
Definition: RecordValue.cs:168
bool? AsBool()
Gets the value as a boolean (from int).
Definition: RecordValue.cs:180
int? AsInt()
Gets the value as a 32-bit integer.
Definition: RecordValue.cs:106
Shared utilities for Avro varint (zig-zag) encoding.
Definition: AvroEncoders.cs:20
Schema getSchema()
static void EnsureCapacity(ref byte[] buffer, int position, int required)
Definition: AvroEncoders.cs:58
double? AsDouble()
Gets the value as a 64-bit double.
Definition: RecordValue.cs:143
List<byte[]> EncodeManyAsList(IReadOnlyList<GenericRecord> records)
Encodes multiple GenericRecords to a list of byte arrays.
A generic record that can hold values for any Kinetica type.
Type Tag
Schema type property
Definition: Schema.cs:56
float? AsFloat()
Gets the value as a 32-bit float.
Definition: RecordValue.cs:131
static GenericRecordEncoder GetOrCreate(KineticaType ktype)
Gets or creates a GenericRecordEncoder for the specified KineticaType.
A typed value that can be stored in a GenericRecord.
Definition: RecordValue.cs:34
byte[][] EncodeMany(IReadOnlyList<GenericRecord> records)
Encodes multiple GenericRecords in parallel using thread-local buffers.