Kinetica C# API Version 7.2.3.0
DataFileReader.cs
Go to the documentation of this file.
1 
18 using System;
19 using System.Collections.Generic;
20 using System.Linq;
21 using System.IO;
22 using Avro.Generic;
23 using Avro.IO;
24 using Avro.Specific;
25 
namespace Avro.File
{
    /// <summary>
    /// Reads Avro object container files. On construction the header (magic bytes, metadata,
    /// sync marker, writer schema and codec) is parsed; datums are then decoded block by block
    /// with the <see cref="DatumReader{T}"/> produced by the configured factory.
    /// </summary>
    /// <typeparam name="T">Type of the datums returned by <see cref="Next()"/>.</typeparam>
    public class DataFileReader<T> : IFileReader<T>
    {
        /// <summary>
        /// Factory used to build the <see cref="DatumReader{T}"/> once the writer schema is known.
        /// </summary>
        /// <param name="writerSchema">Schema parsed from the file header.</param>
        /// <param name="readerSchema">Schema requested by the caller, or the writer schema when none was given.</param>
        public delegate DatumReader<T> CreateDatumReader(Schema writerSchema, Schema readerSchema);

        private DatumReader<T> _reader;
        private Decoder _decoder, _datumDecoder;   // _decoder reads the file; _datumDecoder reads the current (decompressed) block
        private Header _header;
        private Codec _codec;
        private DataBlock _currentBlock;           // reused between blocks when its buffer is large enough
        private long _blockRemaining;              // datums not yet returned from the current block
        private long _blockSize;                   // byte length of the current block as stored in the file
        private bool _availableBlock;              // block count/size have been read but the block body has not
        private byte[] _syncBuffer;
        private long _blockStart;
        private Stream _stream;
        private Schema _readerSchema;
        private readonly CreateDatumReader _datumReaderFactory;

        /// <summary>
        /// Open a reader for a file using path.
        /// </summary>
        /// <param name="path">Path of the Avro data file.</param>
        /// <returns>A reader that owns (and closes on dispose) the underlying file stream.</returns>
        public static IFileReader<T> OpenReader(string path)
        {
            return OpenReader(path, null);
        }

