3 using System.Buffers.Binary;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
7 using System.Linq.Expressions;
8 using System.Reflection;
9 using System.Runtime.CompilerServices;
15 #region Avro Varint Encoding Utilities 22 [MethodImpl(MethodImplOptions.AggressiveInlining)]
23 public static int WriteVarInt(
byte[] buffer,
int position,
int value)
28 [MethodImpl(MethodImplOptions.AggressiveInlining)]
29 public static int WriteVarLong(
byte[] buffer,
int position,
long value)
32 ulong n = (ulong)((value << 1) ^ (value >> 63));
34 while ((n & ~0x7FUL) != 0)
36 buffer[position++] = (byte)((n & 0x7F) | 0x80);
39 buffer[position++] = (byte)n;
44 [MethodImpl(MethodImplOptions.AggressiveInlining)]
47 ulong n = (ulong)((value << 1) ^ (value >> 63));
49 while ((n & ~0x7FUL) != 0)
57 [MethodImpl(MethodImplOptions.AggressiveInlining)]
58 public static void EnsureCapacity(ref
byte[] buffer,
int position,
int required)
60 if (position + required > buffer.Length)
62 var newBuffer =
new byte[Math.Max(buffer.Length * 2, position + required + 1024)];
63 Buffer.BlockCopy(buffer, 0, newBuffer, 0, position);
71 #region Direct Avro Record Encoder 85 private readonly FieldEncoder[] _fieldEncoders;
86 private readonly
int _fieldCount;
87 private readonly
int _estimatedRecordSize;
98 var key = (typeof(T), schemaString);
106 ??
throw new ArgumentException(
"Invalid schema: not a record schema");
108 _fieldCount = schema.Fields.Count;
109 _fieldEncoders =
new FieldEncoder[_fieldCount];
110 _estimatedRecordSize = 64;
112 var type = typeof(T);
113 var properties = type.GetProperties(BindingFlags.Public | BindingFlags.Instance);
114 var propertyMap = properties.ToDictionary(
115 p => p.Name.ToLowerInvariant(),
117 StringComparer.OrdinalIgnoreCase);
119 for (
int i = 0; i < _fieldCount; i++)
121 var field = schema.Fields[i];
122 var fieldNameLower = field.Name.ToLowerInvariant();
124 if (propertyMap.TryGetValue(fieldNameLower, out var property))
126 _fieldEncoders[i] = CreateFieldEncoder(field.Schema, property);
131 _fieldEncoders[i] =
new NullFieldEncoder();
136 private FieldEncoder CreateFieldEncoder(
Schema fieldSchema, PropertyInfo property)
142 Schema? nonNullSchema =
null;
143 int nonNullIndex = -1;
144 for (
int i = 0; i < unionSchema.Schemas.Count; i++)
146 if (unionSchema.Schemas[i].Tag !=
Schema.
Type.Null)
148 nonNullSchema = unionSchema.Schemas[i];
154 if (nonNullSchema !=
null)
156 return CreateNullableFieldEncoder(nonNullSchema, property, nonNullIndex);
160 return CreateNonNullableFieldEncoder(fieldSchema, property);
/// <summary>
/// Selects the encoder for a field whose schema is a union containing null,
/// keyed on the tag of the union's non-null branch.
/// </summary>
/// <param name="schema">The non-null branch of the union.</param>
/// <param name="property">The property of <typeparamref name="T"/> that supplies the field value.</param>
/// <param name="nonNullIndex">Index of the non-null branch within the union; passed through to the encoder unchanged.</param>
/// <returns>
/// A nullable encoder for int, long, float, double, boolean, string or bytes schemas.
/// Every other tag yields a <c>NullFieldEncoder</c>, whose <c>Encode</c> returns the
/// position unchanged.
/// </returns>
private FieldEncoder CreateNullableFieldEncoder(Schema schema, PropertyInfo property, int nonNullIndex)
{
    // The getter is always requested in its nullable form; CreateGetter adds a
    // conversion whenever the declared property type is not exactly that type,
    // so the property's own type does not need to be inspected here.
    return schema.Tag switch
    {
        Schema.Type.Int => new NullableIntEncoder(CreateGetter<int?>(property), nonNullIndex),
        Schema.Type.Long => new NullableLongEncoder(CreateGetter<long?>(property), nonNullIndex),
        Schema.Type.Float => new NullableFloatEncoder(CreateGetter<float?>(property), nonNullIndex),
        Schema.Type.Double => new NullableDoubleEncoder(CreateGetter<double?>(property), nonNullIndex),
        Schema.Type.Boolean => new NullableBoolEncoder(CreateGetter<bool?>(property), nonNullIndex),
        Schema.Type.String => new NullableStringEncoder(CreateGetter<string?>(property), nonNullIndex),
        Schema.Type.Bytes => new NullableBytesEncoder(CreateGetter<byte[]?>(property), nonNullIndex),
        _ => new NullFieldEncoder()
    };
}
/// <summary>
/// Chooses the encoder for a field whose schema is not a nullable union,
/// based solely on the schema tag.
/// </summary>
/// <param name="schema">Schema of the field.</param>
/// <param name="property">The property of <typeparamref name="T"/> that supplies the field value.</param>
/// <returns>
/// The encoder matching an int, long, float, double, boolean, string or bytes tag;
/// a <c>NullFieldEncoder</c> for any other tag.
/// </returns>
private FieldEncoder CreateNonNullableFieldEncoder(Schema schema, PropertyInfo property)
{
    switch (schema.Tag)
    {
        case Schema.Type.Int:
            return new IntEncoder(CreateGetter<int>(property));

        case Schema.Type.Long:
            return new LongEncoder(CreateGetter<long>(property));

        case Schema.Type.Float:
            return new FloatEncoder(CreateGetter<float>(property));

        case Schema.Type.Double:
            return new DoubleEncoder(CreateGetter<double>(property));

        case Schema.Type.Boolean:
            return new BoolEncoder(CreateGetter<bool>(property));

        case Schema.Type.String:
            return new StringEncoder(CreateGetter<string>(property));

        case Schema.Type.Bytes:
            return new BytesEncoder(CreateGetter<byte[]>(property));

        default:
            // Tags without a dedicated encoder fall back to the encoder that writes nothing.
            return new NullFieldEncoder();
    }
}
/// <summary>
/// Compiles a delegate that reads <paramref name="property"/> from an instance of
/// <typeparamref name="T"/> and returns it as <typeparamref name="TValue"/>.
/// </summary>
/// <typeparam name="TValue">Type the delegate returns.</typeparam>
/// <param name="property">Property to read.</param>
/// <returns>A compiled getter delegate.</returns>
private static Func<T, TValue> CreateGetter<TValue>(PropertyInfo property)
{
    ParameterExpression target = Expression.Parameter(typeof(T), "obj");
    Expression body = Expression.Property(target, property);

    // Insert a conversion node only when the property's declared type is not
    // already the requested return type.
    bool needsConversion = body.Type != typeof(TValue);
    if (needsConversion)
    {
        body = Expression.Convert(body, typeof(TValue));
    }

    Expression<Func<T, TValue>> lambda = Expression.Lambda<Func<T, TValue>>(body, target);
    return lambda.Compile();
}
212 private static byte[]? t_buffer;
221 var buffer = t_buffer ?? (t_buffer =
new byte[4096]);
224 for (
int i = 0; i < _fieldCount; i++)
227 position = _fieldEncoders[i].Encode(record, ref buffer, position);
234 var result =
new byte[position];
235 Buffer.BlockCopy(buffer, 0, result, 0, position);
/// <summary>
/// Encodes <paramref name="record"/> into <paramref name="buffer"/> starting at
/// <paramref name="offset"/> by running every field encoder in order.
/// </summary>
/// <param name="record">Record to encode.</param>
/// <param name="buffer">
/// Destination array, handed by reference to each field encoder
/// (presumably so an encoder can swap in a larger array; confirm against the encoders).
/// </param>
/// <param name="offset">Index in <paramref name="buffer"/> at which writing starts.</param>
/// <returns>The number of bytes written, i.e. the final position minus <paramref name="offset"/>.</returns>
public int EncodeTo(T record, ref byte[] buffer, int offset)
{
    int cursor = offset;

    // _fieldEncoders is allocated with exactly _fieldCount slots, so walking the
    // array visits the same encoders in the same order as an index loop.
    foreach (FieldEncoder encoder in _fieldEncoders)
    {
        cursor = encoder.Encode(record, ref buffer, cursor);
    }

    return cursor - offset;
}
263 var buffer = t_buffer ?? (t_buffer =
new byte[4096]);
264 var size =
EncodeTo(record, ref buffer, 0);
274 var count = records.Count;
275 var results =
new byte[count][];
279 for (
int i = 0; i < count; i++)
281 results[i] =
Encode(records[i]);
286 System.Threading.Tasks.Parallel.For(0, count, i =>
288 results[i] =
Encode(records[i]);
307 public (
byte[]
Buffer, (
int Offset,
int Length)[] Segments) EncodeManyZeroCopy(IReadOnlyList<T> records)
309 var count = records.Count;
312 return (
Array.Empty<
byte>(),
Array.Empty<(
int,
int)>());
316 int estimatedTotalSize = count * _estimatedRecordSize;
317 var buffer =
new byte[Math.Max(estimatedTotalSize, 4096)];
318 var segments =
new (
int Offset,
int Length)[count];
321 for (
int i = 0; i < count; i++)
323 int startPos = position;
326 for (
int f = 0; f < _fieldCount; f++)
328 position = _fieldEncoders[f].Encode(records[i], ref buffer, position);
331 segments[i] = (startPos, position - startPos);
335 if (buffer.Length > position * 2)
337 var trimmed =
new byte[position];
338 Buffer.BlockCopy(buffer, 0, trimmed, 0, position);
342 return (buffer, segments);
349 public (
byte[]
Buffer, (
int Offset,
int Length)[] Segments) EncodeManyZeroCopyParallel(IReadOnlyList<T> records)
351 var count = records.Count;
354 return (
Array.Empty<
byte>(),
Array.Empty<(
int,
int)>());
360 return EncodeManyZeroCopy(records);
364 var encodedRecords =
new byte[count][];
365 System.Threading.Tasks.Parallel.For(0, count, i =>
367 encodedRecords[i] =
Encode(records[i]);
372 for (
int i = 0; i < count; i++)
374 totalSize += encodedRecords[i].Length;
377 var buffer =
new byte[totalSize];
378 var segments =
new (
int Offset,
int Length)[count];
382 for (
int i = 0; i < count; i++)
384 var encoded = encodedRecords[i];
385 segments[i] = (position, encoded.Length);
386 Buffer.BlockCopy(encoded, 0, buffer, position, encoded.Length);
387 position += encoded.Length;
390 return (buffer, segments);
#region Field Encoders

/// <summary>
/// Base type of the per-field encoders held in the encoder array; each
/// implementation writes one field of a record.
/// </summary>
private abstract class FieldEncoder
{
    /// <summary>
    /// Writes this encoder's field of <paramref name="record"/> into
    /// <paramref name="buffer"/> beginning at <paramref name="position"/>.
    /// </summary>
    /// <param name="record">Record being encoded.</param>
    /// <param name="buffer">Destination array, passed by reference.</param>
    /// <param name="position">Index at which this field starts.</param>
    /// <returns>The position at which the next field should start.</returns>
    public abstract int Encode(T record, ref byte[] buffer, int position);
}
/// <summary>
/// Encoder that emits no bytes; used for fields that have no matching property
/// or whose schema tag has no dedicated encoder.
/// </summary>
private sealed class NullFieldEncoder : FieldEncoder
{
    /// <summary>Leaves the buffer untouched.</summary>
    /// <returns><paramref name="position"/>, unchanged.</returns>
    public override int Encode(T record, ref byte[] buffer, int position)
    {
        return position;
    }
}
406 private sealed
class IntEncoder : FieldEncoder
408 private readonly Func<T, int> _getter;
409 public IntEncoder(Func<T, int> getter) => _getter = getter;
411 public override int Encode(T record, ref
byte[] buffer,
int position)
418 private sealed
class LongEncoder : FieldEncoder
420 private readonly Func<T, long> _getter;
421 public LongEncoder(Func<T, long> getter) => _getter = getter;
423 public override int Encode(T record, ref
byte[] buffer,
int position)
430 private sealed
class FloatEncoder : FieldEncoder
432 private readonly Func<T, float> _getter;
433 public FloatEncoder(Func<T, float> getter) => _getter = getter;
435 public override int Encode(T record, ref
byte[] buffer,
int position)
438 BinaryPrimitives.WriteSingleLittleEndian(buffer.AsSpan(position), _getter(record));
443 private sealed
class DoubleEncoder : FieldEncoder
445 private readonly Func<T, double> _getter;
446 public DoubleEncoder(Func<T, double> getter) => _getter = getter;
448 public override int Encode(T record, ref
byte[] buffer,
int position)
451 BinaryPrimitives.WriteDoubleLittleEndian(buffer.AsSpan(position), _getter(record));
456 private sealed
class BoolEncoder : FieldEncoder
458 private readonly Func<T, bool> _getter;
459 public BoolEncoder(Func<T, bool> getter) => _getter = getter;
461 public override int Encode(T record, ref
byte[] buffer,
int position)
464 buffer[position] = _getter(record) ? (byte)1 : (
byte)0;
469 private sealed
class StringEncoder : FieldEncoder
471 private readonly Func<T, string> _getter;
472 public StringEncoder(Func<T, string> getter) => _getter = getter;
474 public override int Encode(T record, ref
byte[] buffer,
int position)
476 var value = _getter(record);
483 var bytes = Encoding.UTF8.GetBytes(value);
487 Buffer.BlockCopy(bytes, 0, buffer, position, bytes.Length);
488 return position + bytes.Length;
492 private sealed
class BytesEncoder : FieldEncoder
494 private readonly Func<T, byte[]> _getter;
495 public BytesEncoder(Func<T,
byte[]> getter) => _getter = getter;
497 public override int Encode(T record, ref
byte[] buffer,
int position)
499 var value = _getter(record);
508 Buffer.BlockCopy(value, 0, buffer, position, value.Length);
509 return position + value.Length;
514 private sealed
class NullableIntEncoder : FieldEncoder
516 private readonly Func<T, int?> _getter;
517 private readonly
int _nonNullIndex;
518 public NullableIntEncoder(Func<T, int?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
520 public override int Encode(T record, ref
byte[] buffer,
int position)
523 var value = _getter(record);
533 private sealed
class NullableLongEncoder : FieldEncoder
535 private readonly Func<T, long?> _getter;
536 private readonly
int _nonNullIndex;
537 public NullableLongEncoder(Func<T, long?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
539 public override int Encode(T record, ref
byte[] buffer,
int position)
542 var value = _getter(record);
552 private sealed
class NullableFloatEncoder : FieldEncoder
554 private readonly Func<T, float?> _getter;
555 private readonly
int _nonNullIndex;
556 public NullableFloatEncoder(Func<T, float?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
558 public override int Encode(T record, ref
byte[] buffer,
int position)
561 var value = _getter(record);
567 BinaryPrimitives.WriteSingleLittleEndian(buffer.AsSpan(position), value.Value);
572 private sealed
class NullableDoubleEncoder : FieldEncoder
574 private readonly Func<T, double?> _getter;
575 private readonly
int _nonNullIndex;
576 public NullableDoubleEncoder(Func<T, double?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
578 public override int Encode(T record, ref
byte[] buffer,
int position)
581 var value = _getter(record);
587 BinaryPrimitives.WriteDoubleLittleEndian(buffer.AsSpan(position), value.Value);
592 private sealed
class NullableBoolEncoder : FieldEncoder
594 private readonly Func<T, bool?> _getter;
595 private readonly
int _nonNullIndex;
596 public NullableBoolEncoder(Func<T, bool?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
598 public override int Encode(T record, ref
byte[] buffer,
int position)
601 var value = _getter(record);
607 buffer[position] = value.Value ? (byte)1 : (
byte)0;
612 private sealed
class NullableStringEncoder : FieldEncoder
614 private readonly Func<T, string?> _getter;
615 private readonly
int _nonNullIndex;
616 public NullableStringEncoder(Func<T, string?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
618 public override int Encode(T record, ref
byte[] buffer,
int position)
620 var value = _getter(record);
626 var bytes = Encoding.UTF8.GetBytes(value);
630 Buffer.BlockCopy(bytes, 0, buffer, position, bytes.Length);
631 return position + bytes.Length;
635 private sealed
class NullableBytesEncoder : FieldEncoder
637 private readonly Func<T, byte[]?> _getter;
638 private readonly
int _nonNullIndex;
639 public NullableBytesEncoder(Func<T,
byte[]?> getter,
int nonNullIndex) { _getter = getter; _nonNullIndex = nonNullIndex; }
641 public override int Encode(T record, ref
byte[] buffer,
int position)
643 var value = _getter(record);
652 Buffer.BlockCopy(value, 0, buffer, position, value.Length);
653 return position + value.Length;
662 #region Direct Request Encoder 678 private static readonly
byte[] BinaryEncodingBytes = Encoding.UTF8.GetBytes(
"binary");
686 IReadOnlyList<
byte[]> records,
687 IDictionary<string, string>? options)
690 int estimatedSize = 256 + tableName.Length * 2;
691 foreach (var record
in records)
693 estimatedSize += record.Length + 10;
696 var buffer =
new byte[estimatedSize];
700 position = WriteString(buffer, position, tableName);
703 position = WriteBytesArray(buffer, position, records, ref buffer);
706 buffer[position++] = 0;
709 position = WriteBytes(buffer, position, BinaryEncodingBytes);
712 position = WriteStringMap(buffer, position, options, ref buffer);
715 var result =
new byte[position];
716 Buffer.BlockCopy(buffer, 0, result, 0, position);
726 byte[] recordsBuffer,
727 (
int Offset,
int Length)[] recordSegments,
728 IDictionary<string, string>? options)
734 var tableNameBytes = Encoding.UTF8.GetBytes(tableName);
738 int recordsCount = recordSegments.Length;
739 if (recordsCount > 0)
742 foreach (var (offset, length) in recordSegments)
760 int optionsCount = options?.Count ?? 0;
761 if (optionsCount > 0)
764 foreach (var kvp
in options!)
766 var keyBytes = Encoding.UTF8.GetBytes(kvp.Key);
767 var valueBytes = Encoding.UTF8.GetBytes(kvp.Value);
779 var result =
new byte[size];
784 Buffer.BlockCopy(tableNameBytes, 0, result, position, tableNameBytes.Length);
785 position += tableNameBytes.Length;
788 if (recordsCount > 0)
791 foreach (var (offset, length) in recordSegments)
794 Buffer.BlockCopy(recordsBuffer, offset, result, position, length);
797 result[position++] = 0;
801 result[position++] = 0;
805 result[position++] = 0;
809 Buffer.BlockCopy(BinaryEncodingBytes, 0, result, position, BinaryEncodingBytes.Length);
810 position += BinaryEncodingBytes.Length;
813 if (optionsCount > 0)
816 foreach (var kvp
in options!)
818 var keyBytes = Encoding.UTF8.GetBytes(kvp.Key);
819 var valueBytes = Encoding.UTF8.GetBytes(kvp.Value);
822 Buffer.BlockCopy(keyBytes, 0, result, position, keyBytes.Length);
823 position += keyBytes.Length;
826 Buffer.BlockCopy(valueBytes, 0, result, position, valueBytes.Length);
827 position += valueBytes.Length;
829 result[position++] = 0;
833 result[position++] = 0;
839 [MethodImpl(MethodImplOptions.AggressiveInlining)]
840 private static int WriteString(
byte[] buffer,
int position,
string value)
842 var bytes = Encoding.UTF8.GetBytes(value);
845 Buffer.BlockCopy(bytes, 0, buffer, position, bytes.Length);
846 return position + bytes.Length;
849 [MethodImpl(MethodImplOptions.AggressiveInlining)]
850 private static int WriteBytes(
byte[] buffer,
int position,
byte[] bytes)
854 Buffer.BlockCopy(bytes, 0, buffer, position, bytes.Length);
855 return position + bytes.Length;
858 private static int WriteBytesArray(
byte[] buffer,
int position, IReadOnlyList<
byte[]> items, ref
byte[] bufferRef)
860 int count = items.Count;
863 buffer[position++] = 0;
871 for (
int i = 0; i < count; i++)
878 Buffer.BlockCopy(item, 0, buffer, position, item.Length);
879 position += item.Length;
883 buffer[position++] = 0;
887 private static int WriteStringMap(
byte[] buffer,
int position, IDictionary<string, string>? map, ref
byte[] bufferRef)
889 if (map ==
null || map.Count == 0)
891 buffer[position++] = 0;
899 foreach (var kvp
in map)
905 var keyBytes = Encoding.UTF8.GetBytes(kvp.Key);
907 Buffer.BlockCopy(keyBytes, 0, buffer, position, keyBytes.Length);
908 position += keyBytes.Length;
911 var valueBytes = Encoding.UTF8.GetBytes(kvp.Value);
913 Buffer.BlockCopy(valueBytes, 0, buffer, position, valueBytes.Length);
914 position += valueBytes.Length;
918 buffer[position++] = 0;
static byte [] EncodeZeroCopy(string tableName, byte[] recordsBuffer,(int Offset, int Length)[] recordSegments, IDictionary< string, string >? options)
Encodes using pre-encoded records stored in a contiguous buffer with segments.
byte [] Buffer
Zero-copy batch encoding: encodes all records into a single contiguous buffer and returns segments po...
Base class for all schema types
Type
Enum for schema types
static int WriteVarInt(byte[] buffer, int position, int value)
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static int WriteVarLong(byte[] buffer, int position, long value)
Direct binary encoder for RawInsertRecordsRequest that bypasses the Apache Avro library.
static DirectAvroEncoder< T > GetOrCreate(KineticaType ktype)
Gets or creates a DirectAvroEncoder for the specified type and KineticaType.
byte [] Encode(T record)
Encodes a single record to Avro binary format.
byte [][] EncodeMany(IReadOnlyList< T > records)
Encodes multiple records in parallel using thread-local buffers.
Shared utilities for Avro varint (zig-zag) encoding.
int CalculateEncodedSize(T record)
Calculates the encoded size of a record without actually encoding it.
static void EnsureCapacity(ref byte[] buffer, int position, int required)
static int GetVarIntSize(long value)
Ultra-high-performance Avro encoder that writes directly to binary format without using GenericRecord...
Immutable collection of metadata about a Kinetica type.
int EncodeTo(T record, ref byte[] buffer, int offset)
Encodes a single record directly into the provided buffer.
static byte [] Encode(string tableName, IReadOnlyList< byte[]> records, IDictionary< string, string >? options)
Encodes a RawInsertRecordsRequest directly to Avro binary format.
Type Tag
Schema type property
List< byte[]> EncodeManyAsList(IReadOnlyList< T > records)
Encodes multiple records to a list.