Kinetica   C#   API  Version 7.2.3.0
PreresolvingDatumReader.cs
Go to the documentation of this file.
1 
18 using System.Collections.Generic;
19 using System.IO;
20 using Avro.IO;
21 
22 namespace Avro.Generic
23 {
29  public abstract class PreresolvingDatumReader<T> : DatumReader<T>
30  {
/// <summary>Schema supplied as the reader schema at construction time.</summary>
public Schema ReaderSchema { get; private set; }

/// <summary>Schema supplied as the writer schema at construction time.</summary>
public Schema WriterSchema { get; private set; }

/// <summary>
/// Reads one datum from <paramref name="dec"/>; <paramref name="reuse"/> is an
/// existing object the implementation may recycle.
/// </summary>
protected delegate object ReadItem(object reuse, Decoder dec);

// Reads one value from a decoder.
private delegate object DecoderRead(Decoder dec);

// Skips one or more values in a decoder.
private delegate void DecoderSkip(Decoder dec);

// Reads a value and sets it on a record.
private delegate void FieldReader(object record, Decoder decoder);

// Root reader for the writer/reader schema pair; resolved in the constructor.
private readonly ReadItem _reader;

// Record readers already resolved (or currently being resolved), keyed by schema pair.
private readonly Dictionary<SchemaPair, ReadItem> _recordReaders = new Dictionary<SchemaPair, ReadItem>();
45 
/// <summary>
/// Stores both schemas and resolves, once, the delegate used to read data
/// written with <paramref name="writerSchema"/> as <paramref name="readerSchema"/>.
/// </summary>
/// <param name="writerSchema">Schema the data was written with.</param>
/// <param name="readerSchema">Schema the data is to be read as.</param>
/// <exception cref="AvroException">
/// The reader schema reports that it cannot read data written with the writer schema.
/// </exception>
protected PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
{
    ReaderSchema = readerSchema;
    WriterSchema = writerSchema;

    // Reject only genuinely incompatible schemas. Without this guard the throw
    // is unconditional: every construction fails and _reader is never assigned.
    if (!ReaderSchema.CanRead(WriterSchema))
    {
        throw new AvroException("Schema mismatch. Reader: " + ReaderSchema + ", writer: " + WriterSchema);
    }

    _reader = ResolveReader(writerSchema, readerSchema);
}
54 
/// <summary>
/// Reads a datum from <paramref name="decoder"/> using the reader resolved at construction.
/// </summary>
/// <param name="reuse">Existing object handed to the resolved reader for possible reuse.</param>
/// <param name="decoder">Decoder positioned at the datum.</param>
/// <returns>The decoded datum cast to <typeparamref name="T"/>.</returns>
public T Read(T reuse, Decoder decoder)
{
    object datum = _reader(reuse, decoder);
    return (T)datum;
}
59 
/// <summary>Supplies the accessor used to create and fill arrays for <paramref name="readerSchema"/>.</summary>
protected abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema);

/// <summary>Supplies the accessor used to create enum values for <paramref name="readerSchema"/>.</summary>
protected abstract EnumAccess GetEnumAccess(EnumSchema readerSchema);

/// <summary>Supplies the accessor used to create and fill maps for <paramref name="readerSchema"/>.</summary>
protected abstract MapAccess GetMapAccess(MapSchema readerSchema);

/// <summary>Supplies the accessor used to create records and get/set their fields for <paramref name="readerSchema"/>.</summary>
protected abstract RecordAccess GetRecordAccess(RecordSchema readerSchema);

/// <summary>Supplies the accessor used to create fixed values and expose their buffers for <paramref name="readerSchema"/>.</summary>
protected abstract FixedAccess GetFixedAccess(FixedSchema readerSchema);
65 
/// <summary>
/// Builds the delegate that reads data written with <paramref name="writerSchema"/>
/// and produces values shaped by <paramref name="readerSchema"/>.
/// </summary>
/// <exception cref="AvroException">The writer schema has an unknown type tag.</exception>
private ReadItem ResolveReader(Schema writerSchema, Schema readerSchema)
{
    // A non-union writer read through a union reader resolves against the matching reader branch.
    bool readerIsUnion = readerSchema.Tag == Schema.Type.Union;
    if (readerIsUnion && writerSchema.Tag != Schema.Type.Union)
    {
        readerSchema = FindBranch(readerSchema as UnionSchema, writerSchema);
    }

    switch (writerSchema.Tag)
    {
        case Schema.Type.Null:
            return ReadNull;

        case Schema.Type.Boolean:
            return ReadBoolean;

        case Schema.Type.Int:
        {
            // int may be promoted to long, float or double
            Schema.Type target = readerSchema.Tag;
            if (target == Schema.Type.Long)
            {
                return Read(d => (long) d.ReadInt());
            }
            if (target == Schema.Type.Float)
            {
                return Read(d => (float) d.ReadInt());
            }
            if (target == Schema.Type.Double)
            {
                return Read(d => (double) d.ReadInt());
            }
            return Read(d => d.ReadInt());
        }

        case Schema.Type.Long:
        {
            // long may be promoted to float or double
            Schema.Type target = readerSchema.Tag;
            if (target == Schema.Type.Float)
            {
                return Read(d => (float) d.ReadLong());
            }
            if (target == Schema.Type.Double)
            {
                return Read(d => (double) d.ReadLong());
            }
            return Read(d => d.ReadLong());
        }

        case Schema.Type.Float:
        {
            // float may be promoted to double
            if (readerSchema.Tag == Schema.Type.Double)
            {
                return Read(d => (double) d.ReadFloat());
            }
            return Read(d => d.ReadFloat());
        }

        case Schema.Type.Double:
            return Read(d => d.ReadDouble());

        case Schema.Type.String:
            return Read(d => d.ReadString());

        case Schema.Type.Bytes:
            return Read(d => d.ReadBytes());

        case Schema.Type.Error:
        case Schema.Type.Record:
            return ResolveRecord((RecordSchema)writerSchema, (RecordSchema)readerSchema);

        case Schema.Type.Enumeration:
            return ResolveEnum((EnumSchema)writerSchema, (EnumSchema)readerSchema);

        case Schema.Type.Fixed:
            return ResolveFixed((FixedSchema)writerSchema, (FixedSchema)readerSchema);

        case Schema.Type.Array:
            return ResolveArray((ArraySchema)writerSchema, (ArraySchema)readerSchema);

        case Schema.Type.Map:
            return ResolveMap((MapSchema)writerSchema, (MapSchema)readerSchema);

        case Schema.Type.Union:
            return ResolveUnion((UnionSchema)writerSchema, readerSchema);

        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
140 
/// <summary>
/// Builds a reader for an enum. When the two schemas differ, writer ordinals are
/// translated to reader ordinals through a table computed here.
/// </summary>
private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
{
    var access = GetEnumAccess(readerSchema);

    // Identical schemas: the written ordinal is already the reader ordinal.
    if (readerSchema.Equals(writerSchema))
    {
        return (reuse, dec) => access.CreateEnum(reuse, dec.ReadEnum());
    }

    // ordinalMap[writer ordinal] == reader ordinal, or -1 when the reader lacks the symbol.
    var ordinalMap = new int[writerSchema.Symbols.Count];
    foreach (var symbol in writerSchema.Symbols)
    {
        ordinalMap[writerSchema.Ordinal(symbol)] =
            readerSchema.Contains(symbol) ? readerSchema.Ordinal(symbol) : -1;
    }

    return (reuse, dec) =>
    {
        int written = dec.ReadEnum();
        int mapped = ordinalMap[written];
        if (mapped != -1)
        {
            return access.CreateEnum(reuse, mapped);
        }
        throw new AvroException("No such symbol: " + writerSchema[written]);
    };
}
176 
/// <summary>
/// Builds (or returns the cached) reader for a record written with
/// <paramref name="writerSchema"/> and read as <paramref name="readerSchema"/>.
/// The reader runs one step per writer field (read into the matching reader
/// field, or skip), then one step per unmatched reader field that decodes the
/// field's default value.
/// </summary>
private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSchema)
{
    var schemaPair = new SchemaPair(writerSchema, readerSchema);
    ReadItem recordReader;

    if (_recordReaders.TryGetValue(schemaPair, out recordReader))
    {
        return recordReader;
    }

    FieldReader[] fieldReaderArray = null;
    var recordAccess = GetRecordAccess(readerSchema);

    // Register the reader before resolving the fields so that a nested request for
    // the same schema pair gets this delegate back instead of resolving again.
    // The lambda captures fieldReaderArray, which is assigned at the end.
    recordReader = (r, d) => ReadRecord(r, d, recordAccess, fieldReaderArray);
    _recordReaders.Add(schemaPair, recordReader);

    var readSteps = new List<FieldReader>();

    // Positions of reader fields that receive a value from the writer's data.
    var matchedReaderPositions = new HashSet<int>();

    foreach (Field wf in writerSchema)
    {
        Field rf;
        if (readerSchema.TryGetFieldAlias(wf.Name, out rf))
        {
            matchedReaderPositions.Add(rf.Pos);

            var readItem = ResolveReader(wf.Schema, rf.Schema);
            if (IsReusable(rf.Schema.Tag))
            {
                readSteps.Add((rec, d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
                    readItem(recordAccess.GetField(rec, rf.Name, rf.Pos), d)));
            }
            else
            {
                readSteps.Add((rec, d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
                    readItem(null, d)));
            }
        }
        else
        {
            // the reader has no use for this field: consume it without storing it
            var skip = GetSkip(wf.Schema);
            readSteps.Add((rec, d) => skip(d));
        }
    }

    // fill in defaults for any reader fields not in the writer schema
    foreach (Field rf in readerSchema)
    {
        // A field matched above already gets its value from the data. Checking the
        // matched positions as well as the name covers a reader field that was
        // matched under a name other than its own (presumably via an alias; see
        // TryGetFieldAlias), which would otherwise be overwritten by its default.
        if (writerSchema.Contains(rf.Name) || matchedReaderPositions.Contains(rf.Pos))
        {
            continue;
        }

        // Encode the default once; every read decodes it from a fresh stream.
        byte[] defaultBytes;
        using (var defaultStream = new MemoryStream())
        {
            var defaultEncoder = new BinaryEncoder(defaultStream);
            Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, rf.DefaultValue);
            defaultStream.Flush();
            defaultBytes = defaultStream.ToArray();
        }

        var readItem = ResolveReader(rf.Schema, rf.Schema);

        // copy of the loop variable so each closure keeps its own field
        var rfInstance = rf;
        if (IsReusable(rf.Schema.Tag))
        {
            readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
                readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
                    new BinaryDecoder(new MemoryStream(defaultBytes)))));
        }
        else
        {
            readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
                readItem(null, new BinaryDecoder(new MemoryStream(defaultBytes)))));
        }
    }

    fieldReaderArray = readSteps.ToArray();
    return recordReader;
}
251 
/// <summary>
/// Creates a record through <paramref name="recordAccess"/> and applies every
/// step in <paramref name="readSteps"/> to it, in order.
/// </summary>
/// <returns>The record after all steps have run.</returns>
private object ReadRecord(object reuse, Decoder decoder, RecordAccess recordAccess, IEnumerable<FieldReader> readSteps )
{
    object record = recordAccess.CreateRecord(reuse);

