19 using System.Collections.Generic;
20 using System.Globalization;
31 private Stream _stream;
32 private MemoryStream _blockStream;
33 private Encoder _encoder, _blockEncoder;
36 private byte[] _syncData;
38 private bool _headerWritten;
39 private int _blockCount;
40 private int _syncInterval;
41 private IDictionary<string, byte[]> _metaData;
77 return OpenWriter(writer,
new FileStream(path, FileMode.Create), codec);
110 _metaData.Add(key, value);
117 SetMeta(key, GetByteValue(value.ToString(CultureInfo.InvariantCulture)));
129 SetMeta(key, GetByteValue(value));
139 if (syncInterval < 32 || syncInterval > (1 << 30))
143 _syncInterval = syncInterval;
151 long usedBuffer = _blockStream.Position;
155 _writer.
Write(datum, _blockEncoder);
159 _blockStream.Position = usedBuffer;
166 private void EnsureHeader()
171 _headerWritten =
true;
185 return _stream.Position;
197 private void WriteHeader()
208 _blockStream =
new MemoryStream();
217 private void AssertOpen()
219 if (!_isOpen)
throw new AvroRuntimeException(
"Cannot complete operation: avro file/stream not open");
222 private IFileWriter<T> Create(Schema schema, Stream outStream, Codec codec)
226 _metaData =
new Dictionary<string, byte[]>();
234 private void WriteMetaData()
239 SetMetaInternal(DataFileConstants.MetaDataCodec, GetByteValue(_codec.
GetName()));
240 SetMetaInternal(DataFileConstants.MetaDataSchema, GetByteValue(_schema.ToString()));
243 int size = _metaData.Count;
246 foreach (KeyValuePair<String,
byte[]> metaPair
in _metaData)
254 private void WriteIfBlockFull()
256 if (BufferInUse() >= _syncInterval)
260 private long BufferInUse()
262 return _blockStream.Position;
265 private void WriteBlock()
269 byte[] dataToWrite = _blockStream.ToArray();
282 _blockStream =
new MemoryStream();
287 private void WriteSyncData()
292 private void GenerateSyncData()
294 _syncData =
new byte[16];
296 Random random =
new Random();
297 random.NextBytes(_syncData);
300 private void SetMetaInternal(
string key,
byte[] value)
302 _metaData.Add(key, value);
305 private byte[] GetByteValue(
string value)
307 return System.Text.Encoding.UTF8.GetBytes(value);
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, Stream outStream, Codec codec)
Open a new writer instance to write to an output stream with a specified codec
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, string path, Codec codec)
Open a new writer instance to write
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, Stream outStream)
Open a new writer instance to write
void WriteBytes(byte[] value)
Base class for all schema types
abstract string GetName()
Name of this codec type
void WriteLong(long value)
void Append(T datum)
Append datum to a file / stream
void Flush()
Flush out any buffered data
long Sync()
Forces the end of the current block, emitting a synchronization marker
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
const string MetaDataReserved
void SetMeta(String key, string value)
Set meta data pair (string value)
void WriteFixed(byte[] data)
void Close()
Closes the file / stream
void SetMeta(String key, byte[] value)
Set meta data pair
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, string path)
Open a new writer instance to write
void SetSyncInterval(int syncInterval)
Set the synchronization interval for this file / stream, in bytes.
bool IsReservedMeta(string key)
Returns true if parameter is a reserved Avro meta data value.
const int DefaultSyncInterval
void WriteString(string value)
abstract byte [] Compress(byte[] uncompressedData)
Compress data using implemented codec
void Write(T datum, Encoder encoder)
static Codec CreateCodec(Type codecType)
Factory method to return child codec instance based on Codec.Type
void SetMeta(String key, long value)
Set meta data pair (long value)