18 using System.Collections.Generic;
37 private delegate
object DecoderRead(
Decoder dec);
39 private delegate
void DecoderSkip(
Decoder dec);
41 private delegate
void FieldReader(
object record,
Decoder decoder);
44 private readonly Dictionary<SchemaPair,ReadItem> _recordReaders =
new Dictionary<SchemaPair,ReadItem>();
52 _reader = ResolveReader(writerSchema, readerSchema);
57 return (T)_reader(reuse, decoder);
75 switch (writerSchema.
Tag)
83 switch (readerSchema.
Tag)
86 return Read(d => (
long) d.ReadInt());
88 return Read(d => (
float) d.ReadInt());
90 return Read(d => (
double) d.ReadInt());
92 return Read(d => d.ReadInt());
95 case Schema.Type.Long:
97 switch (readerSchema.
Tag)
99 case Schema.Type.Float:
100 return Read(d => (
float) d.ReadLong());
101 case Schema.Type.Double:
102 return Read(d => (
double) d.ReadLong());
104 return Read(d => d.ReadLong());
107 case Schema.Type.Float:
109 switch (readerSchema.
Tag)
111 case Schema.Type.Double:
112 return Read(d => (
double) d.ReadFloat());
114 return Read(d => d.ReadFloat());
117 case Schema.Type.Double:
118 return Read(d => d.ReadDouble());
119 case Schema.Type.String:
120 return Read(d => d.ReadString());
121 case Schema.Type.Bytes:
122 return Read(d => d.ReadBytes());
123 case Schema.Type.Error:
124 case Schema.Type.Record:
125 return ResolveRecord((RecordSchema)writerSchema, (RecordSchema)readerSchema);
126 case Schema.Type.Enumeration:
127 return ResolveEnum((EnumSchema)writerSchema, (EnumSchema)readerSchema);
128 case Schema.Type.Fixed:
129 return ResolveFixed((FixedSchema)writerSchema, (FixedSchema)readerSchema);
130 case Schema.Type.Array:
131 return ResolveArray((ArraySchema)writerSchema, (ArraySchema)readerSchema);
132 case Schema.Type.Map:
133 return ResolveMap((MapSchema)writerSchema, (MapSchema)readerSchema);
134 case Schema.Type.Union:
135 return ResolveUnion((UnionSchema)writerSchema, readerSchema);
137 throw new AvroException(
"Unknown schema type: " + writerSchema);
141 private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
145 if (readerSchema.Equals(writerSchema))
147 return (r, d) => enumAccess.CreateEnum(r, d.ReadEnum());
150 var translator =
new int[writerSchema.Symbols.Count];
152 foreach (var symbol
in writerSchema.Symbols)
154 var writerOrdinal = writerSchema.Ordinal(symbol);
155 if (readerSchema.Contains(symbol))
157 translator[writerOrdinal] = readerSchema.Ordinal(symbol);
161 translator[writerOrdinal] = -1;
167 var writerOrdinal = d.ReadEnum();
168 var readerOrdinal = translator[writerOrdinal];
169 if (readerOrdinal == -1)
171 throw new AvroException(
"No such symbol: " + writerSchema[writerOrdinal]);
173 return enumAccess.CreateEnum(r, readerOrdinal);
177 private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSchema)
179 var schemaPair =
new SchemaPair(writerSchema, readerSchema);
182 if (_recordReaders.TryGetValue(schemaPair, out recordReader))
187 FieldReader[] fieldReaderArray =
null;
190 recordReader = (r, d) => ReadRecord(r, d, recordAccess, fieldReaderArray);
191 _recordReaders.Add(schemaPair, recordReader);
193 var readSteps =
new List<FieldReader>();
195 foreach (Field wf
in writerSchema)
198 if (readerSchema.TryGetFieldAlias(wf.Name, out rf))
200 var readItem = ResolveReader(wf.Schema, rf.Schema);
203 readSteps.Add((rec,d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
204 readItem(recordAccess.GetField(rec, rf.Name, rf.Pos), d)));
208 readSteps.Add((rec, d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
214 var skip = GetSkip(wf.Schema);
215 readSteps.Add((rec, d) => skip(d));
220 foreach (Field rf
in readerSchema)
222 if (writerSchema.Contains(rf.Name))
continue;
224 var defaultStream =
new MemoryStream();
227 defaultStream.Position = 0;
229 defaultStream.Flush();
230 var defaultBytes = defaultStream.ToArray();
232 var readItem = ResolveReader(rf.Schema, rf.Schema);
237 readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
238 readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
243 readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
244 readItem(
null,
new BinaryDecoder(
new MemoryStream(defaultBytes)))));
248 fieldReaderArray = readSteps.ToArray();
252 private object ReadRecord(
object reuse,
Decoder decoder, RecordAccess recordAccess, IEnumerable<FieldReader> readSteps )
254 var rec = recordAccess.CreateRecord(reuse);
255 foreach (FieldReader fr
in readSteps)
263 private ReadItem ResolveUnion(UnionSchema writerSchema, Schema readerSchema)
265 var lookup =
new ReadItem[writerSchema.Count];
267 for (
int i = 0; i < writerSchema.Count; i++)
269 var writerBranch = writerSchema[i];
271 if (readerSchema is UnionSchema)
273 var unionReader = (UnionSchema) readerSchema;
274 var readerBranch = unionReader.MatchingBranch(writerBranch);
275 if (readerBranch == -1)
277 lookup[i] = (r, d) => {
throw new AvroException(
"No matching schema for " + writerBranch +
" in " + unionReader ); };
281 lookup[i] = ResolveReader(writerBranch, unionReader[readerBranch]);
286 if (!readerSchema.CanRead(writerBranch))
288 lookup[i] = (r, d) => {
throw new AvroException(
"Schema mismatch Reader: " +
ReaderSchema +
", writer: " +
WriterSchema ); };
292 lookup[i] = ResolveReader(writerBranch, readerSchema);
297 return (r, d) => ReadUnion(r, d, lookup);
300 private object ReadUnion(
object reuse,
Decoder d,
ReadItem[] branchLookup)
305 private ReadItem ResolveMap(MapSchema writerSchema, MapSchema readerSchema)
307 var rs = readerSchema.ValueSchema;
308 var ws = writerSchema.ValueSchema;
310 var reader = ResolveReader(ws, rs);
313 return (r,d) => ReadMap(r, d, mapAccess, reader);
316 private object ReadMap(
object reuse,
Decoder decoder, MapAccess mapAccess,
ReadItem valueReader)
318 object map = mapAccess.Create(reuse);
322 mapAccess.AddElements(map, n, valueReader, decoder,
false);
327 private ReadItem ResolveArray(ArraySchema writerSchema, ArraySchema readerSchema)
329 var itemReader = ResolveReader(writerSchema.ItemSchema, readerSchema.ItemSchema);
332 return (r, d) => ReadArray(r, d, arrayAccess, itemReader,
IsReusable(readerSchema.ItemSchema.Tag));
335 private object ReadArray(
object reuse,
Decoder decoder, ArrayAccess arrayAccess,
ReadItem itemReader,
bool itemReusable)
337 object array = arrayAccess.Create(reuse);
341 arrayAccess.EnsureSize(ref array, i + n);
342 arrayAccess.AddElements(array, n, i, itemReader, decoder, itemReusable);
345 arrayAccess.Resize(ref array, i);
349 private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
351 if (readerSchema.Size != writerSchema.Size)
353 throw new AvroException(
"Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
354 ", reader: " + readerSchema);
357 return (r, d) => ReadFixed(r, d, fixedAccess);
360 private object ReadFixed(
object reuse,
Decoder decoder, FixedAccess fixedAccess)
362 var fixedrec = fixedAccess.CreateFixed(reuse);
363 decoder.
ReadFixed(fixedAccess.GetFixedBuffer(fixedrec));
370 if (index >= 0)
return us[index];
371 throw new AvroException(
"No matching schema for " + s +
" in " + us);
374 private object ReadNull(
object reuse,
Decoder decoder)
380 private object ReadBoolean(
object reuse,
Decoder decoder)
387 return (r, d) => decoderRead(d);
390 private DecoderSkip GetSkip(Schema writerSchema)
392 switch (writerSchema.Tag)
394 case Schema.Type.Null:
396 case Schema.Type.Boolean:
398 case Schema.Type.Int:
400 case Schema.Type.Long:
402 case Schema.Type.Float:
404 case Schema.Type.Double:
406 case Schema.Type.String:
408 case Schema.Type.Bytes:
410 case Schema.Type.Error:
411 case Schema.Type.Record:
412 var recordSkips =
new List<DecoderSkip>();
413 var recSchema = (RecordSchema)writerSchema;
414 recSchema.Fields.ForEach(r => recordSkips.Add(GetSkip(r.Schema)));
415 return d => recordSkips.ForEach(s=>s(d));
416 case Schema.Type.Enumeration:
418 case Schema.Type.Fixed:
419 var size = ((FixedSchema)writerSchema).Size;
421 case Schema.Type.Array:
422 var itemSkip = GetSkip(((ArraySchema)writerSchema).ItemSchema);
427 for (
long i = 0; i < n; i++) itemSkip(d);
430 case Schema.Type.Map:
432 var valueSkip = GetSkip(((MapSchema)writerSchema).ValueSchema);
437 for (
long i = 0; i < n; i++) { d.
SkipString(); valueSkip(d); }
441 case Schema.Type.Union:
442 var unionSchema = (UnionSchema)writerSchema;
443 var lookup =
new DecoderSkip[unionSchema.Count];
444 for (
int i = 0; i < unionSchema.Count; i++)
446 lookup[i] = GetSkip( unionSchema[i] );
450 throw new AvroException(
"Unknown schema type: " + writerSchema);
485 object GetField(
object record,
string fieldName,
int fieldPos);
496 void AddField(
object record,
string fieldName,
int fieldPos,
object fieldValue);
529 object Create(
object reuse);
538 void EnsureSize(ref
object array,
int targetSize);
546 void Resize(ref
object array,
int targetSize);
558 object Create(
object reuse);
563 private class SchemaPair
565 private Schema _writerSchema;
566 private Schema _readerSchema;
568 public SchemaPair(
Schema writerSchema,
Schema readerSchema )
570 _writerSchema = writerSchema;
571 _readerSchema = readerSchema;
574 protected bool Equals( SchemaPair other )
576 return Equals( _writerSchema, other._writerSchema ) && Equals( _readerSchema, other._readerSchema );
579 public override bool Equals(
object obj )
581 if( ReferenceEquals(
null, obj ) )
return false;
582 if( ReferenceEquals(
this, obj ) )
return true;
583 if( obj.GetType() != this.GetType() )
return false;
584 return Equals( (SchemaPair) obj );
587 public override int GetHashCode()
591 return ( ( _writerSchema !=
null ? _writerSchema.
GetHashCode() : 0 ) * 397 ) ^ ( _readerSchema !=
null ? _readerSchema.
GetHashCode() : 0 );
abstract RecordAccess GetRecordAccess(RecordSchema readerSchema)
T Read(T reuse, Decoder decoder)
Read a datum.
override int GetHashCode()
Hash code function
void SkipLong()
Skips a long Avro type on the stream.
abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema)
Class for enum type schemas
void SkipBoolean()
Skips a boolean Avro type on the stream.
bool ReadBoolean()
Read a boolean Avro type
void AddElements(object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse)
static Schema FindBranch(UnionSchema us, Schema s)
void SkipDouble()
Skips a double Avro type on the stream.
object Create(object reuse)
Creates a new array object.
object Create(object reuse)
Creates a new map object.
Base class for all schema types
void SkipString()
Skips a string Avro type on the stream.
abstract MapAccess GetMapAccess(MapSchema readerSchema)
object CreateFixed(object reuse)
Returns a fixed object.
virtual bool CanRead(Schema writerSchema)
Returns true if and only if data written using writerSchema can be read using the current schema acco...
Type
Enum for schema types
virtual bool IsReusable(Schema.Type tag)
Indicates if it's possible to reuse an object of the specified type.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
void Resize(ref object array, int targetSize)
Resizes the array to the new value.
A general purpose reader of data from avro streams.
long ReadArrayStart()
Starts reading the array Avro type.
object CreateRecord(object reuse)
Creates a new record object.
long ReadArrayNext()
See ReadArrayStart().
Decoder is used to decode Avro data on a stream.
void SkipFloat()
Skips a float Avro type on the stream.
int MatchingBranch(Schema s)
Returns the index of a branch that can read the data written by the given schema s.
Class for array type schemas
static void EncodeDefaultValue(Encoder enc, Schema schema, JToken jtok)
Reads the passed JToken default value field and writes it in the specified encoder
object CreateEnum(object reuse, int ordinal)
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length);
int ReadUnionIndex()
Reads the index, which determines the type in an union Avro type.
void EnsureSize(ref object array, int targetSize)
Hint that the array should be able to handle at least targetSize elements.
long ReadMapNext()
See ReadMapStart().
abstract EnumAccess GetEnumAccess(EnumSchema readerSchema)
void SkipInt()
Skips a int Avro type on the stream.
void AddField(object record, string fieldName, int fieldPos, object fieldValue)
Used by the default implementation of ReadRecord() to add a field to a record object.
long ReadMapStart()
Starts reading the map Avro type.
void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse)
abstract FixedAccess GetFixedAccess(FixedSchema readerSchema)
object GetField(object record, string fieldName, int fieldPos)
Used by the default implementation of ReadRecord() to get the existing field of a record object.
delegate object ReadItem(object reuse, Decoder dec)
void SkipNull()
Skips a null Avro type on the stream.
Type Tag
Schema type property
void ReadNull()
Reads a null Avro type.
byte [] GetFixedBuffer(object f)
Returns a buffer of appropriate size to read data into.
void SkipBytes()
Skips a bytes Avro type on the stream.
PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
Decoder for Avro binary format