Kinetica   C#   API  Version 7.2.3.1
AvroEncoders.cs
Go to the documentation of this file.
1 using System;
2 using System.Buffers;
3 using System.Buffers.Binary;
4 using System.Collections.Concurrent;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Linq.Expressions;
8 using System.Reflection;
9 using System.Runtime.CompilerServices;
10 using System.Text;
11 using Avro;
12 
13 namespace kinetica;
14 
15 #region Avro Varint Encoding Utilities
16 
/// <summary>
/// Low-level helpers for Avro's zig-zag variable-length integer encoding and for growing
/// byte buffers that are written to by position.
/// </summary>
internal static class AvroEncoding
{
    /// <summary>
    /// Writes <paramref name="value"/> as a zig-zag varint at <paramref name="position"/>.
    /// Produces the same bytes as <see cref="WriteVarLong"/> for the same numeric value.
    /// </summary>
    /// <returns>The position just past the last byte written.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static int WriteVarInt(byte[] buffer, int position, int value) =>
        WriteVarLong(buffer, position, value);

    /// <summary>
    /// Writes <paramref name="value"/> as a zig-zag varint (7 payload bits per byte, high bit set
    /// on every byte except the last) starting at <paramref name="position"/>.
    /// The caller must ensure up to 10 bytes of room are available.
    /// </summary>
    /// <returns>The position just past the last byte written.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static int WriteVarLong(byte[] buffer, int position, long value)
    {
        ulong remaining = ZigZag(value);

        while (remaining >= 0x80)
        {
            // Emit the low 7 bits with the continuation flag, then move to the next group.
            buffer[position++] = (byte)((remaining & 0x7F) | 0x80);
            remaining >>= 7;
        }

        buffer[position++] = (byte)remaining;
        return position;
    }

    /// <summary>
    /// Returns how many bytes <see cref="WriteVarLong"/> would emit for <paramref name="value"/> (1 to 10).
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static int GetVarIntSize(long value)
    {
        int byteCount = 1;
        for (ulong remaining = ZigZag(value); remaining >= 0x80; remaining >>= 7)
        {
            byteCount++;
        }
        return byteCount;
    }

    /// <summary>
    /// Guarantees that <paramref name="required"/> bytes can be written at <paramref name="position"/>.
    /// When they cannot, <paramref name="buffer"/> is replaced with a larger array: the larger of
    /// double the current length and the needed size plus 1024. The first
    /// <paramref name="position"/> bytes are copied across.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static void EnsureCapacity(ref byte[] buffer, int position, int required)
    {
        int needed = position + required;
        if (needed <= buffer.Length)
        {
            return;
        }

        int newLength = Math.Max(buffer.Length * 2, needed + 1024);
        var grown = new byte[newLength];
        Buffer.BlockCopy(buffer, 0, grown, 0, position);
        buffer = grown;
    }

    // Maps signed values onto unsigned ones so that small magnitudes encode to few bytes
    // (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static ulong ZigZag(long value) => (ulong)((value << 1) ^ (value >> 63));
}
68 
69  #endregion
70 
71  #region Direct Avro Record Encoder
72 
/// <summary>
/// Avro binary encoder for records of type <typeparamref name="T"/>. Field values are read
/// through compiled property getters and written straight into a byte buffer, without building
/// GenericRecord instances.
/// </summary>
/// <typeparam name="T">
/// Record type. Each Avro field is bound to the public, non-indexer instance property whose name
/// matches the field name case-insensitively.
/// </typeparam>
internal sealed class DirectAvroEncoder<T>
{
    // Batches smaller than this are encoded sequentially; Parallel.For is only used at or above it.
    private const int ParallelThreshold = 100;

    // Initial size of the per-thread scratch buffer used by Encode and CalculateEncodedSize.
    private const int InitialScratchBufferSize = 4096;

    // Bounds on the up-front allocation made by EncodeManyZeroCopy. The field encoders grow the
    // buffer on demand when the estimate turns out to be too small.
    private const int MinBatchBufferSize = 4096;
    private const long MaxInitialBatchBufferSize = 1L << 30;

    private readonly FieldEncoder[] _fieldEncoders;
    private readonly int _fieldCount;

    // Fixed per-record size hint; used only to pre-size the EncodeManyZeroCopy buffer.
    private readonly int _estimatedRecordSize;

    // Building an encoder compiles expression trees, so instances are cached per (type, schema string).
    private static readonly ConcurrentDictionary<(Type, string), DirectAvroEncoder<T>> _encoderCache = new();

