Kinetica   C#   API  Version 7.2.3.0
GenericReader.cs
Go to the documentation of this file.
1 
18 using System;
19 using System.Collections.Generic;
20 using Avro.IO;
21 using System.IO;
22 
23 namespace Avro.Generic
24 {
/// <summary>
/// Parameterless callback that produces a value of type <typeparamref name="T"/>.
/// DefaultReader.Read&lt;S&gt; takes one of these and returns whatever it yields; the
/// dispatching Read method passes Decoder methods such as ReadBoolean or ReadInt.
/// </summary>
/// <typeparam name="T">Type of the value the callback returns.</typeparam>
25  public delegate T Reader<T>();
26 
/// <summary>
/// Typed DatumReader that forwards every operation to a wrapped
/// <see cref="DefaultReader"/>.
/// </summary>
/// <typeparam name="T">Type of the objects returned by <see cref="Read"/>.</typeparam>
public sealed class GenericReader<T> : DatumReader<T>
{
    // All schema access and decoding is delegated to this instance.
    private readonly DefaultReader reader;

    /// <summary>
    /// Builds a reader for the given schema pair by wrapping a new
    /// <see cref="DefaultReader"/> created from them.
    /// </summary>
    /// <param name="writerSchema">Schema handed to the DefaultReader as its writer schema.</param>
    /// <param name="readerSchema">Schema handed to the DefaultReader as its reader schema.</param>
    public GenericReader(Schema writerSchema, Schema readerSchema)
        : this(new DefaultReader(writerSchema, readerSchema))
    {
    }

    /// <summary>
    /// Builds a reader that uses the supplied <see cref="DefaultReader"/> directly.
    /// </summary>
    /// <param name="reader">The reader every call is forwarded to.</param>
    public GenericReader(DefaultReader reader)
    {
        this.reader = reader;
    }

    /// <summary>Writer schema of the wrapped reader.</summary>
    public Schema WriterSchema { get { return reader.WriterSchema; } }

    /// <summary>Reader schema of the wrapped reader.</summary>
    public Schema ReaderSchema { get { return reader.ReaderSchema; } }

    /// <summary>
    /// Reads one datum from the decoder by calling the wrapped reader's generic Read.
    /// </summary>
    /// <param name="reuse">Existing object passed through to the wrapped reader.</param>
    /// <param name="d">Decoder to read from.</param>
    /// <returns>The value returned by the wrapped reader.</returns>
    public T Read(T reuse, Decoder d)
    {
        return reader.Read(reuse, d);
    }
}
68 
81  public class DefaultReader
82  {
/// <summary>Reader schema given to the constructor; assignable only inside this class.</summary>
public Schema ReaderSchema { get; private set; }

/// <summary>Writer schema given to the constructor; assignable only inside this class.</summary>
public Schema WriterSchema { get; private set; }

/// <summary>
/// Creates a reader bound to one writer/reader schema pair.
/// </summary>
/// <param name="writerSchema">Value stored in <see cref="WriterSchema"/>.</param>
/// <param name="readerSchema">Value stored in <see cref="ReaderSchema"/>.</param>
public DefaultReader(Schema writerSchema, Schema readerSchema)
{
    ReaderSchema = readerSchema;
    WriterSchema = writerSchema;
}
99 
/// <summary>
/// Reads one datum from the decoder using the schema pair this reader was
/// constructed with and casts the result to <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type the decoded object is cast to.</typeparam>
/// <param name="reuse">Existing object passed on to the untyped Read overload.</param>
/// <param name="decoder">Decoder to read from.</param>
/// <returns>The decoded object, cast to <typeparamref name="T"/>.</returns>
/// <exception cref="AvroException">
/// ReaderSchema.CanRead(WriterSchema) returns false.
/// </exception>
public T Read<T>(T reuse, Decoder decoder)
{
    // Reject only incompatible schema pairs; an unconditional throw here
    // would make the read below unreachable.
    if (!ReaderSchema.CanRead(WriterSchema))
    {
        throw new AvroException("Schema mismatch. Reader: " + ReaderSchema + ", writer: " + WriterSchema);
    }

    return (T)Read(reuse, WriterSchema, ReaderSchema, decoder);
}
116 
/// <summary>
/// Decodes one value written with <paramref name="writerSchema"/> and returns it
/// in the form selected by <paramref name="readerSchema"/>. Dispatch is on the
/// writer schema's tag.
/// </summary>
/// <param name="reuse">Existing object handed to the complex-type readers.</param>
/// <param name="writerSchema">Schema that decides which decoder call is made.</param>
/// <param name="readerSchema">Schema that decides numeric widening and is passed to the complex-type readers.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The decoded value, boxed when it is a primitive.</returns>
/// <exception cref="AvroException">The writer schema's tag is not one of the handled types.</exception>
public object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
{
    // When only the reader side is a union, narrow it to the branch that
    // matches the writer schema before dispatching.
    bool readerOnlyUnion = readerSchema.Tag == Schema.Type.Union
                           && writerSchema.Tag != Schema.Type.Union;
    if (readerOnlyUnion)
    {
        readerSchema = findBranch(readerSchema as UnionSchema, writerSchema);
    }

    // No CanRead compatibility check is performed at this level.
    Schema.Type writerTag = writerSchema.Tag;
    switch (writerTag)
    {
        case Schema.Type.Null:
            return ReadNull(readerSchema, d);

        case Schema.Type.Boolean:
            return Read<bool>(writerTag, readerSchema, d.ReadBoolean);

        case Schema.Type.Int:
            {
                int intValue = Read<int>(writerTag, readerSchema, d.ReadInt);
                Schema.Type wanted = readerSchema.Tag;
                // Widen to the numeric type the reader schema asks for.
                if (wanted == Schema.Type.Long)
                {
                    return (long)intValue;
                }
                if (wanted == Schema.Type.Float)
                {
                    return (float)intValue;
                }
                if (wanted == Schema.Type.Double)
                {
                    return (double)intValue;
                }
                return intValue;
            }

        case Schema.Type.Long:
            {
                long longValue = Read<long>(writerTag, readerSchema, d.ReadLong);
                Schema.Type wanted = readerSchema.Tag;
                if (wanted == Schema.Type.Float)
                {
                    return (float)longValue;
                }
                if (wanted == Schema.Type.Double)
                {
                    return (double)longValue;
                }
                return longValue;
            }

        case Schema.Type.Float:
            {
                float floatValue = Read<float>(writerTag, readerSchema, d.ReadFloat);
                if (readerSchema.Tag == Schema.Type.Double)
                {
                    return (double)floatValue;
                }
                return floatValue;
            }

        case Schema.Type.Double:
            return Read<double>(writerTag, readerSchema, d.ReadDouble);

        case Schema.Type.String:
            return Read<string>(writerTag, readerSchema, d.ReadString);

        case Schema.Type.Bytes:
            return Read<byte[]>(writerTag, readerSchema, d.ReadBytes);

        // Error schemas are read exactly like records.
        case Schema.Type.Error:
        case Schema.Type.Record:
            return ReadRecord(reuse, (RecordSchema)writerSchema, readerSchema, d);

        case Schema.Type.Enumeration:
            return ReadEnum(reuse, (EnumSchema)writerSchema, readerSchema, d);

        case Schema.Type.Fixed:
            return ReadFixed(reuse, (FixedSchema)writerSchema, readerSchema, d);

        case Schema.Type.Array:
            return ReadArray(reuse, (ArraySchema)writerSchema, readerSchema, d);

        case Schema.Type.Map:
            return ReadMap(reuse, (MapSchema)writerSchema, readerSchema, d);

        case Schema.Type.Union:
            return ReadUnion(reuse, (UnionSchema)writerSchema, readerSchema, d);

        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
197 
/// <summary>
/// Consumes a null from the decoder.
/// </summary>
/// <param name="readerSchema">Not used by this implementation.</param>
/// <param name="d">Decoder whose ReadNull is invoked.</param>
/// <returns>Always <c>null</c>.</returns>
protected virtual object ReadNull(Schema readerSchema, Decoder d)
{
    object nothing = null;
    d.ReadNull();
    return nothing;
}
209 
/// <summary>
/// Reads a primitive by invoking the supplied callback.
/// </summary>
/// <typeparam name="S">Type produced by the callback.</typeparam>
/// <param name="tag">Not used by this implementation.</param>
/// <param name="readerSchema">Not used by this implementation.</param>
/// <param name="reader">Callback that performs the actual read.</param>
/// <returns>The value returned by <paramref name="reader"/>.</returns>
protected S Read<S>(Schema.Type tag, Schema readerSchema, Reader<S> reader)
{
    S value = reader();
    return value;
}
222 
/// <summary>
/// Reads a record: decodes every writer field in writer order, then fills in
/// reader fields the writer schema does not contain from their default values.
/// </summary>
/// <param name="reuse">Existing object offered to <see cref="CreateRecord"/>.</param>
/// <param name="writerSchema">Record schema whose fields drive decoding.</param>
/// <param name="readerSchema">Schema cast to RecordSchema; determines which fields are kept.</param>
/// <param name="dec">Decoder to read from.</param>
/// <returns>The object returned by <see cref="CreateRecord"/>, populated via <see cref="AddField"/>.</returns>
/// <exception cref="AvroException">
/// Any exception raised while handling a writer field is rethrown as an
/// AvroException whose message is suffixed with the field name.
/// </exception>
protected virtual object ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
{
    RecordSchema rs = (RecordSchema)readerSchema;

    object rec = CreateRecord(reuse, rs);
    foreach (Field wf in writerSchema)
    {
        try
        {
            Field rf;
            if (rs.TryGetFieldAlias(wf.Name, out rf))
            {
                // Offer the field's current value (if any) for reuse.
                object obj = null;
                TryGetField(rec, wf.Name, rf.Pos, out obj);
                AddField(rec, wf.Name, rf.Pos, Read(obj, wf.Schema, rf.Schema, dec));
            }
            else
            {
                // The reader has no counterpart: consume the bytes and move on.
                Skip(wf.Schema, dec);
            }
        }
        catch (Exception ex)
        {
            throw new AvroException(ex.Message + " in field " + wf.Name);
        }
    }

    // Defaults are encoded into a scratch stream and decoded back through the
    // normal Read path. The stream is disposed once all defaults are applied.
    using (var defaultStream = new MemoryStream())
    {
        var defaultEncoder = new BinaryEncoder(defaultStream);
        var defaultDecoder = new BinaryDecoder(defaultStream);
        foreach (Field rf in rs)
        {
            if (writerSchema.Contains(rf.Name)) continue;

            defaultStream.Position = 0; // reset for writing
            Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
            defaultStream.Flush();
            defaultStream.Position = 0; // reset for reading

            object obj = null;
            TryGetField(rec, rf.Name, rf.Pos, out obj);
            AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, defaultDecoder));
        }
    }

    return rec;
}
275 
/// <summary>
/// Supplies the record object to populate: <paramref name="reuse"/> when it is a
/// GenericRecord whose schema equals <paramref name="readerSchema"/>, otherwise a
/// new GenericRecord for that schema.
/// </summary>
/// <param name="reuse">Candidate object to recycle; may be null.</param>
/// <param name="readerSchema">Schema the returned record must have.</param>
/// <returns>The recycled or newly created GenericRecord.</returns>
protected virtual object CreateRecord(object reuse, RecordSchema readerSchema)
{
    GenericRecord candidate = reuse as GenericRecord;
    if (candidate != null && candidate.Schema.Equals(readerSchema))
    {
        return candidate;
    }

    return new GenericRecord(readerSchema);
}
289 
/// <summary>
/// Looks up a field's current value on a record by name.
/// </summary>
/// <param name="record">Record object, treated as a GenericRecord.</param>
/// <param name="fieldName">Name used for the lookup.</param>
/// <param name="fieldPos">Not used by this implementation.</param>
/// <param name="value">Receives the result of GenericRecord.TryGetValue.</param>
/// <returns>Whatever GenericRecord.TryGetValue returns.</returns>
protected virtual bool TryGetField(object record, string fieldName, int fieldPos, out object value)
{
    GenericRecord target = record as GenericRecord;
    bool found = target.TryGetValue(fieldName, out value);
    return found;
}
303 
/// <summary>
/// Stores a field value on a record under the given name.
/// </summary>
/// <param name="record">Record object, treated as a GenericRecord.</param>
/// <param name="fieldName">Name the value is added under.</param>
/// <param name="fieldPos">Not used by this implementation.</param>
/// <param name="fieldValue">Value to store.</param>
protected virtual void AddField(object record, string fieldName, int fieldPos, object fieldValue)
{
    GenericRecord target = record as GenericRecord;
    target.Add(fieldName, fieldValue);
}
316 
/// <summary>
/// Reads an enum: takes the index from the decoder, looks up the symbol at that
/// index in the writer schema, and passes it to <see cref="CreateEnum"/>.
/// </summary>
/// <param name="reuse">Existing object offered to <see cref="CreateEnum"/>.</param>
/// <param name="writerSchema">Enum schema used to translate the index into a symbol.</param>
/// <param name="readerSchema">Schema converted to EnumSchema and given to <see cref="CreateEnum"/>.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The object produced by <see cref="CreateEnum"/>.</returns>
protected virtual object ReadEnum(object reuse, EnumSchema writerSchema, Schema readerSchema, Decoder d)
{
    // Convert once and use the result, rather than casting a second time.
    EnumSchema es = readerSchema as EnumSchema;
    return CreateEnum(reuse, es, writerSchema[d.ReadEnum()]);
}
330 
/// <summary>
/// Produces the enum object for a symbol. When <paramref name="reuse"/> is a
/// GenericEnum whose schema equals <paramref name="es"/>, its Value is overwritten
/// and it is returned; otherwise a new GenericEnum is created.
/// </summary>
/// <param name="reuse">Candidate object to recycle; may be null.</param>
/// <param name="es">Enum schema for the result.</param>
/// <param name="symbol">Symbol to store.</param>
/// <returns>The recycled or newly created GenericEnum.</returns>
protected virtual object CreateEnum(object reuse, EnumSchema es, string symbol)
{
    GenericEnum existing = reuse as GenericEnum;
    bool recyclable = existing != null && existing.Schema.Equals(es);
    if (!recyclable)
    {
        return new GenericEnum(es, symbol);
    }

    existing.Value = symbol;
    return existing;
}
351 
/// <summary>
/// Reads an array block by block. The array object is grown before each block
/// so the block fits, and resized to the exact element count at the end.
/// </summary>
/// <param name="reuse">Existing object offered to <see cref="CreateArray"/>.</param>
/// <param name="writerSchema">Array schema supplying the writer's item schema.</param>
/// <param name="readerSchema">Schema cast to ArraySchema; supplies the reader's item schema.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The array object holding the decoded items.</returns>
protected virtual object ReadArray(object reuse, ArraySchema writerSchema, Schema readerSchema, Decoder d)
{
    ArraySchema readerArray = (ArraySchema)readerSchema;
    object items = CreateArray(reuse, readerArray);

    int filled = 0;
    // The decoder reports block sizes as long; they are narrowed to int here.
    int blockSize = (int)d.ReadArrayStart();
    while (blockSize != 0)
    {
        if (GetArraySize(items) < (filled + blockSize))
        {
            ResizeArray(ref items, filled + blockSize);
        }

        for (int k = 0; k < blockSize; k++)
        {
            // The current occupant of the slot is offered for reuse.
            object previous = GetArrayElement(items, filled);
            SetArrayElement(items, filled, Read(previous, writerSchema.ItemSchema, readerArray.ItemSchema, d));
            filled++;
        }

        blockSize = (int)d.ReadArrayNext();
    }

    // Trim (or adjust) so the size matches the number of items read.
    if (GetArraySize(items) != filled)
    {
        ResizeArray(ref items, filled);
    }

    return items;
}
379 
/// <summary>
/// Supplies the array object to fill: <paramref name="reuse"/> when it is an
/// object[], otherwise a new empty object[].
/// </summary>
/// <param name="reuse">Candidate object to recycle; may be null.</param>
/// <param name="rs">Not used by this implementation.</param>
/// <returns>The recycled array or a zero-length object[].</returns>
protected virtual object CreateArray(object reuse, ArraySchema rs)
{
    object[] existing = reuse as object[];
    if (existing != null)
    {
        return existing;
    }

    return new object[0];
}
390 
/// <summary>
/// Reports the length of an array object.
/// </summary>
/// <param name="array">Array object, treated as an object[].</param>
/// <returns>The Length of the underlying object[].</returns>
protected virtual int GetArraySize(object array)
{
    object[] items = array as object[];
    return items.Length;
}
401 
/// <summary>
/// Resizes an array object to <paramref name="n"/> elements using Array.Resize
/// and stores the result back into <paramref name="array"/>.
/// </summary>
/// <param name="array">Array object, treated as an object[]; replaced with the resized array.</param>
/// <param name="n">New element count.</param>
protected virtual void ResizeArray(ref object array, int n)
{
    object[] resized = array as object[];
    Array.Resize(ref resized, n);
    array = resized;
}
414 
/// <summary>
/// Writes a value into the given slot of an array object.
/// </summary>
/// <param name="array">Array object, treated as an object[].</param>
/// <param name="index">Slot to assign.</param>
/// <param name="value">Value to store.</param>
protected virtual void SetArrayElement(object array, int index, object value)
{
    object[] items = array as object[];
    items[index] = value;
}
427 
/// <summary>
/// Fetches the value stored in the given slot of an array object.
/// </summary>
/// <param name="array">Array object, treated as an object[].</param>
/// <param name="index">Slot to read.</param>
/// <returns>The element at <paramref name="index"/>.</returns>
protected virtual object GetArrayElement(object array, int index)
{
    object[] items = array as object[];
    return items[index];
}
439 
/// <summary>
/// Reads a map block by block; each entry is a string key followed by a value
/// decoded with the writer's and reader's value schemas.
/// </summary>
/// <param name="reuse">Existing object offered to <see cref="CreateMap"/>.</param>
/// <param name="writerSchema">Map schema supplying the writer's value schema.</param>
/// <param name="readerSchema">Schema cast to MapSchema; supplies the reader's value schema.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The map object populated through <see cref="AddMapEntry"/>.</returns>
protected virtual object ReadMap(object reuse, MapSchema writerSchema, Schema readerSchema, Decoder d)
{
    MapSchema readerMap = (MapSchema)readerSchema;
    object map = CreateMap(reuse, readerMap);

    // The decoder reports block sizes as long; they are narrowed to int here.
    int blockSize = (int)d.ReadMapStart();
    while (blockSize != 0)
    {
        for (int k = 0; k < blockSize; k++)
        {
            string key = d.ReadString();
            // Values are always read fresh; nothing is offered for reuse.
            object value = Read(null, writerSchema.ValueSchema, readerMap.ValueSchema, d);
            AddMapEntry(map, key, value);
        }

        blockSize = (int)d.ReadMapNext();
    }

    return map;
}
463 
/// <summary>
/// Supplies the map object to fill: <paramref name="reuse"/>, cleared, when it
/// implements IDictionary&lt;string, object&gt;; otherwise a new
/// Dictionary&lt;string, object&gt;.
/// </summary>
/// <param name="reuse">Candidate object to recycle; may be null.</param>
/// <param name="ms">Not used by this implementation.</param>
/// <returns>An empty map object.</returns>
protected virtual object CreateMap(object reuse, MapSchema ms)
{
    IDictionary<string, object> existing = reuse as IDictionary<string, object>;
    if (existing == null)
    {
        return new Dictionary<string, object>();
    }

    existing.Clear();
    return existing;
}
480 
/// <summary>
/// Adds a key/value pair to a map object.
/// </summary>
/// <param name="map">Map object, treated as an IDictionary&lt;string, object&gt;.</param>
/// <param name="key">Entry key.</param>
/// <param name="value">Entry value.</param>
protected virtual void AddMapEntry(object map, string key, object value)
{
    IDictionary<string, object> entries = map as IDictionary<string, object>;
    entries.Add(key, value);
}
491 
/// <summary>
/// Reads a value written under a union schema: the branch index comes from the
/// decoder, the matching writer branch is selected, and the value is decoded
/// against a suitable reader schema.
/// </summary>
/// <param name="reuse">Existing object passed on to Read.</param>
/// <param name="writerSchema">Union schema indexed by the decoded branch number.</param>
/// <param name="readerSchema">Reader schema; narrowed to a matching branch when it is itself a union.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The value decoded for the selected branch.</returns>
/// <exception cref="AvroException">
/// The reader schema is not a union and its CanRead rejects the writer branch.
/// </exception>
protected virtual object ReadUnion(object reuse, UnionSchema writerSchema, Schema readerSchema, Decoder d)
{
    int branch = d.ReadUnionIndex();
    Schema writerBranch = writerSchema[branch];

    UnionSchema readerUnion = readerSchema as UnionSchema;
    if (readerUnion != null)
    {
        readerSchema = findBranch(readerUnion, writerBranch);
    }
    else if (!readerSchema.CanRead(writerBranch))
    {
        // The message reports the reader's top-level schema pair.
        throw new AvroException("Schema mismatch. Reader: " + ReaderSchema + ", writer: " + WriterSchema);
    }

    return Read(reuse, writerBranch, readerSchema, d);
}
513 
/// <summary>
/// Reads a fixed value into the buffer of the object supplied by
/// <see cref="CreateFixed"/>, after checking that both schemas declare the same size.
/// </summary>
/// <param name="reuse">Existing object offered to <see cref="CreateFixed"/>.</param>
/// <param name="writerSchema">Writer's fixed schema.</param>
/// <param name="readerSchema">Schema cast to FixedSchema.</param>
/// <param name="d">Decoder to read from.</param>
/// <returns>The fixed object whose buffer was filled.</returns>
/// <exception cref="AvroException">The reader and writer sizes differ.</exception>
protected virtual object ReadFixed(object reuse, FixedSchema writerSchema, Schema readerSchema, Decoder d)
{
    FixedSchema readerFixed = (FixedSchema)readerSchema;
    if (readerFixed.Size == writerSchema.Size)
    {
        object target = CreateFixed(reuse, readerFixed);
        byte[] buffer = GetFixedBuffer(target);
        d.ReadFixed(buffer);
        return target;
    }

    throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
        ", reader: " + readerSchema);
}
538 
/// <summary>
/// Supplies the fixed object to fill: <paramref name="reuse"/> when it is a
/// GenericFixed whose schema equals <paramref name="rs"/>, otherwise a new
/// GenericFixed for that schema.
/// </summary>
/// <param name="reuse">Candidate object to recycle; may be null.</param>
/// <param name="rs">Fixed schema for the result.</param>
/// <returns>The recycled or newly created GenericFixed.</returns>
protected virtual object CreateFixed(object reuse, FixedSchema rs)
{
    GenericFixed existing = reuse as GenericFixed;
    if (existing != null && existing.Schema.Equals(rs))
    {
        return existing;
    }

    return new GenericFixed(rs);
}
550 
/// <summary>
/// Exposes the byte buffer of a fixed object.
/// </summary>
/// <param name="f">Fixed object, treated as a GenericFixed.</param>
/// <returns>The Value of the GenericFixed.</returns>
protected virtual byte[] GetFixedBuffer(object f)
{
    GenericFixed holder = f as GenericFixed;
    return holder.Value;
}
561 
/// <summary>
/// Advances the decoder past one value written with <paramref name="writerSchema"/>
/// without materialising it. Composite types are skipped recursively.
/// </summary>
/// <param name="writerSchema">Schema describing the value to skip.</param>
/// <param name="d">Decoder to advance.</param>
/// <exception cref="AvroException">The schema's tag is not one of the handled types.</exception>
protected virtual void Skip(Schema writerSchema, Decoder d)
{
    switch (writerSchema.Tag)
    {
        case Schema.Type.Null:
            d.SkipNull();
            break;
        case Schema.Type.Boolean:
            d.SkipBoolean();
            break;
        case Schema.Type.Int:
            d.SkipInt();
            break;
        case Schema.Type.Long:
            d.SkipLong();
            break;
        case Schema.Type.Float:
            d.SkipFloat();
            break;
        case Schema.Type.Double:
            d.SkipDouble();
            break;
        case Schema.Type.String:
            d.SkipString();
            break;
        case Schema.Type.Bytes:
            d.SkipBytes();
            break;
        // Error schemas are handled like records, matching how Read dispatches them.
        case Schema.Type.Error:
        case Schema.Type.Record:
            foreach (Field f in writerSchema as RecordSchema) Skip(f.Schema, d);
            break;
        case Schema.Type.Enumeration:
            d.SkipEnum();
            break;
        case Schema.Type.Fixed:
            d.SkipFixed((writerSchema as FixedSchema).Size);
            break;
        case Schema.Type.Array:
            {
                // Walk every block and skip each item individually.
                Schema s = (writerSchema as ArraySchema).ItemSchema;
                for (long n = d.ReadArrayStart(); n != 0; n = d.ReadArrayNext())
                {
                    for (long i = 0; i < n; i++) Skip(s, d);
                }
            }
            break;
        case Schema.Type.Map:
            {
                // Each entry is a string key followed by a value.
                Schema s = (writerSchema as MapSchema).ValueSchema;
                for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
                {
                    for (long i = 0; i < n; i++) { d.SkipString(); Skip(s, d); }
                }
            }
            break;
        case Schema.Type.Union:
            // The branch index is read to learn which branch schema to skip.
            Skip((writerSchema as UnionSchema)[d.ReadUnionIndex()], d);
            break;
        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
624 
/// <summary>
/// Selects the branch of a union that MatchingBranch reports for the given schema.
/// </summary>
/// <param name="us">Union schema to search.</param>
/// <param name="s">Schema to find a matching branch for.</param>
/// <returns>The branch schema at the index returned by MatchingBranch.</returns>
/// <exception cref="AvroException">MatchingBranch returns a negative index.</exception>
protected static Schema findBranch(UnionSchema us, Schema s)
{
    int branch = us.MatchingBranch(s);
    if (branch < 0)
    {
        throw new AvroException("No matching schema for " + s + " in " + us);
    }

    return us[branch];
}
631 
632  }
633 }
virtual object ReadNull(Schema readerSchema, Decoder d)
Deserializes a null from the stream.
double ReadDouble()
Reads a double Avro type
virtual void Skip(Schema writerSchema, Decoder d)
virtual object ReadMap(object reuse, MapSchema writerSchema, Schema readerSchema, Decoder d)
Deserializes an Avro map.
int ReadEnum()
Reads an enum AvroType
Class for record schemas
Definition: RecordSchema.cs:31
virtual object ReadUnion(object reuse, UnionSchema writerSchema, Schema readerSchema, Decoder d)
Deserializes an object based on the writer's union schema.
Schema Schema
Field type's schema
Definition: Field.cs:75
Class for fields defined in a record
Definition: Field.cs:30
GenericReader(DefaultReader reader)
Constructs a generic reader by directly using the given DefaultReader
T Read(T reuse, Decoder d)
Read a datum.
The default class to hold values for enum schema in GenericReader and GenericWriter.
Definition: GenericEnum.cs:27
void SkipLong()
Skips a long Avro type on the stream.
virtual object CreateFixed(object reuse, FixedSchema rs)
Returns a fixed object.
GenericReader(Schema writerSchema, Schema readerSchema)
Constructs a generic reader for the given schemas using the DefaultReader.
string ReadString()
Reads a string Avro type
Class for enum type schemas
Definition: EnumSchema.cs:28
void SkipBoolean()
Skips a boolean Avro type on the stream.
DefaultReader(Schema writerSchema, Schema readerSchema)
Constructs the default reader for the given writer and reader schemas.
bool ReadBoolean()
Read a boolean Avro type
virtual void AddField(object record, string fieldName, int fieldPos, object fieldValue)
Used by the default implementation of ReadRecord() to add a field to a record object.
Schema ValueSchema
Schema for map values type
Definition: MapSchema.cs:33
void SkipDouble()
Skips a double Avro type on the stream.
virtual void SetArrayElement(object array, int index, object value)
Assigns a new value to the object at the given index
The default implementation for the generic reader.
int Pos
Position of the field within its record.
Definition: Field.cs:55
Base class for all schema types
Definition: Schema.cs:29
virtual object ReadEnum(object reuse, EnumSchema writerSchema, Schema readerSchema, Decoder d)
Deserializes an enum.
void SkipString()
Skips a string Avro type on the stream.
Class for union schemas
Definition: UnionSchema.cs:29
int ReadInt()
Reads an int Avro type.
readonly string Name
Name of the field.
Definition: Field.cs:45
Class for fixed schemas
Definition: FixedSchema.cs:28
virtual object CreateArray(object reuse, ArraySchema rs)
Creates a new array object.
virtual bool CanRead(Schema writerSchema)
Returns true if and only if data written using writerSchema can be read using the current schema acco...
Definition: Schema.cs:283
float ReadFloat()
Reads a float Avro type
Type
Enum for schema types
Definition: Schema.cs:34
long ReadLong()
Reads a long Avro type.
int Size
Fixed size for the bytes
Definition: FixedSchema.cs:33
A general purpose reader of data from avro streams.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
byte [] ReadBytes()
Reads the bytes Avro type
T Read< T >(T reuse, Decoder decoder)
Reads an object off the stream.
static Schema findBranch(UnionSchema us, Schema s)
long ReadArrayStart()
Starts reading the array Avro type.
Write leaf values.
The default type used by GenericReader and GenericWriter for RecordSchema.
virtual object GetArrayElement(object array, int index)
Returns the element at the given index.
S Read< S >(Schema.Type tag, Schema readerSchema, Reader< S > reader)
A generic function to read primitive types
long ReadArrayNext()
See ReadArrayStart().
virtual object ReadArray(object reuse, ArraySchema writerSchema, Schema readerSchema, Decoder d)
Deserializes an array and returns an array object.
virtual object CreateEnum(object reuse, EnumSchema es, string symbol)
Used by the default implementation of ReadEnum to construct a new enum object.
virtual object ReadRecord(object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
Deserializes a record from the stream.
object Read(object reuse, Schema writerSchema, Schema readerSchema, Decoder d)
Decoder is used to decode Avro data on a stream.
Definition: Decoder.cs:27
void SkipFloat()
Skips a float Avro type on the stream.
virtual void AddMapEntry(object map, string key, object value)
Adds an entry to the map.
virtual byte [] GetFixedBuffer(object f)
Returns a buffer of appropriate size to read data into.
int MatchingBranch(Schema s)
Returns the index of a branch that can read the data written by the given schema s.
Definition: UnionSchema.cs:113
Class for array type schemas
Definition: ArraySchema.cs:27
virtual void ResizeArray(ref object array, int n)
Resizes the array to the new value.
static void EncodeDefaultValue(Encoder enc, Schema schema, JToken jtok)
Reads the passed JToken default value field and writes it in the specified encoder
Definition: Resolver.cs:33
override bool Equals(object obj)
Checks equality of two enum schema
Definition: EnumSchema.cs:161
delegate T Reader< T >()
JToken DefaultValue
The default value for the field stored as JSON object, if defined.
Definition: Field.cs:65
Class for map schemas
Definition: MapSchema.cs:28
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length);
virtual object ReadFixed(object reuse, FixedSchema writerSchema, Schema readerSchema, Decoder d)
Deserializes a fixed object and returns the object.
int ReadUnionIndex()
Reads the index, which determines the type in a union Avro type.
Schema(Type type, PropertyMap props)
Constructor for schema class
Definition: Schema.cs:67
virtual object CreateMap(object reuse, MapSchema ms)
Used by the default implementation of ReadMap() to create a fresh map object.
long ReadMapNext()
See ReadMapStart().
void SkipInt()
Skips an int Avro type on the stream.
long ReadMapStart()
Starts reading the map Avro type.
virtual object CreateRecord(object reuse, RecordSchema readerSchema)
Creates a new record object.
void SkipNull()
Skips a null Avro type on the stream.
Schema ItemSchema
Schema for the array 'type' attribute
Definition: ArraySchema.cs:32
void SkipFixed(int len)
virtual bool TryGetField(object record, string fieldName, int fieldPos, out object value)
Used by the default implementation of ReadRecord() to get the existing field of a record object.
Type Tag
Schema type property
Definition: Schema.cs:56
void ReadNull()
Reads a null Avro type.
virtual int GetArraySize(object array)
Returns the size of the given array object.
void SkipBytes()
Skips a bytes Avro type on the stream.
The default type used by GenericReader and GenericWriter for objects for FixedSchema
Definition: GenericFixed.cs:28
Decoder for Avro binary format