    foreach (FieldReader step in readSteps)
    {
        // TODO: on exception, report offending field
        step(record, decoder);
    }

    return record;
}
262 
/// <summary>
/// Builds a reader for a writer union: one delegate per writer branch, selected
/// at read time by the union index in the data. A branch the reader cannot
/// accept gets a delegate that throws when that branch is actually encountered.
/// </summary>
private ReadItem ResolveUnion(UnionSchema writerSchema, Schema readerSchema)
{
    var branchReaders = new ReadItem[writerSchema.Count];

    // null when the reader schema is not itself a union
    var unionReader = readerSchema as UnionSchema;

    for (int i = 0; i < writerSchema.Count; i++)
    {
        var writerBranch = writerSchema[i];

        if (unionReader != null)
        {
            int readerBranch = unionReader.MatchingBranch(writerBranch);
            if (readerBranch != -1)
            {
                branchReaders[i] = ResolveReader(writerBranch, unionReader[readerBranch]);
            }
            else
            {
                branchReaders[i] = (r, d) => { throw new AvroException( "No matching schema for " + writerBranch + " in " + unionReader ); };
            }
            continue;
        }

        if (readerSchema.CanRead(writerBranch))
        {
            branchReaders[i] = ResolveReader(writerBranch, readerSchema);
        }
        else
        {
            branchReaders[i] = (r, d) => { throw new AvroException( "Schema mismatch Reader: " + ReaderSchema + ", writer: " + WriterSchema ); };
        }
    }

    return (r, d) => ReadUnion(r, d, branchReaders);
}
299 
/// <summary>
/// Reads the union index from <paramref name="d"/> and delegates to the reader
/// stored at that index in <paramref name="branchLookup"/>.
/// </summary>
private object ReadUnion(object reuse, Decoder d, ReadItem[] branchLookup)
{
    int branch = d.ReadUnionIndex();
    ReadItem branchReader = branchLookup[branch];
    return branchReader(reuse, d);
}
304 
/// <summary>
/// Builds a reader for a map: resolves the value reader from the two value
/// schemas, then obtains the map accessor for the reader schema.
/// </summary>
private ReadItem ResolveMap(MapSchema writerSchema, MapSchema readerSchema)
{
    Schema readerValues = readerSchema.ValueSchema;
    Schema writerValues = writerSchema.ValueSchema;

    ReadItem valueReader = ResolveReader(writerValues, readerValues);
    MapAccess access = GetMapAccess(readerSchema);

    return (reuse, dec) => ReadMap(reuse, dec, access, valueReader);
}
315 
/// <summary>
/// Creates a map through <paramref name="mapAccess"/> and fills it block by
/// block until the decoder reports a block size of zero.
/// </summary>
private object ReadMap(object reuse, Decoder decoder, MapAccess mapAccess, ReadItem valueReader)
{
    object map = mapAccess.Create(reuse);