        /// <summary>
        /// Open a reader for a file using path and the reader's schema.
        /// </summary>
        /// <param name="path">Path of the Avro data file.</param>
        /// <param name="readerSchema">Schema to resolve the writer schema against; null uses the writer schema.</param>
        /// <returns>A reader that owns (and closes on dispose) the underlying file stream.</returns>
        public static IFileReader<T> OpenReader(string path, Schema readerSchema)
        {
            // FileStream(path, FileMode.Open) defaults to FileAccess.ReadWrite, which fails on
            // read-only files; this reader only ever reads, so request read access only.
            Stream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read);
            try
            {
                return OpenReader(stream, readerSchema);
            }
            catch
            {
                // No reader took ownership of the stream, so it must be closed here.
                stream.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Open a reader for a stream.
        /// </summary>
        /// <param name="inStream">Seekable stream positioned anywhere; it is read from offset 0.</param>
        public static IFileReader<T> OpenReader(Stream inStream)
        {
            return OpenReader(inStream, null);
        }

        /// <summary>
        /// Open a reader for a stream using the reader's schema.
        /// </summary>
        /// <param name="inStream">Seekable stream; it is read from offset 0.</param>
        /// <param name="readerSchema">Schema to resolve the writer schema against; null uses the writer schema.</param>
        public static IFileReader<T> OpenReader(Stream inStream, Schema readerSchema)
        {
            return OpenReader(inStream, readerSchema, CreateDefaultReader);
        }

        /// <summary>
        /// Open a reader for a stream using the reader's schema and a custom DatumReader.
        /// </summary>
        /// <param name="inStream">Seekable stream; it is read from offset 0.</param>
        /// <param name="readerSchema">Schema to resolve the writer schema against; null uses the writer schema.</param>
        /// <param name="datumReaderFactory">Builds the datum reader once the writer schema is parsed.</param>
        /// <exception cref="ArgumentNullException"><paramref name="inStream"/> or <paramref name="datumReaderFactory"/> is null.</exception>
        /// <exception cref="AvroRuntimeException">The stream is not seekable or does not start with the Avro magic bytes.</exception>
        public static IFileReader<T> OpenReader(Stream inStream, Schema readerSchema, CreateDatumReader datumReaderFactory)
        {
            if (inStream == null)
                throw new ArgumentNullException("inStream");
            if (datumReaderFactory == null)
                throw new ArgumentNullException("datumReaderFactory");

            if (!inStream.CanSeek)
                throw new AvroRuntimeException("Not a valid input stream - must be seekable!");

            if (inStream.Length < DataFileConstants.Magic.Length)
                throw new AvroRuntimeException("Not an Avro data file");

            // verify magic header
            byte[] magic = new byte[DataFileConstants.Magic.Length];
            inStream.Seek(0, SeekOrigin.Begin);
            int filled = 0;
            while (filled < magic.Length)
            {
                // Stream.Read may return fewer bytes than requested, so accumulate the count;
                // a return of 0 means end of stream and would otherwise loop forever.
                int read = inStream.Read(magic, filled, magic.Length - filled);
                if (read <= 0)
                    throw new AvroRuntimeException("Not an Avro data file");
                filled += read;
            }
            inStream.Seek(0, SeekOrigin.Begin);

            if (!magic.SequenceEqual(DataFileConstants.Magic))
                throw new AvroRuntimeException("Not an Avro data file");

            // current format only (not supporting 1.2 or below)
            return new DataFileReader<T>(inStream, readerSchema, datumReaderFactory);
        }

        DataFileReader(Stream stream, Schema readerSchema, CreateDatumReader datumReaderFactory)
        {
            _readerSchema = readerSchema;
            _datumReaderFactory = datumReaderFactory;
            Init(stream);
            BlockFinished();
        }

        /// <summary>Return the header for the input file / stream.</summary>
        public Header GetHeader()
        {
            return _header;
        }

        /// <summary>Return the schema as read from the input file / stream.</summary>
        public Schema GetSchema()
        {
            return _header.Schema;
        }

        /// <summary>Return the list of keys in the metadata.</summary>
        public ICollection<string> GetMetaKeys()
        {
            return _header.MetaData.Keys;
        }

        /// <summary>Return the byte value of a metadata property.</summary>
        /// <param name="key">Metadata key.</param>
        /// <returns>The raw value, or null when the key is absent.</returns>
        public byte[] GetMeta(string key)
        {
            byte[] value;
            return _header.MetaData.TryGetValue(key, out value) ? value : null;
        }

        /// <summary>Return the long value of a metadata property.</summary>
        /// <param name="key">Metadata key.</param>
        /// <exception cref="ArgumentNullException">The key is absent.</exception>
        /// <exception cref="FormatException">The value is not an integer.</exception>
        public long GetMetaLong(string key)
        {
            // Metadata is machine-written, so parse it culture-independently.
            return long.Parse(GetMetaString(key), System.Globalization.CultureInfo.InvariantCulture);
        }

        /// <summary>Return the string value of a metadata property, decoded as UTF-8.</summary>
        /// <param name="key">Metadata key.</param>
        /// <returns>The decoded value, or null when the key is absent.</returns>
        public string GetMetaString(string key)
        {
            byte[] value = GetMeta(key);
            if (value == null)
            {
                return null;
            }
            try
            {
                return System.Text.Encoding.UTF8.GetString(value);
            }
            catch (Exception e)
            {
                throw new AvroRuntimeException(string.Format("Error fetching meta data for key: {0}", key), e);
            }
        }

        /// <summary>
        /// Move to a specific, known synchronization point, one returned from IFileWriter.Sync()
        /// while writing. Any partially read block is discarded.
        /// </summary>
        /// <param name="position">Absolute stream offset of the synchronization point.</param>
        public void Seek(long position)
        {
            _stream.Position = position;
            _decoder = new BinaryDecoder(_stream);
            _datumDecoder = null;
            _blockRemaining = 0;
            // A block header read before the seek describes data at the old position.
            _availableBlock = false;
            _blockStart = position;
        }

        /// <summary>
        /// Move to the next synchronization point after a position. If none is found the
        /// reader is left at the point where reading failed (treated as end of file).
        /// </summary>
        /// <param name="position">Absolute stream offset to start scanning from.</param>
        public void Sync(long position)
        {
            Seek(position);
            // work around an issue where 1.5.4 C stored sync in metadata
            if ((position == 0) && (GetMeta(DataFileConstants.MetaDataSync) != null))
            {
                Init(_stream); // re-init to skip header
                return;
            }

            try
            {
                bool done = false;

                do // read until sync mark matched
                {
                    _decoder.ReadFixed(_syncBuffer);
                    if (Enumerable.SequenceEqual(_syncBuffer, _header.SyncData))
                    {
                        done = true;
                    }
                    else
                    {
                        // slide the comparison window forward by a single byte
                        _stream.Position = _stream.Position - (DataFileConstants.SyncSize - 1);
                    }
                } while (!done);
            }
            catch (Exception)
            {
                // Deliberate best effort: a read failure here (presumably running past the
                // end of the stream) means no further sync marker exists; default to EOF.
            }

            _blockStart = _stream.Position;
        }

        /// <summary>Return true if past the next synchronization point after a position.</summary>
        public bool PastSync(long position)
        {
            return ((_blockStart >= position + DataFileConstants.SyncSize) || (_blockStart >= _stream.Length));
        }

        /// <summary>Return the last synchronization point before our current position.</summary>
        public long PreviousSync()
        {
            return _blockStart;
        }

        /// <summary>Return the current position in the input.</summary>
        public long Tell()
        {
            return _stream.Position;
        }

        /// <summary>Lazily yields the remaining datums by calling <see cref="HasNext"/> / <see cref="Next()"/>.</summary>
        public IEnumerable<T> NextEntries
        {
            get
            {
                while (HasNext())
                {
                    yield return Next();
                }
            }
        }

        /// <summary>
        /// True if more entries remain in this file. Loads and decompresses the next block
        /// when the current one is exhausted.
        /// </summary>
        /// <exception cref="AvroRuntimeException">The next block could not be read or decoded.</exception>
        public bool HasNext()
        {
            try
            {
                if (_blockRemaining == 0)
                {
                    // TODO: check that the previous block was fully consumed before moving on.
                    if (HasNextBlock())
                    {
                        _currentBlock = NextRawBlock(_currentBlock);
                        _currentBlock.Data = _codec.Decompress(_currentBlock.Data);
                        _datumDecoder = new BinaryDecoder(_currentBlock.GetDataAsStream());
                    }
                }
                return _blockRemaining != 0;
            }
            catch (AvroRuntimeException)
            {
                // Already descriptive (e.g. "Invalid sync!"); don't bury it under another wrapper.
                throw;
            }
            catch (Exception e)
            {
                throw new AvroRuntimeException(string.Format("Error fetching next object from block: {0}", e), e);
            }
        }

        /// <summary>
        /// Rewind to the start of the stream and re-read the header, discarding any
        /// partially read block.
        /// </summary>
        public void Reset()
        {
            // Init reads the magic bytes from the current position, so rewind first.
            _stream.Position = 0;
            Init(_stream);
            _blockRemaining = 0;
            _availableBlock = false;
            _datumDecoder = null;
            BlockFinished();
        }

        /// <summary>Closes the underlying stream.</summary>
        public void Dispose()
        {
            _stream.Close();
        }

        /// <summary>
        /// Reads the file header from the current stream position: magic bytes, metadata map,
        /// sync marker; then parses the writer schema, builds the datum reader and resolves
        /// the codec from the metadata.
        /// </summary>
        /// <exception cref="AvroRuntimeException">The magic bytes are wrong or the header has no schema.</exception>
        private void Init(Stream stream)
        {
            _stream = stream;
            _header = new Header();
            _decoder = new BinaryDecoder(stream);
            _syncBuffer = new byte[DataFileConstants.SyncSize];

            // read magic
            byte[] firstBytes = new byte[DataFileConstants.Magic.Length];
            try
            {
                _decoder.ReadFixed(firstBytes);
            }
            catch (Exception e)
            {
                throw new AvroRuntimeException("Not a valid data file!", e);
            }
            if (!firstBytes.SequenceEqual(DataFileConstants.Magic))
                throw new AvroRuntimeException("Not a valid data file!");

            // read meta data (an Avro map: blocks of entries until a zero count)
            long len = _decoder.ReadMapStart();
            if (len > 0)
            {
                do
                {
                    for (long i = 0; i < len; i++)
                    {
                        string key = _decoder.ReadString();
                        byte[] val = _decoder.ReadBytes();
                        _header.MetaData.Add(key, val);
                    }
                } while ((len = _decoder.ReadMapNext()) != 0);
            }

            // read in sync data
            _decoder.ReadFixed(_header.SyncData);

            // parse schema and set codec
            string schemaJson = GetMetaString(DataFileConstants.MetaDataSchema);
            if (schemaJson == null)
                throw new AvroRuntimeException("Not a valid data file! Header has no schema metadata.");
            _header.Schema = Schema.Parse(schemaJson);
            _reader = _datumReaderFactory(_header.Schema, _readerSchema ?? _header.Schema);
            _codec = ResolveCodec();
        }

        /// <summary>
        /// Default factory: a <see cref="SpecificReader{T}"/> when T implements
        /// <see cref="ISpecificRecord"/>, otherwise a <see cref="GenericReader{T}"/>.
        /// </summary>
        private static DatumReader<T> CreateDefaultReader(Schema writerSchema, Schema readerSchema)
        {
            if (typeof(ISpecificRecord).IsAssignableFrom(typeof(T)))
            {
                return new SpecificReader<T>(writerSchema, readerSchema);
            }
            return new GenericReader<T>(writerSchema, readerSchema); // generic
        }

        /// <summary>Creates the codec named in the header's codec metadata entry.</summary>
        private Codec ResolveCodec()
        {
            return Codec.CreateCodecFromString(GetMetaString(DataFileConstants.MetaDataCodec));
        }

        /// <summary>Read the next datum from the file.</summary>
        /// <exception cref="AvroRuntimeException">No datum remains, or it could not be decoded.</exception>
        public T Next()
        {
            return Next(default(T));
        }

        private T Next(T reuse)
        {
            try
            {
                if (!HasNext())
                    throw new AvroRuntimeException("No more datum objects remaining in block!");

                T result = _reader.Read(reuse, _datumDecoder);
                if (--_blockRemaining == 0)
                {
                    BlockFinished();
                }
                return result;
            }
            catch (AvroRuntimeException)
            {
                throw;
            }
            catch (Exception e)
            {
                throw new AvroRuntimeException(string.Format("Error fetching next object from block: {0}", e), e);
            }
        }

        /// <summary>Records the stream position following the block just finished.</summary>
        private void BlockFinished()
        {
            _blockStart = _stream.Position;
        }

        /// <summary>
        /// Reads the body of the block whose header <see cref="HasNextBlock"/> already read,
        /// then checks the trailing sync marker. Reuses <paramref name="reuse"/> when its
        /// buffer is large enough.
        /// </summary>
        private DataBlock NextRawBlock(DataBlock reuse)
        {
            if (!HasNextBlock())
                throw new AvroRuntimeException("No data remaining in block!");

            if (reuse == null || reuse.Data.Length < _blockSize)
            {
                reuse = new DataBlock(_blockRemaining, _blockSize);
            }
            else
            {
                reuse.NumberOfEntries = _blockRemaining;
                reuse.BlockSize = _blockSize;
            }

            _decoder.ReadFixed(reuse.Data, 0, (int)reuse.BlockSize);
            _decoder.ReadFixed(_syncBuffer);

            if (!Enumerable.SequenceEqual(_syncBuffer, _header.SyncData))
                throw new AvroRuntimeException("Invalid sync!");

            _availableBlock = false;
            return reuse;
        }

        /// <summary>True if at least one more byte can be read; the stream position is left unchanged.</summary>
        private bool DataLeft()
        {
            long currentPosition = _stream.Position;
            if (_stream.ReadByte() == -1)
                return false;

            _stream.Position = currentPosition;
            return true;
        }

        /// <summary>
        /// True if another block exists. Reads and validates the block's count/size header
        /// once; repeated calls return true until <see cref="NextRawBlock"/> consumes the block.
        /// </summary>
        private bool HasNextBlock()
        {
            try
            {
                // block header already read, body not yet consumed
                if (_availableBlock)
                    return true;

                // check to ensure still data to read
                if (!DataLeft())
                    return false;

                long count = _decoder.ReadLong(); // read block count
                long size = _decoder.ReadLong();  // read block size
                if (count < 0)
                {
                    throw new AvroRuntimeException("Block count invalid: " + count);
                }
                if (size > System.Int32.MaxValue || size < 0)
                {
                    throw new AvroRuntimeException("Block size invalid or too large for this " +
                                                   "implementation: " + size);
                }

                // Commit only after validation so a corrupt header leaves no half-updated state.
                _blockRemaining = count;
                _blockSize = size;
                _availableBlock = true;
                return true;
            }
            catch (AvroRuntimeException)
            {
                throw;
            }
            catch (Exception e)
            {
                throw new AvroRuntimeException(string.Format("Error ascertaining if data has next block: {0}", e), e);
            }
        }
    }
}
Reader wrapper class for reading data and storing it into specific classes.
string GetMetaString(string key)
Return the string value of a metadata property
string ReadString()
Reads a string Avro type
IEnumerable< T > NextEntries
long Tell()
Return the current position in the input
static IFileReader< T > OpenReader(string path, Schema readerSchema)
Open a reader for a file using path and the reader's schema
Base class for all schema types
Definition: Schema.cs:29
static IFileReader< T > OpenReader(Stream inStream)
Open a reader for a stream
static IFileReader< T > OpenReader(Stream inStream, Schema readerSchema)
Open a reader for a stream using the reader's schema
Interface class for generated classes
long GetMetaLong(string key)
Return the long value of a metadata property
long ReadLong()
Reads a long Avro type.
IDictionary< string, byte[]> MetaData
Definition: Header.cs:30
ICollection< string > GetMetaKeys()
Return the list of keys in the metadata
A general purpose reader of data from avro streams.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
byte [] ReadBytes()
Reads the bytes Avro type
abstract byte [] Decompress(byte[] compressedData)
Decompress data using implemented codec
void Seek(long position)
Move to a specific, known synchronization point: one returned from IFileWriter.Sync() while writing.
void Sync(long position)
Move to the next synchronization point after a position
static IFileReader< T > OpenReader(Stream inStream, Schema readerSchema, CreateDatumReader datumReaderFactory)
Open a reader for a stream using the reader's schema and a custom DatumReader
T Next()
Read the next datum from the file.
Decoder is used to decode Avro data on a stream.
Definition: Decoder.cs:27
bool HasNext()
True if more entries remain in this file.
bool PastSync(long position)
Return true if past the next synchronization point after a position
static IFileReader< T > OpenReader(string path)
Open a reader for a file using path
Schema GetSchema()
Return the schema as read from the input file / stream
void ReadFixed(byte[] buffer)
A convenience method for ReadFixed(buffer, 0, buffer.Length).
Header GetHeader()
Return the header for the input file / stream
long ReadMapNext()
See ReadMapStart().
Schema Schema
Definition: Header.cs:32
long PreviousSync()
Return the last synchronization point before our current position
long ReadMapStart()
Starts reading the map Avro type.
T Read(T reuse, Decoder decoder)
Read a datum.
byte [] GetMeta(string key)
Return the byte value of a metadata property
static Schema Parse(string json)
Parses a given JSON string to create a new schema object
Definition: Schema.cs:141
byte [] SyncData
Definition: Header.cs:31
delegate DatumReader< T > CreateDatumReader(Schema writerSchema, Schema readerSchema)
Decoder for Avro binary format