19 using System.Collections.Generic;
33 private Decoder _decoder, _datumDecoder;
37 private long _blockRemaining;
38 private long _blockSize;
39 private bool _availableBlock;
40 private byte[] _syncBuffer;
41 private long _blockStart;
42 private Stream _stream;
43 private Schema _readerSchema;
53 return OpenReader(
new FileStream(path, FileMode.Open),
null);
63 return OpenReader(
new FileStream(path, FileMode.Open), readerSchema);
83 return OpenReader(inStream, readerSchema, CreateDefaultReader);
94 if (!inStream.CanSeek)
102 inStream.Seek(0, SeekOrigin.Begin);
103 for (
int c = 0; c < magic.Length; c = inStream.Read(magic, c, magic.Length - c)) { }
104 inStream.Seek(0, SeekOrigin.Begin);
114 _readerSchema = readerSchema;
115 _datumReaderFactory = datumReaderFactory;
141 catch (KeyNotFoundException)
161 return System.Text.Encoding.UTF8.GetString(value);
169 public void Seek(
long position)
171 _stream.Position = position;
173 _datumDecoder =
null;
175 _blockStart = position;
178 public void Sync(
long position)
195 if (Enumerable.SequenceEqual(_syncBuffer, _header.
SyncData))
201 catch (Exception) { }
203 _blockStart = _stream.Position;
218 return _stream.Position;
236 if (_blockRemaining == 0)
243 _currentBlock = NextRawBlock(_currentBlock);
245 _datumDecoder =
new BinaryDecoder(_currentBlock.GetDataAsStream());
248 return _blockRemaining != 0;
266 private void Init(Stream stream)
283 if (!firstBytes.SequenceEqual(DataFileConstants.Magic))
284 throw new AvroRuntimeException(
"Not a valid data file!");
292 for (
long i = 0; i < len; i++)
306 _reader = _datumReaderFactory(_header.
Schema, _readerSchema ?? _header.
Schema);
307 _codec = ResolveCodec();
310 private static DatumReader<T> CreateDefaultReader(Schema writerSchema, Schema readerSchema)
313 Type type = typeof(T);
326 private Codec ResolveCodec()
328 return Codec.CreateCodecFromString(
GetMetaString(DataFileConstants.MetaDataCodec));
333 return Next(
default(T));
336 private T
Next(T reuse)
343 T result = _reader.
Read(reuse, _datumDecoder);
344 if (--_blockRemaining == 0)
352 throw new AvroRuntimeException(
string.Format(
"Error fetching next object from block: {0}", e));
356 private void BlockFinished()
358 _blockStart = _stream.Position;
361 private DataBlock NextRawBlock(DataBlock reuse)
364 throw new AvroRuntimeException(
"No data remaining in block!");
366 if (reuse ==
null || reuse.Data.Length < _blockSize)
368 reuse =
new DataBlock(_blockRemaining, _blockSize);
372 reuse.NumberOfEntries = _blockRemaining;
373 reuse.BlockSize = _blockSize;
376 _decoder.
ReadFixed(reuse.Data, 0, (
int)reuse.BlockSize);
379 if (!Enumerable.SequenceEqual(_syncBuffer, _header.
SyncData))
380 throw new AvroRuntimeException(
"Invalid sync!");
382 _availableBlock =
false;
386 private bool DataLeft()
388 long currentPosition = _stream.Position;
389 if (_stream.ReadByte() != -1)
390 _stream.Position = currentPosition;
397 private bool HasNextBlock()
409 _blockRemaining = _decoder.
ReadLong();
411 if (_blockSize > System.Int32.MaxValue || _blockSize < 0)
413 throw new AvroRuntimeException(
"Block size invalid or too large for this " +
414 "implementation: " + _blockSize);
416 _availableBlock =
true;
421 throw new AvroRuntimeException(
string.Format(
"Error ascertaining if data has next block: {0}", e));
Reader wrapper class for reading data and storing it into specific classes
string GetMetaString(string key)
Return the string value of a metadata property
string ReadString()
Reads a string Avro type
IEnumerable< T > NextEntries
long Tell()
Return the current position in the input
static IFileReader< T > OpenReader(string path, Schema readerSchema)
Open a reader for a file using path and the reader's schema
Base class for all schema types
static IFileReader< T > OpenReader(Stream inStream)
Open a reader for a stream
static IFileReader< T > OpenReader(Stream inStream, Schema readerSchema)
Open a reader for a stream using the reader's schema
Interface class for generated classes
long GetMetaLong(string key)
Return the long value of a metadata property
long ReadLong()
Reads a long Avro type.
ICollection< string > GetMetaKeys()
Return the list of keys in the metadata
A general purpose reader of data from avro streams.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
byte [] ReadBytes()
Reads the bytes Avro type
abstract byte [] Decompress(byte[] compressedData)
Decompress data using implemented codec
void Seek(long position)
Move to a specific, known synchronization point: one returned from IFileWriter.Sync() while writing
const string MetaDataSync
void Sync(long position)
Move to the next synchronization point after a position
static IFileReader< T > OpenReader(Stream inStream, Schema readerSchema, CreateDatumReader datumReaderFactory)
Open a reader for a stream using the reader's schema and a custom DatumReader
T Next()
Read the next datum from the file.
Decoder is used to decode Avro data on a stream.
bool HasNext()
True if more entries remain in this file.
bool PastSync(long position)
Return true if past the next synchronization point after a position
static IFileReader< T > OpenReader(string path)
Open a reader for a file using path
Schema GetSchema()
Return the schema as read from the input file / stream
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length).
Header GetHeader()
Return the header for the input file / stream
long ReadMapNext()
See ReadMapStart().
long PreviousSync()
Return the last synchronization point before our current position
long ReadMapStart()
Starts reading the map Avro type.
T Read(T reuse, Decoder decoder)
Read a datum.
byte [] GetMeta(string key)
Return the byte value of a metadata property
static Schema Parse(string json)
Parses a given JSON string to create a new schema object
delegate DatumReader< T > CreateDatumReader(Schema writerSchema, Schema readerSchema)
Decoder for Avro binary format