    int blockSize = (int)decoder.ReadMapStart();
    while (blockSize != 0)
    {
        mapAccess.AddElements(map, blockSize, valueReader, decoder, false);
        blockSize = (int)decoder.ReadMapNext();
    }

    return map;
}
326 
/// <summary>
/// Builds a reader for an array: resolves the item reader from the two item
/// schemas, then obtains the array accessor for the reader schema.
/// </summary>
private ReadItem ResolveArray(ArraySchema writerSchema, ArraySchema readerSchema)
{
    ReadItem elementReader = ResolveReader(writerSchema.ItemSchema, readerSchema.ItemSchema);
    ArrayAccess access = GetArrayAccess(readerSchema);

    return (reuse, dec) =>
    {
        // evaluated on every read, as IsReusable is virtual
        bool reusableItems = IsReusable(readerSchema.ItemSchema.Tag);
        return ReadArray(reuse, dec, access, elementReader, reusableItems);
    };
}
334 
/// <summary>
/// Creates an array through <paramref name="arrayAccess"/>, appends each block
/// of items the decoder reports, and finally resizes the array to the number of
/// items read.
/// </summary>
private object ReadArray(object reuse, Decoder decoder, ArrayAccess arrayAccess, ReadItem itemReader, bool itemReusable)
{
    object array = arrayAccess.Create(reuse);
    int filled = 0;