    // Per-thread scratch buffer reused across Encode calls; replaced whenever an encoder grows it.
    [ThreadStatic]
    private static byte[]? t_buffer;

    /// <summary>
    /// Gets the cached encoder for <typeparamref name="T"/> and the schema of <paramref name="ktype"/>,
    /// creating it on first use. Equal schema strings share one instance.
    /// </summary>
    /// <param name="ktype">Kinetica type whose record schema determines field order and encoding.</param>
    /// <returns>The encoder for this type/schema pair.</returns>
    /// <exception cref="ArgumentException">The schema is not a record schema, or <typeparamref name="T"/>
    /// has several unrelated public properties whose names differ only by case.</exception>
    public static DirectAvroEncoder<T> GetOrCreate(KineticaType ktype)
    {
        var key = (typeof(T), ktype.getSchemaString());
        return _encoderCache.GetOrAdd(key, _ => new DirectAvroEncoder<T>(ktype));
    }

    private DirectAvroEncoder(KineticaType ktype)
    {
        var schema = ktype.getSchema() as RecordSchema
            ?? throw new ArgumentException("Invalid schema: not a record schema");

        _fieldCount = schema.Fields.Count;
        _fieldEncoders = new FieldEncoder[_fieldCount];
        _estimatedRecordSize = 64;

        var propertyMap = BuildPropertyMap(typeof(T));

        for (int i = 0; i < _fieldCount; i++)
        {
            var field = schema.Fields[i];
            _fieldEncoders[i] = propertyMap.TryGetValue(field.Name, out var property)
                ? CreateFieldEncoder(field.Schema, property)
                : CreateMissingPropertyEncoder(field.Schema);
        }
    }

    /// <summary>
    /// Maps property names (case-insensitively) to the readable public instance properties of
    /// <paramref name="type"/>.
    /// </summary>
    /// <remarks>
    /// Indexers are skipped: they cannot be read without arguments, and overloads all share the
    /// name "Item". When a base property is hidden by a derived one (<c>new</c>), the most-derived
    /// declaration wins instead of causing a duplicate-key failure.
    /// </remarks>
    /// <exception cref="ArgumentException">Two properties on unrelated declaring types differ only by case.</exception>
    private static Dictionary<string, PropertyInfo> BuildPropertyMap(Type type)
    {
        var map = new Dictionary<string, PropertyInfo>(StringComparer.OrdinalIgnoreCase);

        foreach (var property in type.GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            if (property.GetIndexParameters().Length > 0)
            {
                continue;
            }

            if (!map.TryGetValue(property.Name, out var existing))
            {
                map.Add(property.Name, property);
            }
            else if (IsDerivedFrom(property.DeclaringType, existing.DeclaringType))
            {
                map[property.Name] = property;
            }
            else if (!IsDerivedFrom(existing.DeclaringType, property.DeclaringType))
            {
                throw new ArgumentException(
                    $"Type '{type}' has more than one public property named '{property.Name}' (ignoring case).");
            }
        }

        return map;
    }

    private static bool IsDerivedFrom(Type? candidate, Type? baseType) =>
        candidate is not null && baseType is not null && candidate.IsSubclassOf(baseType);

    /// <summary>
    /// Builds the encoder for a schema field backed by <paramref name="property"/>. A union with a
    /// non-null branch becomes a nullable encoder; anything else uses the plain encoders.
    /// </summary>
    private static FieldEncoder CreateFieldEncoder(Schema fieldSchema, PropertyInfo property)
    {
        if (fieldSchema is UnionSchema unionSchema)
        {
            int valueIndex = IndexOfBranch(unionSchema, isNull: false);
            if (valueIndex >= 0)
            {
                int nullIndex = IndexOfBranch(unionSchema, isNull: true);
                if (nullIndex < 0)
                {
                    // No null branch: keep the legacy two-branch assumption for null values.
                    nullIndex = valueIndex == 0 ? 1 : 0;
                }

                return CreateNullableFieldEncoder(unionSchema.Schemas[valueIndex], property, valueIndex, nullIndex);
            }
        }

        return CreateNonNullableFieldEncoder(fieldSchema, property);
    }

    // Returns the index of the first union branch that is (isNull) or is not (!isNull) the null type, or -1.
    private static int IndexOfBranch(UnionSchema unionSchema, bool isNull)
    {
        for (int i = 0; i < unionSchema.Schemas.Count; i++)
        {
            if ((unionSchema.Schemas[i].Tag == Schema.Type.Null) == isNull)
            {
                return i;
            }
        }
        return -1;
    }

