18 using System.Collections.Generic;
    /// <summary>
    /// Reads one value from <paramref name="dec"/> and returns it as an object.
    /// Lambdas of this shape (for example <c>d => d.ReadInt()</c>) are handed to
    /// <c>Read(...)</c>, which adapts them into a two-argument reader that ignores
    /// the reuse argument.
    /// </summary>
    37         private delegate 
object DecoderRead(
Decoder dec);
    /// <summary>
    /// Skips over a value on <paramref name="dec"/> without returning it.
    /// <c>GetSkip</c> builds one of these for a given writer schema; record
    /// resolution invokes it for writer fields that have no reader counterpart.
    /// </summary>
    39         private delegate 
void DecoderSkip(
Decoder dec);
    /// <summary>
    /// A single step of reading a record: given the record being populated and the
    /// decoder, it either decodes a field and stores it on the record, or skips data
    /// from the decoder. <c>ResolveRecord</c> collects these steps and
    /// <c>ReadRecord</c> runs them in order.
    /// </summary>
    41         private delegate 
void FieldReader(
object record, 
Decoder decoder);
    /// <summary>
    /// Cache of record readers keyed by (writer schema, reader schema) pair.
    /// <c>ResolveRecord</c> checks it first and registers a new reader before it
    /// resolves that record's fields.
    /// NOTE(review): registering before field resolution presumably lets recursive
    /// record schemas find the in-progress reader instead of recursing forever; confirm.
    /// </summary>
    44         private readonly Dictionary<SchemaPair,ReadItem> _recordReaders = 
new Dictionary<SchemaPair,ReadItem>();
    52             _reader = ResolveReader(writerSchema, readerSchema);
    57             return (T)_reader(reuse, decoder);
    75             switch (writerSchema.
Tag)
    83                         switch (readerSchema.
Tag)
    86                                 return Read(d => (
long) d.ReadInt());
    88                                 return Read(d => (
float) d.ReadInt());
    90                                 return Read(d => (
double) d.ReadInt());
    92                                 return Read(d => d.ReadInt());
    95                 case Schema.Type.Long:
    97                         switch (readerSchema.
Tag)
    99                             case Schema.Type.Float:
   100                                 return Read(d => (
float) d.ReadLong());
   101                             case Schema.Type.Double:
   102                                 return Read(d => (
double) d.ReadLong());
   104                                 return Read(d => d.ReadLong());
   107                 case Schema.Type.Float:
   109                         switch (readerSchema.
Tag)
   111                             case Schema.Type.Double:
   112                                 return Read(d => (
double) d.ReadFloat());
   114                                 return Read(d => d.ReadFloat());
   117                 case Schema.Type.Double:
   118                     return Read(d => d.ReadDouble());
   119                 case Schema.Type.String:
   120                     return Read(d => d.ReadString());
   121                 case Schema.Type.Bytes:
   122                     return Read(d => d.ReadBytes());
   123                 case Schema.Type.Error:
   124                 case Schema.Type.Record:
   125                     return ResolveRecord((RecordSchema)writerSchema, (RecordSchema)readerSchema);
   126                 case Schema.Type.Enumeration:
   127                     return ResolveEnum((EnumSchema)writerSchema, (EnumSchema)readerSchema);
   128                 case Schema.Type.Fixed:
   129                     return ResolveFixed((FixedSchema)writerSchema, (FixedSchema)readerSchema);
   130                 case Schema.Type.Array:
   131                     return ResolveArray((ArraySchema)writerSchema, (ArraySchema)readerSchema);
   132                 case Schema.Type.Map:
   133                     return ResolveMap((MapSchema)writerSchema, (MapSchema)readerSchema);
   134                 case Schema.Type.Union:
   135                     return ResolveUnion((UnionSchema)writerSchema, readerSchema);
   137                     throw new AvroException(
"Unknown schema type: " + writerSchema);
   141         private ReadItem ResolveEnum(EnumSchema writerSchema, EnumSchema readerSchema)
   145             if (readerSchema.Equals(writerSchema))
   147                 return (r, d) => enumAccess.CreateEnum(r, d.ReadEnum());
   150             var translator = 