    int blockSize = (int)decoder.ReadArrayStart();
    while (blockSize != 0)
    {
        // make room for the block, then read it in at the current end
        arrayAccess.EnsureSize(ref array, filled + blockSize);
        arrayAccess.AddElements(array, blockSize, filled, itemReader, decoder, itemReusable);
        filled += blockSize;

        blockSize = (int)decoder.ReadArrayNext();
    }

    arrayAccess.Resize(ref array, filled);
    return array;
}
348 
/// <summary>
/// Builds a reader for a fixed value; both schemas must declare the same size.
/// </summary>
/// <exception cref="AvroException">The reader and writer sizes differ.</exception>
private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
{
    if (readerSchema.Size == writerSchema.Size)
    {
        FixedAccess access = GetFixedAccess(readerSchema);
        return (reuse, dec) => ReadFixed(reuse, dec, access);
    }

    throw new AvroException("Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
        ", reader: " + readerSchema);
}
359 
/// <summary>
/// Creates a fixed object through <paramref name="fixedAccess"/> and reads the
/// data into the buffer the accessor exposes for it.
/// </summary>
private object ReadFixed(object reuse, Decoder decoder, FixedAccess fixedAccess)
{
    object target = fixedAccess.CreateFixed(reuse);
    byte[] buffer = fixedAccess.GetFixedBuffer(target);
    decoder.ReadFixed(buffer);
    return target;
}
366 
/// <summary>
/// Returns the branch of <paramref name="us"/> selected by
/// <c>us.MatchingBranch(s)</c>.
/// </summary>
/// <exception cref="AvroException">No branch matches (negative index).</exception>
protected static Schema FindBranch(UnionSchema us, Schema s)
{
    int branch = us.MatchingBranch(s);
    if (branch < 0)
    {
        throw new AvroException("No matching schema for " + s + " in " + us);
    }
    return us[branch];
}
373 
/// <summary>Consumes a null from <paramref name="decoder"/> and returns <c>null</c>.</summary>
private object ReadNull(object reuse, Decoder decoder)
{
    object nothing = null;
    decoder.ReadNull();
    return nothing;
}
379 
/// <summary>Reads a boolean from <paramref name="decoder"/> and returns it boxed.</summary>
private object ReadBoolean(object reuse, Decoder decoder)
{
    bool value = decoder.ReadBoolean();
    return value;
}
384 
/// <summary>
/// Adapts a decoder-only read to the <see cref="ReadItem"/> shape; the reuse
/// argument is ignored.
/// </summary>
private ReadItem Read(DecoderRead decoderRead)
{
    return delegate(object reuse, Decoder dec)
    {
        return decoderRead(dec);
    };
}
389 
/// <summary>
/// Builds a delegate that consumes, without storing, one value written with
/// <paramref name="writerSchema"/>.
/// </summary>
/// <exception cref="AvroException">The schema has an unknown type tag.</exception>
private DecoderSkip GetSkip(Schema writerSchema)
{
    return BuildSkip(writerSchema, new Dictionary<Schema, DecoderSkip>());
}