    /// <summary>
    /// Builds the encoder for a schema field with no matching property on <typeparamref name="T"/>.
    /// A union containing null is written as its null branch so the record stays well-formed.
    /// </summary>
    private static FieldEncoder CreateMissingPropertyEncoder(Schema fieldSchema)
    {
        if (fieldSchema is UnionSchema unionSchema)
        {
            int nullIndex = IndexOfBranch(unionSchema, isNull: true);
            if (nullIndex >= 0)
            {
                return new UnionNullEncoder(nullIndex);
            }
        }

        // NOTE(review): a non-nullable field with no backing property writes no bytes at all,
        // which leaves the record one field short; confirm whether this should throw instead.
        return new NullFieldEncoder();
    }

    private static FieldEncoder CreateNullableFieldEncoder(Schema schema, PropertyInfo property, int valueIndex, int nullIndex)
    {
        return schema.Tag switch
        {
            Schema.Type.Int => new NullableIntEncoder(CreateGetter<int?>(property), valueIndex, nullIndex),
            Schema.Type.Long => new NullableLongEncoder(CreateGetter<long?>(property), valueIndex, nullIndex),
            Schema.Type.Float => new NullableFloatEncoder(CreateGetter<float?>(property), valueIndex, nullIndex),
            Schema.Type.Double => new NullableDoubleEncoder(CreateGetter<double?>(property), valueIndex, nullIndex),
            Schema.Type.Boolean => new NullableBoolEncoder(CreateGetter<bool?>(property), valueIndex, nullIndex),
            Schema.Type.String => new NullableStringEncoder(CreateGetter<string?>(property), valueIndex, nullIndex),
            Schema.Type.Bytes => new NullableBytesEncoder(CreateGetter<byte[]?>(property), valueIndex, nullIndex),
            _ => new NullFieldEncoder()
        };
    }

    private static FieldEncoder CreateNonNullableFieldEncoder(Schema schema, PropertyInfo property)
    {
        return schema.Tag switch
        {
            Schema.Type.Int => new IntEncoder(CreateGetter<int>(property)),
            Schema.Type.Long => new LongEncoder(CreateGetter<long>(property)),
            Schema.Type.Float => new FloatEncoder(CreateGetter<float>(property)),
            Schema.Type.Double => new DoubleEncoder(CreateGetter<double>(property)),
            Schema.Type.Boolean => new BoolEncoder(CreateGetter<bool>(property)),
            Schema.Type.String => new StringEncoder(CreateGetter<string>(property)),
            Schema.Type.Bytes => new BytesEncoder(CreateGetter<byte[]>(property)),
            // Null-typed fields occupy zero bytes in Avro; unsupported types also write nothing.
            _ => new NullFieldEncoder()
        };
    }

    /// <summary>
    /// Compiles <c>obj =&gt; (TValue)obj.Property</c>. The cast is only emitted when the property type
    /// differs from <typeparamref name="TValue"/>; an impossible conversion throws at compile time here.
    /// </summary>
    private static Func<T, TValue> CreateGetter<TValue>(PropertyInfo property)
    {
        var parameter = Expression.Parameter(typeof(T), "obj");
        Expression propertyAccess = Expression.Property(parameter, property);

        if (propertyAccess.Type != typeof(TValue))
        {
            propertyAccess = Expression.Convert(propertyAccess, typeof(TValue));
        }

        return Expression.Lambda<Func<T, TValue>>(propertyAccess, parameter).Compile();
    }

    /// <summary>
    /// Encodes a single record to Avro binary format.
    /// </summary>
    /// <param name="record">Record to encode.</param>
    /// <returns>A new array sized exactly to the encoded record.</returns>
    public byte[] Encode(T record)
    {
        var buffer = t_buffer ??= new byte[InitialScratchBufferSize];
        int length = EncodeTo(record, ref buffer, 0);

        // Keep the (possibly grown) scratch buffer for the next call on this thread.
        t_buffer = buffer;

        var result = new byte[length];
        Buffer.BlockCopy(buffer, 0, result, 0, length);
        return result;
    }

    /// <summary>
    /// Encodes a single record into <paramref name="buffer"/> starting at <paramref name="offset"/>.
    /// </summary>
    /// <param name="record">Record to encode.</param>
    /// <param name="buffer">Destination; replaced by a larger copy if it runs out of room.</param>
    /// <param name="offset">Position at which the first byte is written.</param>
    /// <returns>The number of bytes written.</returns>
    public int EncodeTo(T record, ref byte[] buffer, int offset)
    {
        int position = offset;

        for (int i = 0; i < _fieldCount; i++)
        {
            position = _fieldEncoders[i].Encode(record, ref buffer, position);
        }

        return position - offset;
    }

