3 using System.Collections.Concurrent;
4 using System.Collections.Generic;
5 using System.Runtime.CompilerServices;
21 private readonly
bool[] _fieldNullable;
22 private readonly
int[] _nullIndex;
23 private readonly
int _fieldCount;
27 private static byte[]? _threadLocalBuffer;
30 private static readonly ConcurrentDictionary<string, GenericRecordEncoder> _encoderCache =
new();
44 ??
throw new ArgumentException(
"Invalid schema: not a record schema");
46 _fieldCount = schema.Fields.Count;
48 _fieldNullable =
new bool[_fieldCount];
49 _nullIndex =
new int[_fieldCount];
51 for (
int i = 0; i < _fieldCount; i++)
53 var field = schema.Fields[i];
54 var fieldSchema = field.Schema;
59 _fieldNullable[i] =
true;
67 for (
int j = 0; j < unionSchema.Schemas.Count; j++)
69 var s = unionSchema.Schemas[j];
77 _fieldTypes[i] = s.
Tag;
81 _nullIndex[i] = nullIdx;
85 _fieldNullable[i] =
false;
86 _fieldTypes[i] = fieldSchema.
Tag;
97 var values = record.GetValues();
98 var buffer = GetBuffer();
101 for (
int i = 0; i < _fieldCount; i++)
103 var value = values[i];
104 var fieldType = _fieldTypes[i];
105 var nullable = _fieldNullable[i];
111 var nullIdx = _nullIndex[i];
112 var nonNullIdx = nullIdx == 0 ? 1 : 0;
127 position = EncodeValue(buffer, position, value, fieldType);
131 var result =
new byte[position];
132 Buffer.BlockCopy(buffer, 0, result, 0, position);
139 public byte[][]
EncodeMany(IReadOnlyList<GenericRecord> records)
141 var count = records.Count;
142 var results =
new byte[count][];
147 for (
int i = 0; i < count; i++)
149 results[i] =
Encode(records[i]);
156 System.Threading.Tasks.Parallel.For(0, count, i =>
158 results[i] =
Encode(records[i]);
174 [MethodImpl(MethodImplOptions.AggressiveInlining)]
186 var floatVal = value.
AsFloat() ?? 0f;
188 var floatBytes = BitConverter.GetBytes(floatVal);
189 if (!BitConverter.IsLittleEndian)
190 Array.Reverse(floatBytes);
191 Buffer.BlockCopy(floatBytes, 0, buffer, position, 4);
195 var doubleVal = value.
AsDouble() ?? 0.0;
197 var doubleBytes = BitConverter.GetBytes(doubleVal);
198 if (!BitConverter.IsLittleEndian)
199 Array.Reverse(doubleBytes);
200 Buffer.BlockCopy(doubleBytes, 0, buffer, position, 8);
204 buffer[position++] = (byte)(value.
AsBool() ==
true ? 1 : 0);
208 var strVal = value.
AsString() ??
string.Empty;
209 var strBytes = Encoding.UTF8.GetBytes(strVal);
212 Buffer.BlockCopy(strBytes, 0, buffer, position, strBytes.Length);
213 return position + strBytes.Length;
219 Buffer.BlockCopy(bytesVal, 0, buffer, position, bytesVal.Length);
220 return position + bytesVal.Length;
224 buffer[position++] = 0;
229 private static byte[] GetBuffer()
231 var buffer = _threadLocalBuffer;
234 buffer =
new byte[4096];
235 _threadLocalBuffer = buffer;
string? AsString()
Gets the value as a string.
Base class for all schema types
High-performance Avro encoder for GenericRecord instances.
long? AsLong()
Gets the value as a 64-bit integer.
Type
Enum for schema types
static int WriteVarInt(byte[] buffer, int position, int value)
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
byte [] Encode(GenericRecord record)
Encodes a single GenericRecord to Avro binary format.
static int WriteVarLong(byte[] buffer, int position, long value)
byte? [] AsBytes()
Gets the value as a byte array.
bool? AsBool()
Gets the value as a boolean (from int).
int? AsInt()
Gets the value as a 32-bit integer.
Shared utilities for Avro varint (zig-zag) encoding.
static void EnsureCapacity(ref byte[] buffer, int position, int required)
double? AsDouble()
Gets the value as a 64-bit double.
List< byte[]> EncodeManyAsList(IReadOnlyList< GenericRecord > records)
Encodes multiple GenericRecords to a list of byte arrays.
A generic record that can hold values for any Kinetica type.
Type Tag
Schema type property
float? AsFloat()
Gets the value as a 32-bit float.
static GenericRecordEncoder GetOrCreate(KineticaType ktype)
Gets or creates a GenericRecordEncoder for the specified KineticaType.
A typed value that can be stored in a GenericRecord.
byte [][] EncodeMany(IReadOnlyList< GenericRecord > records)
Encodes multiple GenericRecords in parallel using thread-local buffers.