// Recursive worker for GetSkip. recordSkippers remembers the skipper of every record
// already entered, so a record schema that (directly or indirectly) contains itself
// resolves to the existing delegate instead of recursing without end, in the same way
// ResolveRecord registers its reader before resolving the record's fields.
private DecoderSkip BuildSkip(Schema writerSchema, Dictionary<Schema, DecoderSkip> recordSkippers)
{
    switch (writerSchema.Tag)
    {
        case Schema.Type.Null:
            return d => d.SkipNull();
        case Schema.Type.Boolean:
            return d => d.SkipBoolean();
        case Schema.Type.Int:
            return d => d.SkipInt();
        case Schema.Type.Long:
            return d => d.SkipLong();
        case Schema.Type.Float:
            return d => d.SkipFloat();
        case Schema.Type.Double:
            return d => d.SkipDouble();
        case Schema.Type.String:
            return d => d.SkipString();
        case Schema.Type.Bytes:
            return d => d.SkipBytes();
        case Schema.Type.Error:
        case Schema.Type.Record:
        {
            DecoderSkip recordSkip;
            if (recordSkippers.TryGetValue(writerSchema, out recordSkip))
            {
                return recordSkip;
            }

            // Register first, fill afterwards: the delegate reads fieldSkips only when
            // invoked, by which time the list below is complete.
            var fieldSkips = new List<DecoderSkip>();
            recordSkip = d => fieldSkips.ForEach(s => s(d));
            recordSkippers.Add(writerSchema, recordSkip);

            foreach (Field field in ((RecordSchema)writerSchema).Fields)
            {
                fieldSkips.Add(BuildSkip(field.Schema, recordSkippers));
            }
            return recordSkip;
        }
        case Schema.Type.Enumeration:
            return d => d.SkipEnum();
        case Schema.Type.Fixed:
        {
            var size = ((FixedSchema)writerSchema).Size;
            return d => d.SkipFixed(size);
        }
        case Schema.Type.Array:
        {
            var itemSkip = BuildSkip(((ArraySchema)writerSchema).ItemSchema, recordSkippers);
            return d =>
            {
                // arrays arrive in blocks; a block size of zero ends the array
                for (long n = d.ReadArrayStart(); n != 0; n = d.ReadArrayNext())
                {
                    for (long i = 0; i < n; i++)
                    {
                        itemSkip(d);
                    }
                }
            };
        }
        case Schema.Type.Map:
        {
            var valueSkip = BuildSkip(((MapSchema)writerSchema).ValueSchema, recordSkippers);
            return d =>
            {
                // each map entry is a string key followed by a value
                for (long n = d.ReadMapStart(); n != 0; n = d.ReadMapNext())
                {
                    for (long i = 0; i < n; i++)
                    {
                        d.SkipString();
                        valueSkip(d);
                    }
                }
            };
        }
        case Schema.Type.Union:
        {
            var unionSchema = (UnionSchema)writerSchema;
            var lookup = new DecoderSkip[unionSchema.Count];
            for (int i = 0; i < unionSchema.Count; i++)
            {
                lookup[i] = BuildSkip(unionSchema[i], recordSkippers);
            }
            // the union index in the data picks the branch to skip
            return d => lookup[d.ReadUnionIndex()](d);
        }
        default:
            throw new AvroException("Unknown schema type: " + writerSchema);
    }
}
453 
/// <summary>
/// Indicates if it's possible to reuse an object of the specified type.
/// This base implementation answers <c>true</c> for every tag; subclasses may override.
/// </summary>
/// <param name="tag">Schema type being asked about.</param>
/// <returns><c>true</c> when an existing object may be passed to the reader for reuse.</returns>
protected virtual bool IsReusable(Schema.Type tag)
{
    return true;
}
464 
465  // interfaces to handle details of working with Specific vs Generic objects
466 
/// <summary>Operations the reader needs to build record objects.</summary>
protected interface RecordAccess
{
    /// <summary>Creates a new record object.</summary>
    /// <param name="reuse">Existing object offered for reuse.</param>
    object CreateRecord(object reuse);