    /// <summary>
    /// Returns the encoded size of <paramref name="record"/>. The record is encoded into this thread's
    /// scratch buffer to measure it; the bytes are discarded.
    /// </summary>
    public int CalculateEncodedSize(T record)
    {
        var buffer = t_buffer ??= new byte[InitialScratchBufferSize];
        var size = EncodeTo(record, ref buffer, 0);
        t_buffer = buffer;
        return size;
    }

    /// <summary>
    /// Encodes each record into its own exact-size array. Batches of at least
    /// <see cref="ParallelThreshold"/> records are encoded with <c>Parallel.For</c>.
    /// </summary>
    /// <returns>Encoded records in input order.</returns>
    public byte[][] EncodeMany(IReadOnlyList<T> records)
    {
        var count = records.Count;
        var results = new byte[count][];

        if (count < ParallelThreshold)
        {
            for (int i = 0; i < count; i++)
            {
                results[i] = Encode(records[i]);
            }
        }
        else
        {
            // Safe to parallelize: Encode only touches the calling thread's scratch buffer.
            System.Threading.Tasks.Parallel.For(0, count, i =>
            {
                results[i] = Encode(records[i]);
            });
        }

        return results;
    }

    /// <summary>
    /// Same as <see cref="EncodeMany"/>, returned as a list.
    /// </summary>
    public List<byte[]> EncodeManyAsList(IReadOnlyList<T> records)
    {
        return new List<byte[]>(EncodeMany(records));
    }

    /// <summary>
    /// Encodes all records sequentially into one shared buffer.
    /// </summary>
    /// <returns>
    /// The buffer plus one (offset, length) segment per record, in input order. The buffer is
    /// trimmed to the used length only when less than half of it was used, so callers must rely
    /// on the segments rather than on <c>Buffer.Length</c>.
    /// </returns>
    public (byte[] Buffer, (int Offset, int Length)[] Segments) EncodeManyZeroCopy(IReadOnlyList<T> records)
    {
        var count = records.Count;
        if (count == 0)
        {
            return (Array.Empty<byte>(), Array.Empty<(int, int)>());
        }

        // Widen before multiplying so very large batches cannot overflow int.
        long estimatedTotalSize = Math.Min((long)count * _estimatedRecordSize, MaxInitialBatchBufferSize);
        var buffer = new byte[Math.Max((int)estimatedTotalSize, MinBatchBufferSize)];
        var segments = new (int Offset, int Length)[count];
        int position = 0;

        for (int i = 0; i < count; i++)
        {
            int written = EncodeTo(records[i], ref buffer, position);
            segments[i] = (position, written);
            position += written;
        }

        if ((long)position * 2 < buffer.Length)
        {
            var trimmed = new byte[position];
            Buffer.BlockCopy(buffer, 0, trimmed, 0, position);
            buffer = trimmed;
        }

        return (buffer, segments);
    }

    /// <summary>
    /// Like <see cref="EncodeManyZeroCopy"/>, but for batches of at least <see cref="ParallelThreshold"/>
    /// records it encodes each record in parallel into its own array. It then copies them, in order,
    /// into one exact-size buffer. Smaller batches delegate to <see cref="EncodeManyZeroCopy"/>.
    /// </summary>
    public (byte[] Buffer, (int Offset, int Length)[] Segments) EncodeManyZeroCopyParallel(IReadOnlyList<T> records)
    {
        var count = records.Count;
        if (count == 0)
        {
            return (Array.Empty<byte>(), Array.Empty<(int, int)>());
        }

        if (count < ParallelThreshold)
        {
            return EncodeManyZeroCopy(records);
        }

        // Phase 1: parallel encode into individual arrays (each thread uses its own scratch buffer).
        var encodedRecords = new byte[count][];
        System.Threading.Tasks.Parallel.For(0, count, i =>
        {
            encodedRecords[i] = Encode(records[i]);
        });

        // Phase 2: size and allocate the contiguous buffer.
        int totalSize = 0;
        for (int i = 0; i < count; i++)
        {
            totalSize += encodedRecords[i].Length;
        }

        var buffer = new byte[totalSize];
        var segments = new (int Offset, int Length)[count];
        int position = 0;

        // Phase 3: sequential copy into the contiguous buffer.
        for (int i = 0; i < count; i++)
        {
            var encoded = encodedRecords[i];
            segments[i] = (position, encoded.Length);
            Buffer.BlockCopy(encoded, 0, buffer, position, encoded.Length);
            position += encoded.Length;
        }

        return (buffer, segments);
    }