new int[writerSchema.Symbols.Count];
   152             foreach (var symbol 
in writerSchema.Symbols)
   154                 var writerOrdinal = writerSchema.Ordinal(symbol);
   155                 if (readerSchema.Contains(symbol))
   157                     translator[writerOrdinal] = readerSchema.Ordinal(symbol);
   161                     translator[writerOrdinal] = -1;
   167                             var writerOrdinal = d.ReadEnum();
   168                             var readerOrdinal = translator[writerOrdinal];
   169                             if (readerOrdinal == -1)
   171                                 throw new AvroException(
"No such symbol: " + writerSchema[writerOrdinal]);
   173                             return enumAccess.CreateEnum(r, readerOrdinal);
   177         private ReadItem ResolveRecord(RecordSchema writerSchema, RecordSchema readerSchema)
   179             var schemaPair = 
new SchemaPair(writerSchema, readerSchema);
   182             if (_recordReaders.TryGetValue(schemaPair, out recordReader))
   187             FieldReader[] fieldReaderArray = 
null;
   190             recordReader = (r, d) => ReadRecord(r, d, recordAccess, fieldReaderArray);
   191             _recordReaders.Add(schemaPair, recordReader);
   193             var readSteps = 
new List<FieldReader>();
   195             foreach (Field wf 
in writerSchema)
   198                 if (readerSchema.TryGetFieldAlias(wf.Name, out rf))
   200                     var readItem = ResolveReader(wf.Schema, rf.Schema);
   203                         readSteps.Add((rec,d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
   204                             readItem(recordAccess.GetField(rec, rf.Name, rf.Pos), d)));
   208                         readSteps.Add((rec, d) => recordAccess.AddField(rec, rf.Name, rf.Pos,
   214                     var skip = GetSkip(wf.Schema);
   215                     readSteps.Add((rec, d) => skip(d));
   220             foreach (Field rf 
in readerSchema)
   222                 if (writerSchema.Contains(rf.Name)) 
continue;
   224                 var defaultStream = 
new MemoryStream();
   227                 defaultStream.Position = 0; 
   229                 defaultStream.Flush();
   230                 var defaultBytes = defaultStream.ToArray();
   232                 var readItem = ResolveReader(rf.Schema, rf.Schema);
   237                     readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
   238                         readItem(recordAccess.GetField(rec, rfInstance.Name, rfInstance.Pos),
   243                     readSteps.Add((rec, d) => recordAccess.AddField(rec, rfInstance.Name, rfInstance.Pos,
   244                         readItem(
null, 
new BinaryDecoder(
new MemoryStream(defaultBytes)))));
   248             fieldReaderArray = readSteps.ToArray();
   252         private object ReadRecord(
object reuse, 
Decoder decoder, RecordAccess recordAccess, IEnumerable<FieldReader> readSteps )
   254             var rec = recordAccess.CreateRecord(reuse);
   255             foreach (FieldReader fr 
in readSteps)
   263         private ReadItem ResolveUnion(UnionSchema writerSchema, Schema readerSchema)
   265             var lookup = 
new ReadItem[writerSchema.Count];
   267             for (
int i = 0; i < writerSchema.Count; i++)
   269                 var writerBranch = writerSchema[i];
   271                 if (readerSchema is UnionSchema)
   273                     var unionReader = (UnionSchema) readerSchema;
   274                     var readerBranch = unionReader.MatchingBranch(writerBranch);
   275                     if (readerBranch == -1)
   277                         lookup[i] = (r, d) => { 
throw new AvroException( 
"No matching schema for " + writerBranch + 
" in " + unionReader ); };
   281                         lookup[i] = ResolveReader(writerBranch, unionReader[readerBranch]);
   286                     if (!readerSchema.CanRead(writerBranch))
   288                         lookup[i] = (r, d) => { 
throw new AvroException( 
"Schema mismatch Reader: " + 
ReaderSchema + 
", writer: " + 
WriterSchema ); };
   292                         lookup[i] = ResolveReader(writerBranch, readerSchema);
   297             return (r, d) => ReadUnion(r, d, lookup);
   300         private object ReadUnion(
object reuse, 
Decoder d, 
ReadItem[] branchLookup)
   305         private ReadItem ResolveMap(MapSchema writerSchema, MapSchema readerSchema)
   307             var rs = readerSchema.ValueSchema;
   308             var ws = writerSchema.ValueSchema;
   310             var reader = ResolveReader(ws, rs);
   313             return (r,d) => ReadMap(r, d, mapAccess, reader);
   316         private object ReadMap(
object reuse, 
Decoder decoder, MapAccess mapAccess, 
ReadItem valueReader)
   318             object map = mapAccess.Create(reuse);
   322                 mapAccess.AddElements(map, n, valueReader, decoder, 
false);
   327         private ReadItem ResolveArray(ArraySchema writerSchema, ArraySchema readerSchema)
   329             var itemReader = ResolveReader(writerSchema.ItemSchema, readerSchema.ItemSchema);
   332             return (r, d) => ReadArray(r, d, arrayAccess, itemReader, 
IsReusable(readerSchema.ItemSchema.Tag));
   335         private object ReadArray(
object reuse, 
Decoder decoder, ArrayAccess arrayAccess, 
ReadItem itemReader, 
bool itemReusable)
   337             object array = arrayAccess.Create(reuse);
   341                 arrayAccess.EnsureSize(ref array, i + n);
   342                 arrayAccess.AddElements(array, n, i, itemReader, decoder, itemReusable);
   345             arrayAccess.Resize(ref array, i);
   349         private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
   351             if (readerSchema.Size != writerSchema.Size)
   353                 throw new AvroException(
"Size mismatch between reader and writer fixed schemas. Writer: " + writerSchema +
   354                     ", reader: " + readerSchema);
   357             return (r, d) => ReadFixed(r, d, fixedAccess);
   360         private object ReadFixed(
object reuse, 
Decoder decoder, FixedAccess fixedAccess)
   362             var fixedrec = fixedAccess.CreateFixed(reuse);
   363             decoder.
ReadFixed(fixedAccess.GetFixedBuffer(fixedrec));
   370             if (index >= 0) 
return us[index];
   371             throw new AvroException(
"No matching schema for " + s + 
" in " + us);
   374         private object ReadNull(
object reuse, 
Decoder decoder)
   380         private object ReadBoolean(
object reuse, 
Decoder decoder)
   387             return (r, d) => decoderRead(d);
   390         private DecoderSkip GetSkip(Schema writerSchema)
   392             switch (writerSchema.Tag)
   394                 case Schema.Type.Null:
   396                 case Schema.Type.Boolean:
   398                 case Schema.Type.Int:
   400                 case Schema.Type.Long:
   402                 case Schema.Type.Float:
   404                 case Schema.Type.Double:
   406                 case Schema.Type.String:
   408                 case Schema.Type.Bytes:
   410                 case Schema.Type.Error:
   411                 case Schema.Type.Record:
   412                     var recordSkips = 
new List<DecoderSkip>();
   413                     var recSchema = (RecordSchema)writerSchema;
   414                     recSchema.Fields.ForEach(r => recordSkips.Add(GetSkip(r.Schema)));
   415                     return d => recordSkips.ForEach(s=>s(d));
   416                 case Schema.Type.Enumeration:
   418                 case Schema.Type.Fixed:
   419                     var size = ((FixedSchema)writerSchema).Size;
   421                 case Schema.Type.Array:
   422                     var itemSkip = GetSkip(((ArraySchema)writerSchema).ItemSchema);
   427                             for (
long i = 0; i < n; i++) itemSkip(d);
   430                 case Schema.Type.Map:
   432                         var valueSkip = GetSkip(((MapSchema)writerSchema).ValueSchema);
   437                                 for (
long i = 0; i < n; i++) { d.
SkipString(); valueSkip(d); }
   441                 case Schema.Type.Union:
   442                     var unionSchema = (UnionSchema)writerSchema;
   443                     var lookup = 
new DecoderSkip[unionSchema.Count];
   444                     for (
int i = 0; i < unionSchema.Count; i++)
   446                         lookup[i] = GetSkip( unionSchema[i] );
   450                     throw new AvroException(
"Unknown schema type: " + writerSchema);
   485             object GetField(
object record, 
string fieldName, 
int fieldPos);
   496             void AddField(
object record, 
string fieldName, 
int fieldPos, 
object fieldValue);
   529             object Create(
object reuse);
   538             void EnsureSize(ref 
object array, 
int targetSize);
   546             void Resize(ref 
object array, 
int targetSize);
   558             object Create(
object reuse);
   563         private class SchemaPair
   565             private Schema _writerSchema;
   566             private Schema _readerSchema;
   568             public SchemaPair( 
Schema writerSchema, 
Schema readerSchema )
   570                 _writerSchema = writerSchema;
   571                 _readerSchema = readerSchema;
   574             protected bool Equals( SchemaPair other )
   576                 return Equals( _writerSchema, other._writerSchema ) && Equals( _readerSchema, other._readerSchema );
   579             public override bool Equals( 
object obj )
   581                 if( ReferenceEquals( 
null, obj ) ) 
return false;
   582                 if( ReferenceEquals( 
this, obj ) ) 
return true;
   583                 if( obj.GetType() != this.GetType() ) 
return false;
   584                 return Equals( (SchemaPair) obj );
   587             public override int GetHashCode()
   591                     return ( ( _writerSchema != 
null ? _writerSchema.
GetHashCode() : 0 ) * 397 ) ^ ( _readerSchema != 
null ? _readerSchema.
GetHashCode() : 0 );
 abstract RecordAccess GetRecordAccess(RecordSchema readerSchema)
 
T Read(T reuse, Decoder decoder)
Read a datum.
 
override int GetHashCode()
Hash code function
 
void SkipLong()
Skips a long Avro type on the stream.
 
abstract ArrayAccess GetArrayAccess(ArraySchema readerSchema)
 
Class for enum type schemas
 
void SkipBoolean()
Skips a boolean Avro type on the stream.
 
bool ReadBoolean()
Read a boolean Avro type
 
void AddElements(object array, int elements, int index, ReadItem itemReader, Decoder decoder, bool reuse)
 
static Schema FindBranch(UnionSchema us, Schema s)
 
void SkipDouble()
Skips a double Avro type on the stream.
 
object Create(object reuse)
Creates a new array object.
 
object Create(object reuse)
Creates a new map object.
 
Base class for all schema types
 
void SkipString()
Skips a string Avro type on the stream.
 
abstract MapAccess GetMapAccess(MapSchema readerSchema)
 
object CreateFixed(object reuse)
Returns a fixed object.
 
virtual bool CanRead(Schema writerSchema)
Returns true if and only if data written using writerSchema can be read using the current schema acco...
 
Type
Enum for schema types
 
virtual bool IsReusable(Schema.Type tag)
Indicates if it's possible to reuse an object of the specified type.
 
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
 
void Resize(ref object array, int targetSize)
Resizes the array to the new value.
 
A general purpose reader of data from avro streams.
 
long ReadArrayStart()
Starts reading the array Avro type.
 
object CreateRecord(object reuse)
Creates a new record object.
 
long ReadArrayNext()
See ReadArrayStart().
 
Decoder is used to decode Avro data on a stream.
 
void SkipFloat()
Skips a float Avro type on the stream.
 
int MatchingBranch(Schema s)
Returns the index of a branch that can read the data written by the given schema s.
 
Class for array type schemas
 
static void EncodeDefaultValue(Encoder enc, Schema schema, JToken jtok)
Reads the passed JToken default value field and writes it in the specified encoder
 
object CreateEnum(object reuse, int ordinal)
 
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length);
 
int ReadUnionIndex()
Reads the index, which determines the type in a union Avro type.
 
void EnsureSize(ref object array, int targetSize)
Hint that the array should be able to handle at least targetSize elements.
 
long ReadMapNext()
See ReadMapStart().
 
abstract EnumAccess GetEnumAccess(EnumSchema readerSchema)
 
void SkipInt()
Skips an int Avro type on the stream.
 
void AddField(object record, string fieldName, int fieldPos, object fieldValue)
Used by the default implementation of ReadRecord() to add a field to a record object.
 
long ReadMapStart()
Starts reading the map Avro type.
 
void AddElements(object map, int elements, ReadItem itemReader, Decoder decoder, bool reuse)
 
abstract FixedAccess GetFixedAccess(FixedSchema readerSchema)
 
object GetField(object record, string fieldName, int fieldPos)
Used by the default implementation of ReadRecord() to get the existing field of a record object.
 
delegate object ReadItem(object reuse, Decoder dec)
 
void SkipNull()
Skips a null Avro type on the stream.
 
Type Tag
Schema type property
 
void ReadNull()
Reads a null Avro type.
 
byte [] GetFixedBuffer(object f)
Returns a buffer of appropriate size to read data into.
 
void SkipBytes()
Skips a bytes Avro type on the stream.
 
PreresolvingDatumReader(Schema writerSchema, Schema readerSchema)
 
Decoder for Avro binary format