    /// <summary>
    /// Used by the default implementation of ReadRecord() to get the existing
    /// field of a record object.
    /// </summary>
    object GetField(object record, string fieldName, int fieldPos);

    /// <summary>
    /// Used by the default implementation of ReadRecord() to add a field to a
    /// record object.
    /// </summary>
    void AddField(object record, string fieldName, int fieldPos, object fieldValue);
}
498 
/// <summary>Operations the reader needs to build enum values.</summary>
protected interface EnumAccess
{
    /// <summary>Produces the enum value for a reader-schema ordinal.</summary>
    /// <param name="reuse">Existing object offered for reuse.</param>
    /// <param name="ordinal">Ordinal of the symbol in the reader schema.</param>
    object CreateEnum(object reuse, int ordinal);
}
503 
/// <summary>Operations the reader needs to build fixed values.</summary>
protected interface FixedAccess
{
    /// <summary>Returns a fixed object.</summary>
    /// <param name="reuse">Existing object offered for reuse.</param>
    object CreateFixed(object reuse);

    /// <summary>Returns a buffer of appropriate size to read data into.</summary>
    /// <param name="f">Fixed object obtained from <see cref="CreateFixed"/>.</param>
    byte[] GetFixedBuffer(object f);
}
521 
/// <summary>Operations the reader needs to build arrays.</summary>
protected interface ArrayAccess
{
    /// <summary>Creates a new array object.</summary>
    /// <param name="reuse">Existing object offered for reuse.</param>
    object Create(object reuse);

    /// <summary>Hint that the array should be able to handle at least targetSize elements.</summary>
    void EnsureSize(ref object array, int targetSize);

    /// <summary>Resizes the array to the new value.</summary>
    void Resize(ref object array, int targetSize);

    /// <summary>
    /// Reads <paramref name="elements"/> items with <paramref name="itemReader"/>
    /// into <paramref name="array"/>, starting at <paramref name="index"/>.
    /// </summary>
    /// <param name="reuse">Whether existing items may be offered to the item reader for reuse.</param>
    void AddElements( object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse );
}
550 
/// <summary>Operations the reader needs to build maps.</summary>
protected interface MapAccess
{
    /// <summary>Creates a new map object.</summary>
    /// <param name="reuse">Existing object offered for reuse.</param>
    object Create(object reuse);

    /// <summary>
    /// Reads <paramref name="elements"/> entries from <paramref name="decoder"/>
    /// into <paramref name="map"/>, decoding each value with <paramref name="itemReader"/>.
    /// </summary>
    void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse);
}
562 
/// <summary>
/// Writer/reader schema pair with value equality, used as the key of the
/// record-reader cache.
/// </summary>
private class SchemaPair
{
    private Schema _writerSchema;
    private Schema _readerSchema;

    public SchemaPair( Schema writerSchema, Schema readerSchema )
    {
        _readerSchema = readerSchema;
        _writerSchema = writerSchema;
    }

    // Two pairs are equal when both their writer and their reader schemas are equal.
    protected bool Equals( SchemaPair other )
    {
        if( !Equals( _writerSchema, other._writerSchema ) )
        {
            return false;
        }
        return Equals( _readerSchema, other._readerSchema );
    }