    #region Field Encoders

    private abstract class FieldEncoder
    {
        // Writes this field for the record at position, growing buffer when needed,
        // and returns the position just past the bytes written.
        public abstract int Encode(T record, ref byte[] buffer, int position);
    }

    // Writes nothing: used for Avro null fields, unsupported types and unmatched non-union fields.
    private sealed class NullFieldEncoder : FieldEncoder
    {
        public override int Encode(T record, ref byte[] buffer, int position) => position;
    }

    // Always writes the null branch index of a union (field has no backing property).
    private sealed class UnionNullEncoder : FieldEncoder
    {
        private readonly int _nullIndex;
        public UnionNullEncoder(int nullIndex) => _nullIndex = nullIndex;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 10);
            return AvroEncoding.WriteVarLong(buffer, position, _nullIndex);
        }
    }

    // Non-nullable encoders
    private sealed class IntEncoder : FieldEncoder
    {
        private readonly Func<T, int> _getter;
        public IntEncoder(Func<T, int> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 10);
            return AvroEncoding.WriteVarInt(buffer, position, _getter(record));
        }
    }

    private sealed class LongEncoder : FieldEncoder
    {
        private readonly Func<T, long> _getter;
        public LongEncoder(Func<T, long> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 10);
            return AvroEncoding.WriteVarLong(buffer, position, _getter(record));
        }
    }

    private sealed class FloatEncoder : FieldEncoder
    {
        private readonly Func<T, float> _getter;
        public FloatEncoder(Func<T, float> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 4);
            BinaryPrimitives.WriteSingleLittleEndian(buffer.AsSpan(position), _getter(record));
            return position + 4;
        }
    }

    private sealed class DoubleEncoder : FieldEncoder
    {
        private readonly Func<T, double> _getter;
        public DoubleEncoder(Func<T, double> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 8);
            BinaryPrimitives.WriteDoubleLittleEndian(buffer.AsSpan(position), _getter(record));
            return position + 8;
        }
    }

    private sealed class BoolEncoder : FieldEncoder
    {
        private readonly Func<T, bool> _getter;
        public BoolEncoder(Func<T, bool> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 1);
            buffer[position] = _getter(record) ? (byte)1 : (byte)0;
            return position + 1;
        }
    }

    private sealed class StringEncoder : FieldEncoder
    {
        private readonly Func<T, string> _getter;
        public StringEncoder(Func<T, string> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            var value = _getter(record);
            if (value == null)
            {
                // A null string in a non-nullable field is written as the empty string.
                AvroEncoding.EnsureCapacity(ref buffer, position, 1);
                return AvroEncoding.WriteVarLong(buffer, position, 0);
            }

            // Encode straight into the buffer instead of allocating a temporary UTF-8 array.
            int byteCount = Encoding.UTF8.GetByteCount(value);
            AvroEncoding.EnsureCapacity(ref buffer, position, byteCount + 10);
            position = AvroEncoding.WriteVarLong(buffer, position, byteCount);
            return position + Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, position);
        }
    }

    private sealed class BytesEncoder : FieldEncoder
    {
        private readonly Func<T, byte[]> _getter;
        public BytesEncoder(Func<T, byte[]> getter) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            var value = _getter(record);
            if (value == null)
            {
                // A null array in a non-nullable field is written as zero-length bytes.
                AvroEncoding.EnsureCapacity(ref buffer, position, 1);
                return AvroEncoding.WriteVarLong(buffer, position, 0);
            }

            AvroEncoding.EnsureCapacity(ref buffer, position, value.Length + 10);
            position = AvroEncoding.WriteVarLong(buffer, position, value.Length);
            Buffer.BlockCopy(value, 0, buffer, position, value.Length);
            return position + value.Length;
        }
    }

    // Nullable encoders (for union types): write the branch index, then the value if present.
    private abstract class UnionFieldEncoder : FieldEncoder
    {
        protected UnionFieldEncoder(int valueIndex, int nullIndex)
        {
            ValueIndex = valueIndex;
            NullIndex = nullIndex;
        }

        // Union branch index written before a present value.
        protected int ValueIndex { get; }

        // Union branch index written, with no payload, for a null value.
        protected int NullIndex { get; }
    }

    private sealed class NullableIntEncoder : UnionFieldEncoder
    {
        private readonly Func<T, int?> _getter;

        public NullableIntEncoder(Func<T, int?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 20);
            var value = _getter(record);
            if (!value.HasValue)
            {
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            return AvroEncoding.WriteVarInt(buffer, position, value.Value);
        }
    }

    private sealed class NullableLongEncoder : UnionFieldEncoder
    {
        private readonly Func<T, long?> _getter;

        public NullableLongEncoder(Func<T, long?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 20);
            var value = _getter(record);
            if (!value.HasValue)
            {
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            return AvroEncoding.WriteVarLong(buffer, position, value.Value);
        }
    }

    private sealed class NullableFloatEncoder : UnionFieldEncoder
    {
        private readonly Func<T, float?> _getter;

        public NullableFloatEncoder(Func<T, float?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 14);
            var value = _getter(record);
            if (!value.HasValue)
            {
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            BinaryPrimitives.WriteSingleLittleEndian(buffer.AsSpan(position), value.Value);
            return position + 4;
        }
    }

    private sealed class NullableDoubleEncoder : UnionFieldEncoder
    {
        private readonly Func<T, double?> _getter;

        public NullableDoubleEncoder(Func<T, double?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 18);
            var value = _getter(record);
            if (!value.HasValue)
            {
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            BinaryPrimitives.WriteDoubleLittleEndian(buffer.AsSpan(position), value.Value);
            return position + 8;
        }
    }

    private sealed class NullableBoolEncoder : UnionFieldEncoder
    {
        private readonly Func<T, bool?> _getter;

        public NullableBoolEncoder(Func<T, bool?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            AvroEncoding.EnsureCapacity(ref buffer, position, 11);
            var value = _getter(record);
            if (!value.HasValue)
            {
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            buffer[position] = value.Value ? (byte)1 : (byte)0;
            return position + 1;
        }
    }

    private sealed class NullableStringEncoder : UnionFieldEncoder
    {
        private readonly Func<T, string?> _getter;

        public NullableStringEncoder(Func<T, string?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            var value = _getter(record);
            if (value == null)
            {
                AvroEncoding.EnsureCapacity(ref buffer, position, 10);
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }

            // Encode straight into the buffer instead of allocating a temporary UTF-8 array.
            int byteCount = Encoding.UTF8.GetByteCount(value);
            AvroEncoding.EnsureCapacity(ref buffer, position, byteCount + 20);
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            position = AvroEncoding.WriteVarLong(buffer, position, byteCount);
            return position + Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, position);
        }
    }

    private sealed class NullableBytesEncoder : UnionFieldEncoder
    {
        private readonly Func<T, byte[]?> _getter;

        public NullableBytesEncoder(Func<T, byte[]?> getter, int valueIndex, int nullIndex)
            : base(valueIndex, nullIndex) => _getter = getter;

        public override int Encode(T record, ref byte[] buffer, int position)
        {
            var value = _getter(record);
            if (value == null)
            {
                AvroEncoding.EnsureCapacity(ref buffer, position, 10);
                return AvroEncoding.WriteVarLong(buffer, position, NullIndex);
            }
            AvroEncoding.EnsureCapacity(ref buffer, position, value.Length + 20);
            position = AvroEncoding.WriteVarLong(buffer, position, ValueIndex);
            position = AvroEncoding.WriteVarLong(buffer, position, value.Length);
            Buffer.BlockCopy(value, 0, buffer, position, value.Length);
            return position + value.Length;
        }
    }

    #endregion
}
659 
660  #endregion
661 
662  #region Direct Request Encoder
663 
/// <summary>
/// Encodes a RawInsertRecordsRequest directly to Avro binary, bypassing the Apache Avro library.
/// Fields are written in this order: table_name (string), list (array of bytes), list_str (always
/// an empty array), list_encoding (the string "binary"), and options (map of string to string).
/// </summary>
/// <remarks>
/// Both entry points compute the exact encoded size first and write into a single array of that
/// size, so no intermediate buffer ever needs to grow.
/// </remarks>
internal static class DirectRequestEncoder
{
    // Pre-encoded UTF-8 bytes of the list_encoding value.
    private static readonly byte[] BinaryEncodingBytes = Encoding.UTF8.GetBytes("binary");

