Kinetica   C#   API  Version 7.2.3.0
DataFileWriter.cs
Go to the documentation of this file.
1 
18 using System;
19 using System.Collections.Generic;
20 using System.Globalization;
21 using System.IO;
22 using Avro.IO;
23 using Avro.Generic;
24 
25 namespace Avro.File
26 {
27  public class DataFileWriter<T> : IFileWriter<T>
28  {
// Schema whose textual form is stored in the header metadata.
private Schema _schema;
// Codec applied to each block's bytes before they are written; its name is stored in the header metadata.
private Codec _codec;
// Destination stream that receives the header and every finished block.
private Stream _stream;
// In-memory buffer holding the datums of the block currently being built.
private MemoryStream _blockStream;
// Encoder that writes to _stream.
private Encoder _encoder;
// Encoder that writes to _blockStream.
private Encoder _blockEncoder;
// Serializes each appended datum through _blockEncoder.
private DatumWriter<T> _writer;

// 16-byte marker written after the header and after every block.
private byte[] _syncData;
// Set by Init(), cleared by Close().
private bool _isOpen;
// True once the header has been written to _stream.
private bool _headerWritten;
// Number of datums appended to the current block.
private int _blockCount;
// Buffered byte count at which the current block is written out.
private int _syncInterval;
// Key/value pairs written into the header.
private IDictionary<string, byte[]> _metaData;
42 
/// <summary>
/// Open a new writer instance to write to a file path, using the Null codec.
/// </summary>
/// <param name="writer">Datum writer that serializes each datum and supplies the schema.</param>
/// <param name="path">Path of the file to create; FileMode.Create overwrites an existing file.</param>
/// <returns>A writer bound to the newly created file.</returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, string path)
{
    Stream outStream = new FileStream(path, FileMode.Create);
    try
    {
        return OpenWriter(writer, outStream, Codec.CreateCodec(Codec.Type.Null));
    }
    catch
    {
        // The stream was created here, so nobody else can release the file
        // handle if construction of the writer fails.
        outStream.Dispose();
        throw;
    }
}
54 
/// <summary>
/// Open a new writer instance to write to an output stream, using the Null codec.
/// </summary>
/// <param name="writer">Datum writer that serializes each datum and supplies the schema.</param>
/// <param name="outStream">Stream that receives the header and data blocks.</param>
/// <returns>A writer bound to <paramref name="outStream"/>.</returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream)
{
    Codec nullCodec = Codec.CreateCodec(Codec.Type.Null);
    return OpenWriter(writer, outStream, nullCodec);
}
66 
/// <summary>
/// Open a new writer instance to write to a file path with a specified codec.
/// </summary>
/// <param name="writer">Datum writer that serializes each datum and supplies the schema.</param>
/// <param name="path">Path of the file to create; FileMode.Create overwrites an existing file.</param>
/// <param name="codec">Codec used to compress each block.</param>
/// <returns>A writer bound to the newly created file.</returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, string path, Codec codec)
{
    Stream outStream = new FileStream(path, FileMode.Create);
    try
    {
        return OpenWriter(writer, outStream, codec);
    }
    catch
    {
        // The stream was created here, so nobody else can release the file
        // handle if construction of the writer fails.
        outStream.Dispose();
        throw;
    }
}
79 
/// <summary>
/// Open a new writer instance to write to an output stream with a specified codec.
/// </summary>
/// <param name="writer">Datum writer that serializes each datum and supplies the schema.</param>
/// <param name="outStream">Stream that receives the header and data blocks.</param>
/// <param name="codec">Codec used to compress each block.</param>
/// <returns>A writer bound to <paramref name="outStream"/>.</returns>
public static IFileWriter<T> OpenWriter(DatumWriter<T> writer, Stream outStream, Codec codec)
{
    DataFileWriter<T> fileWriter = new DataFileWriter<T>(writer);
    return fileWriter.Create(writer.Schema, outStream, codec);
}
92 
94  {
95  _writer = writer;
96  _syncInterval = DataFileConstants.DefaultSyncInterval;
97  }
98 
/// <summary>
/// Returns true if parameter is a reserved Avro meta data value.
/// </summary>
/// <param name="key">Metadata key to test.</param>
/// <returns>True when <paramref name="key"/> begins with DataFileConstants.MetaDataReserved.</returns>
public bool IsReservedMeta(string key)
{
    // Metadata keys are protocol identifiers, not linguistic text: compare
    // ordinally so the result does not depend on the current culture or on
    // characters that culture-sensitive comparison ignores.
    return key.StartsWith(DataFileConstants.MetaDataReserved, StringComparison.Ordinal);
}
103 
/// <summary>
/// Set meta data pair.
/// </summary>
/// <param name="key">Metadata key; must not be a reserved key.</param>
/// <param name="value">Raw bytes stored under <paramref name="key"/>.</param>
/// <exception cref="AvroRuntimeException">The key is reserved.</exception>
public void SetMeta(String key, byte[] value)
{
    bool reserved = IsReservedMeta(key);
    if (!reserved)
    {
        _metaData.Add(key, value);
        return;
    }

    throw new AvroRuntimeException("Cannot set reserved meta key: " + key);
}
112 
/// <summary>
/// Set meta data pair (long value).
/// </summary>
/// <param name="key">Metadata key.</param>
/// <param name="value">Number stored as its invariant-culture decimal text, UTF-8 encoded.</param>
/// <exception cref="AvroRuntimeException">Wraps any failure raised while storing the pair.</exception>
public void SetMeta(String key, long value)
{
    string text = value.ToString(CultureInfo.InvariantCulture);
    try
    {
        byte[] encoded = GetByteValue(text);
        SetMeta(key, encoded);
    }
    catch (Exception cause)
    {
        throw new AvroRuntimeException(cause.Message, cause);
    }
}
124 
/// <summary>
/// Set meta data pair (string value).
/// </summary>
/// <param name="key">Metadata key.</param>
/// <param name="value">Text stored as UTF-8 bytes.</param>
/// <exception cref="AvroRuntimeException">Wraps any failure raised while encoding or storing the pair.</exception>
public void SetMeta(String key, string value)
{
    try
    {
        // Encoding stays inside the try block so its failures are wrapped too.
        byte[] encoded = GetByteValue(value);
        SetMeta(key, encoded);
    }
    catch (Exception cause)
    {
        throw new AvroRuntimeException(cause.Message, cause);
    }
}
136 
/// <summary>
/// Set the synchronization interval for this file / stream, in bytes.
/// </summary>
/// <param name="syncInterval">Buffered byte count that triggers a block write; accepted range is 32 to 2^30 inclusive.</param>
/// <exception cref="AvroRuntimeException">The value is outside the accepted range.</exception>
public void SetSyncInterval(int syncInterval)
{
    const int smallestInterval = 32;
    const int largestInterval = 1 << 30;

    bool withinRange = syncInterval >= smallestInterval && syncInterval <= largestInterval;
    if (!withinRange)
    {
        throw new AvroRuntimeException("Invalid sync interval value: " + syncInterval);
    }

    _syncInterval = syncInterval;
}
145 
/// <summary>
/// Append datum to a file / stream.
/// </summary>
/// <param name="datum">Datum to serialize into the current block.</param>
/// <exception cref="AvroRuntimeException">The writer is not open, or the datum could not be written.</exception>
public void Append(T datum)
{
    AssertOpen();
    EnsureHeader();

    // Remember where this datum starts so a failed write can be rolled back.
    long usedBuffer = _blockStream.Position;

    try
    {
        _writer.Write(datum, _blockEncoder);
    }
    catch (Exception e)
    {
        // Truncate as well as rewind: WriteBlock() copies the buffer with
        // ToArray(), which is bounded by Length rather than Position, so bytes
        // left behind by the failed datum would otherwise end up in the block.
        _blockStream.SetLength(usedBuffer);
        _blockStream.Position = usedBuffer;
        throw new AvroRuntimeException("Error appending datum to writer", e);
    }

    _blockCount++;
    WriteIfBlockFull();
}
165 
/// <summary>
/// Writes the header the first time it is called; later calls do nothing.
/// </summary>
private void EnsureHeader()
{
    if (_headerWritten)
    {
        return;
    }

    WriteHeader();
    _headerWritten = true;
}
174 
/// <summary>
/// Flush out any buffered data.
/// </summary>
/// <remarks>
/// Makes sure the header has been written, then ends the current block via
/// <see cref="Sync"/>; the position returned by Sync is not used.
/// </remarks>
public void Flush()
{
    EnsureHeader();
    Sync();
}
180 
/// <summary>
/// Forces the end of the current block, emitting a synchronization marker.
/// </summary>
/// <returns>Position of the output stream after the block has been written.</returns>
/// <exception cref="AvroRuntimeException">The writer is not open.</exception>
public long Sync()
{
    AssertOpen();
    WriteBlock();

    long position = _stream.Position;
    return position;
}
187 
/// <summary>
/// Closes the file / stream.
/// </summary>
/// <remarks>
/// Writes the header if needed and any pending block, then flushes and closes
/// the output stream. The stream is closed and the writer marked as not open
/// even when flushing fails, so the underlying handle is always released.
/// </remarks>
/// <exception cref="AvroRuntimeException">The writer is not open.</exception>
public void Close()
{
    try
    {
        // Flush() already ensures the header has been written.
        Flush();
        _stream.Flush();
    }
    finally
    {
        _stream.Close();
        _isOpen = false;
    }
}
196 
197  private void WriteHeader()
198  {
200  WriteMetaData();
201  WriteSyncData();
202  }
203 
/// <summary>
/// Resets the block state, creates the encoders and marks the writer as open.
/// </summary>
private void Init()
{
    _blockCount = 0;

    // One encoder targets the output stream, the other the in-memory block buffer.
    _encoder = new BinaryEncoder(_stream);
    _blockStream = new MemoryStream();
    _blockEncoder = new BinaryEncoder(_blockStream);

    // Fall back to the Null codec when none was supplied.
    if (_codec == null)
    {
        _codec = Codec.CreateCodec(Codec.Type.Null);
    }

    _isOpen = true;
}
216 
/// <summary>
/// Throws when the writer is not open.
/// </summary>
/// <exception cref="AvroRuntimeException">The writer is not open.</exception>
private void AssertOpen()
{
    if (_isOpen)
    {
        return;
    }

    throw new AvroRuntimeException("Cannot complete operation: avro file/stream not open");
}
221 
/// <summary>
/// Binds this writer to a schema, output stream and codec, then initializes it.
/// </summary>
/// <param name="schema">Schema stored in the header metadata.</param>
/// <param name="outStream">Stream that receives the header and data blocks.</param>
/// <param name="codec">Codec used to compress each block; Init() substitutes the Null codec when this is null.</param>
/// <returns>This writer.</returns>
private IFileWriter<T> Create(Schema schema, Stream outStream, Codec codec)
{
    _schema = schema;
    _stream = outStream;
    _codec = codec;
    _metaData = new Dictionary<string, byte[]>();

    Init();
    return this;
}
233 
/// <summary>
/// Generates the sync marker, adds the codec and schema entries to the
/// metadata, and writes the metadata map to the output encoder.
/// </summary>
private void WriteMetaData()
{
    // The sync marker is generated here but is not stored as a metadata entry.
    GenerateSyncData();

    byte[] codecName = GetByteValue(_codec.GetName());
    SetMetaInternal(DataFileConstants.MetaDataCodec, codecName);

    byte[] schemaText = GetByteValue(_schema.ToString());
    SetMetaInternal(DataFileConstants.MetaDataSchema, schemaText);

    // Entry count first, then each key/value pair, then the map terminator.
    _encoder.WriteInt(_metaData.Count);
    foreach (KeyValuePair<String, byte[]> entry in _metaData)
    {
        _encoder.WriteString(entry.Key);
        _encoder.WriteBytes(entry.Value);
    }

    _encoder.WriteMapEnd();
}
253 
/// <summary>
/// Writes the current block once the buffered bytes reach the sync interval.
/// </summary>
private void WriteIfBlockFull()
{
    if (BufferInUse() < _syncInterval)
    {
        return;
    }

    WriteBlock();
}
259 
/// <summary>
/// Reports the current position of the block buffer.
/// </summary>
/// <returns>Position of the in-memory block stream, in bytes.</returns>
private long BufferInUse()
{
    long bufferedBytes = _blockStream.Position;
    return bufferedBytes;
}
264 
/// <summary>
/// Writes the buffered block (datum count, compressed data, sync marker) to
/// the output encoder and starts a fresh block. Does nothing when the block
/// holds no datums.
/// </summary>
private void WriteBlock()
{
    if (_blockCount <= 0)
    {
        return;
    }

    byte[] rawBlock = _blockStream.ToArray();

    // Layout: count, then the codec-compressed bytes, then the sync marker.
    _encoder.WriteLong(_blockCount);
    _encoder.WriteBytes(_codec.Compress(rawBlock));
    _encoder.WriteFixed(_syncData);

    // Start over with an empty buffer and a new encoder bound to it.
    _blockCount = 0;
    _blockStream = new MemoryStream();
    _blockEncoder = new BinaryEncoder(_blockStream);
}
286 
/// <summary>
/// Writes the sync marker to the output encoder as fixed-length bytes.
/// </summary>
private void WriteSyncData()
{
    byte[] marker = _syncData;
    _encoder.WriteFixed(marker);
}
291 
/// <summary>
/// Generates a fresh 16-byte random sync marker for this writer.
/// </summary>
private void GenerateSyncData()
{
    byte[] marker = new byte[16];

    // A per-call System.Random may be seeded from the clock, which can hand the
    // same marker to writers created at nearly the same moment. The system
    // random number generator does not depend on a time-based seed.
    using (System.Security.Cryptography.RandomNumberGenerator generator =
        System.Security.Cryptography.RandomNumberGenerator.Create())
    {
        generator.GetBytes(marker);
    }

    _syncData = marker;
}
299 
/// <summary>
/// Adds a metadata pair without the reserved-key check performed by SetMeta.
/// </summary>
/// <param name="key">Metadata key.</param>
/// <param name="value">Raw bytes stored under <paramref name="key"/>.</param>
private void SetMetaInternal(string key, byte[] value)
{
    IDictionary<string, byte[]> entries = _metaData;
    entries.Add(key, value);
}
304 
/// <summary>
/// Encodes a string as UTF-8 bytes.
/// </summary>
/// <param name="value">Text to encode.</param>
/// <returns>UTF-8 encoding of <paramref name="value"/>.</returns>
private byte[] GetByteValue(string value)
{
    System.Text.Encoding utf8 = System.Text.Encoding.UTF8;
    return utf8.GetBytes(value);
}
309 
/// <summary>
/// Closes the writer if it is still open.
/// </summary>
/// <remarks>
/// Close() throws once the writer is no longer open, so it is only called
/// while the writer is open. This keeps Dispose safe to call after Close()
/// and safe to call more than once.
/// </remarks>
public void Dispose()
{
    if (_isOpen)
    {
        Close();
    }
}
314  }
315 }
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, Stream outStream, Codec codec)
Open a new writer instance to write to an output stream with a specified codec
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, string path, Codec codec)
Open a new writer instance to write to a file path with a specified codec
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, Stream outStream)
Open a new writer instance to write to an output stream, using the Null codec
void WriteBytes(byte[] value)
void WriteInt(int value)
Base class for all schema types
Definition: Schema.cs:29
abstract string GetName()
Name of this codec type
void WriteLong(long value)
void Append(T datum)
Append datum to a file / stream
void Flush()
Flush out any buffered data
long Sync()
Forces the end of the current block, emitting a synchronization marker
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
void WriteMapEnd()
Write leaf values.
void SetMeta(String key, string value)
Set meta data pair (string value)
void WriteFixed(byte[] data)
void Close()
Closes the file / stream
void SetMeta(String key, byte[] value)
Set meta data pair
static IFileWriter< T > OpenWriter(DatumWriter< T > writer, string path)
Open a new writer instance to write to a file path, using the Null codec
Type
Codec types
Definition: Codec.cs:65
void SetSyncInterval(int syncInterval)
Set the synchronization interval for this file / stream, in bytes.
bool IsReservedMeta(string key)
Returns true if parameter is a reserved Avro meta data value.
void WriteString(string value)
abstract byte [] Compress(byte[] uncompressedData)
Compress data using implemented codec
void Write(T datum, Encoder encoder)
static Codec CreateCodec(Type codecType)
Factory method to return child codec instance based on Codec.Type
Definition: Codec.cs:78
void SetMeta(String key, long value)
Set meta data pair (long value)