    public override bool Equals( object obj )
    {
        if( ReferenceEquals( null, obj ) )
        {
            return false;
        }
        if( ReferenceEquals( this, obj ) )
        {
            return true;
        }
        return obj.GetType() == this.GetType() && Equals( (SchemaPair) obj );
    }

    public override int GetHashCode()
    {
        unchecked
        {
            // a missing schema contributes 0
            int writerHash = _writerSchema != null ? _writerSchema.GetHashCode() : 0;
            int readerHash = _readerSchema != null ? _readerSchema.GetHashCode() : 0;
            return ( writerHash * 397 ) ^ readerHash;
        }
    }
}
595  }
596 }
abstract RecordAccess GetRecordAccess(RecordSchema readerSchema)
T Read(T reuse, Decoder decoder)
Read a datum.
Class for record schemas
Definition: RecordSchema.cs:31
override int GetHashCode()
Hash code function
Definition: Schema.cs:272
void SkipLong()
Skips a long Avro type on the stream.
abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema)
Class for enum type schemas
Definition: EnumSchema.cs:28
void SkipBoolean()
Skips a boolean Avro type on the stream.
bool ReadBoolean()
Read a boolean Avro type
void AddElements(object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse)
static Schema FindBranch(UnionSchema us, Schema s)
void SkipDouble()
Skips a double Avro type on the stream.
object Create(object reuse)
Creates a new array object.
object Create(object reuse)
Creates a new map object.
Base class for all schema types
Definition: Schema.cs:29
void SkipString()
Skips a string Avro type on the stream.
abstract MapAccess GetMapAccess(MapSchema readerSchema)
Class for union schemas
Definition: UnionSchema.cs:29
Class for fixed schemas
Definition: FixedSchema.cs:28
object CreateFixed(object reuse)
Returns a fixed object.
virtual bool CanRead(Schema writerSchema)
Returns true if and only if data written using writerSchema can be read using the current schema acco...
Definition: Schema.cs:283
Type
Enum for schema types
Definition: Schema.cs:34
virtual bool IsReusable(Schema.Type tag)
Indicates if it's possible to reuse an object of the specified type.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
void Resize(ref object array, int targetSize)
Resizes the array to the new value.
A general purpose reader of data from avro streams.
long ReadArrayStart()
Starts reading the array Avro type.
Write leaf values.
object CreateRecord(object reuse)
Creates a new record object.
long ReadArrayNext()
See ReadArrayStart().
Decoder is used to decode Avro data on a stream.
Definition: Decoder.cs:27
void SkipFloat()
Skips a float Avro type on the stream.
int MatchingBranch(Schema s)
Returns the index of a branch that can read the data written by the given schema s.
Definition: UnionSchema.cs:113
Class for array type schemas
Definition: ArraySchema.cs:27
static void EncodeDefaultValue(Encoder enc, Schema schema, JToken jtok)
Reads the passed JToken default value field and writes it in the specified encoder
Definition: Resolver.cs:33
Class for map schemas
Definition: MapSchema.cs:28
object CreateEnum(object reuse, int ordinal)
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length);
int ReadUnionIndex()
Reads the index, which determines the type in a union Avro type.
void EnsureSize(ref object array, int targetSize)
Hint that the array should be able to handle at least targetSize elements.
long ReadMapNext()
See ReadMapStart().
abstract EnumAccess GetEnumAccess(EnumSchema readerSchema)
void SkipInt()
Skips an int Avro type on the stream.
void AddField(object record, string fieldName, int fieldPos, object fieldValue)
Used by the default implementation of ReadRecord() to add a field to a record object.
long ReadMapStart()
Starts reading the map Avro type.
void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse)
abstract FixedAccess GetFixedAccess(FixedSchema readerSchema)
object GetField(object record, string fieldName, int fieldPos)
Used by the default implementation of ReadRecord() to get the existing field of a record object.
delegate object ReadItem(object reuse, Decoder dec)
void SkipNull()
Skips a null Avro type on the stream.
void SkipFixed(int len)
Type Tag
Schema type property
Definition: Schema.cs:56
void ReadNull()
Reads a null Avro type.
byte [] GetFixedBuffer(object f)
Returns a buffer of appropriate size to read data into.
void SkipBytes()
Skips a bytes Avro type on the stream.
PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
Decoder for Avro binary format