    /// <summary>
    /// Encodes a RawInsertRecordsRequest whose records are individual pre-encoded byte arrays.
    /// </summary>
    /// <param name="tableName">Target table name, written as an Avro string.</param>
    /// <param name="records">Avro-encoded records, written in order as an Avro array of bytes.</param>
    /// <param name="options">Request options; <c>null</c> or empty writes an empty map.</param>
    /// <returns>An exactly-sized array holding the encoded request.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tableName"/> or <paramref name="records"/>
    /// is null, or an option key/value is null.</exception>
    /// <exception cref="InvalidOperationException">The encoded request would exceed the maximum array size.</exception>
    public static byte[] Encode(
        string tableName,
        IReadOnlyList<byte[]> records,
        IDictionary<string, string>? options)
    {
        if (tableName is null) throw new ArgumentNullException(nameof(tableName));
        if (records is null) throw new ArgumentNullException(nameof(records));

        var tableNameBytes = Encoding.UTF8.GetBytes(tableName);
        var optionEntries = EncodeOptions(options);
        int recordCount = records.Count;

        long size = BlobSize(tableNameBytes.Length) + ArrayOverhead(recordCount) + TrailerSize(optionEntries);
        for (int i = 0; i < recordCount; i++)
        {
            size += BlobSize(records[i].Length);
        }

        var result = AllocateExact(size);

        // 1. table_name
        int position = WriteBlob(result, 0, tableNameBytes, 0, tableNameBytes.Length);

        // 2. list: one block holding every record, then the end-of-array marker.
        if (recordCount > 0)
        {
            position = AvroEncoding.WriteVarInt(result, position, recordCount);
            for (int i = 0; i < recordCount; i++)
            {
                var record = records[i];
                position = WriteBlob(result, position, record, 0, record.Length);
            }
        }
        result[position++] = 0;

        // 3-5. list_str, list_encoding, options
        WriteTrailer(result, position, optionEntries);
        return result;
    }

    /// <summary>
    /// Encodes a RawInsertRecordsRequest whose records are slices of one contiguous buffer.
    /// </summary>
    /// <param name="tableName">Target table name, written as an Avro string.</param>
    /// <param name="recordsBuffer">Buffer holding all encoded records.</param>
    /// <param name="recordSegments">(offset, length) of each record in <paramref name="recordsBuffer"/>, in order.</param>
    /// <param name="options">Request options; <c>null</c> or empty writes an empty map.</param>
    /// <returns>An exactly-sized array holding the encoded request.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tableName"/> or <paramref name="recordSegments"/>
    /// is null, or an option key/value is null.</exception>
    /// <exception cref="InvalidOperationException">The encoded request would exceed the maximum array size.</exception>
    public static byte[] EncodeZeroCopy(
        string tableName,
        byte[] recordsBuffer,
        (int Offset, int Length)[] recordSegments,
        IDictionary<string, string>? options)
    {
        if (tableName is null) throw new ArgumentNullException(nameof(tableName));
        if (recordSegments is null) throw new ArgumentNullException(nameof(recordSegments));

        var tableNameBytes = Encoding.UTF8.GetBytes(tableName);
        var optionEntries = EncodeOptions(options);
        int recordCount = recordSegments.Length;

        long size = BlobSize(tableNameBytes.Length) + ArrayOverhead(recordCount) + TrailerSize(optionEntries);
        foreach (var (_, length) in recordSegments)
        {
            size += BlobSize(length);
        }

        var result = AllocateExact(size);

        // 1. table_name
        int position = WriteBlob(result, 0, tableNameBytes, 0, tableNameBytes.Length);

        // 2. list: one block holding every record slice, then the end-of-array marker.
        if (recordCount > 0)
        {
            position = AvroEncoding.WriteVarInt(result, position, recordCount);
            foreach (var (offset, length) in recordSegments)
            {
                position = WriteBlob(result, position, recordsBuffer, offset, length);
            }
        }
        result[position++] = 0;

        // 3-5. list_str, list_encoding, options
        WriteTrailer(result, position, optionEntries);
        return result;
    }

    // UTF-8 encodes every option once so sizing and writing use the same bytes.
    private static (byte[] Key, byte[] Value)[] EncodeOptions(IDictionary<string, string>? options)
    {
        if (options is null || options.Count == 0)
        {
            return Array.Empty<(byte[] Key, byte[] Value)>();
        }

        return options
            .Select(kvp => (Key: Encoding.UTF8.GetBytes(kvp.Key), Value: Encoding.UTF8.GetBytes(kvp.Value)))
            .ToArray();
    }

    // Size of an Avro string/bytes value: varint length prefix plus the payload.
    private static long BlobSize(int length) => AvroEncoding.GetVarIntSize(length) + (long)length;

    // Non-item bytes of a single-block Avro array/map: block count (if any items) plus the 0 terminator.
    private static long ArrayOverhead(int count) => count > 0 ? AvroEncoding.GetVarIntSize(count) + 1L : 1L;

    // Size of list_str (empty array), list_encoding ("binary") and the options map.
    private static long TrailerSize((byte[] Key, byte[] Value)[] options)
    {
        long size = 1 + BlobSize(BinaryEncodingBytes.Length) + ArrayOverhead(options.Length);
        foreach (var (key, value) in options)
        {
            size += BlobSize(key.Length) + BlobSize(value.Length);
        }
        return size;
    }

    // Writes list_str, list_encoding and options; returns the position after the last byte.
    private static int WriteTrailer(byte[] destination, int position, (byte[] Key, byte[] Value)[] options)
    {
        // list_str: empty array
        destination[position++] = 0;

        // list_encoding
        position = WriteBlob(destination, position, BinaryEncodingBytes, 0, BinaryEncodingBytes.Length);

        // options: one block of key/value pairs (if any), then the end-of-map marker.
        if (options.Length > 0)
        {
            position = AvroEncoding.WriteVarInt(destination, position, options.Length);
            foreach (var (key, value) in options)
            {
                position = WriteBlob(destination, position, key, 0, key.Length);
                position = WriteBlob(destination, position, value, 0, value.Length);
            }
        }
        destination[position++] = 0;

        return position;
    }

    // Writes source[offset..offset+length) as an Avro bytes/string value; returns the new position.
    private static int WriteBlob(byte[] destination, int position, byte[] source, int offset, int length)
    {
        position = AvroEncoding.WriteVarInt(destination, position, length);
        Buffer.BlockCopy(source, offset, destination, position, length);
        return position + length;
    }

    private static byte[] AllocateExact(long size)
    {
        if (size > int.MaxValue)
        {
            throw new InvalidOperationException("Encoded request exceeds the maximum byte array size.");
        }
        return new byte[(int)size];
    }
}
922 
923  #endregion
static byte [] EncodeZeroCopy(string tableName, byte[] recordsBuffer,(int Offset, int Length)[] recordSegments, IDictionary< string, string >? options)
Encodes using pre-encoded records stored in a contiguous buffer with segments.
string getSchemaString()
Class for record schemas
Definition: RecordSchema.cs:31
byte [] Buffer
Zero-copy batch encoding: encodes all records into a single contiguous buffer and returns segments po...
Base class for all schema types
Definition: Schema.cs:29
Class for union schemas
Definition: UnionSchema.cs:29
Type
Enum for schema types
Definition: Schema.cs:34
static int WriteVarInt(byte[] buffer, int position, int value)
Definition: AvroEncoders.cs:23
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static int WriteVarLong(byte[] buffer, int position, long value)
Definition: AvroEncoders.cs:29
Direct binary encoder for RawInsertRecordsRequest that bypasses the Apache Avro library.
static DirectAvroEncoder< T > GetOrCreate(KineticaType ktype)
Gets or creates a DirectAvroEncoder for the specified type and KineticaType.
Definition: AvroEncoders.cs:95
byte [] Encode(T record)
Encodes a single record to Avro binary format.
byte [][] EncodeMany(IReadOnlyList< T > records)
Encodes multiple records using thread-local buffers, in parallel for batches of 100 or more records.
Schema getSchema()
Shared utilities for Avro varint (zig-zag) encoding.
Definition: AvroEncoders.cs:20
int CalculateEncodedSize(T record)
Calculates the encoded size of a record by encoding it into a thread-local scratch buffer and discarding the bytes.
static void EnsureCapacity(ref byte[] buffer, int position, int required)
Definition: AvroEncoders.cs:58
static int GetVarIntSize(long value)
Definition: AvroEncoders.cs:45
Ultra-high-performance Avro encoder that writes directly to binary format without using GenericRecord...
Definition: AvroEncoders.cs:83
Immutable collection of metadata about a Kinetica type.
Definition: Type.cs:36
int EncodeTo(T record, ref byte[] buffer, int offset)
Encodes a single record directly into the provided buffer.
static byte [] Encode(string tableName, IReadOnlyList< byte[]> records, IDictionary< string, string >? options)
Encodes a RawInsertRecordsRequest directly to Avro binary format.
Type Tag
Schema type property
Definition: Schema.cs:56
List< byte[]> EncodeManyAsList(IReadOnlyList< T > records)
Encodes multiple records to a list.