Kinetica   C#   API  Version 7.2.3.1
CsvFileReader.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Globalization;
4 using System.IO;
5 using System.Linq;
6 using System.Text;
7 using System.Threading;
8 using System.Threading.Tasks;
9 using Avro;
10 using kinetica;
11 using kinetica.Records;
12 using Microsoft.Extensions.FileSystemGlobbing;
13 using Microsoft.Extensions.FileSystemGlobbing.Abstractions;
14 
15 namespace KineticaAdo
16 {
21  public class CsvFileReader
22  {
private readonly Kinetica _kinetica;
private readonly InsertFromFileInfo _fileInfo;
private readonly KineticaType? _tableType;

/// <summary>
/// Gets the number of records successfully inserted.
/// </summary>
public long RecordsInserted { get; private set; }

/// <summary>
/// Gets the number of records skipped due to errors.
/// </summary>
public long RecordsSkipped { get; private set; }

/// <summary>
/// Gets any warnings generated during processing.
/// </summary>
public List<string> Warnings { get; } = new();

/// <summary>
/// Creates a new CsvFileReader for the specified table and file info.
/// </summary>
/// <param name="kinetica">Connection used for every server call made by this reader.</param>
/// <param name="fileInfo">Target table, source path and load options of the statement being executed.</param>
/// <param name="tableType">
/// Optional, already-resolved type of the target table. When <c>null</c> the type is
/// looked up from the table the first time a client-side load needs it.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="kinetica"/> or <paramref name="fileInfo"/> is <c>null</c>.
/// </exception>
public CsvFileReader(Kinetica kinetica, InsertFromFileInfo fileInfo, KineticaType? tableType = null)
{
    _kinetica = kinetica ?? throw new ArgumentNullException(nameof(kinetica));
    _fileInfo = fileInfo ?? throw new ArgumentNullException(nameof(fileInfo));
    _tableType = tableType;
}
58 
/// <summary>
/// Reads the file(s) and inserts records into the table.
/// </summary>
/// <remarks>
/// KiFS paths and every format other than delimited text / auto are handed to the
/// server-side import. Local delimited text is parsed in this process and pushed
/// through a <c>BulkInserter</c>.
/// </remarks>
/// <param name="cancellationToken">Token checked between rows and while waiting for pending batches.</param>
/// <returns>The number of records inserted.</returns>
/// <exception cref="KineticaException">
/// The table schema is not a record schema, no file matches the configured path,
/// or (in abort mode) the bulk inserter reported errors.
/// </exception>
public async Task<long> ReadAndInsertAsync(CancellationToken cancellationToken = default)
{
    var options = _fileInfo.Options;

    // Server-side import covers KiFS paths (all formats) and every non-delimited
    // format (Parquet, JSON, Avro, Shapefile).
    bool isDelimitedText = options.Format == FileFormat.DelimitedText
                           || options.Format == FileFormat.Auto;
    if (_fileInfo.IsKifsPath || !isDelimitedText)
    {
        return await ImportViaServerApiAsync(cancellationToken).ConfigureAwait(false);
    }

    // Get table type if not provided
    var tableType = _tableType ?? await Task.Run(
        () => KineticaType.fromTable(_kinetica, _fileInfo.TableName),
        cancellationToken).ConfigureAwait(false);

    // Clear table if requested
    if (options.InitialClear)
    {
        await ClearTableAsync(cancellationToken).ConfigureAwait(false);
    }

    // Create bulk inserter options
    var bulkOptions = new BulkInserterOptions
    {
        BatchSize = options.BatchSize,
        InsertOptions = new Dictionary<string, string>()
    };

    if (options.UpdateOnExistingPk)
    {
        bulkOptions.InsertOptions["update_on_existing_pk"] = "true";
    }
    if (options.IgnoreExistingPk)
    {
        bulkOptions.InsertOptions["ignore_existing_pk"] = "true";
    }
    if (options.TruncateStrings)
    {
        bulkOptions.InsertOptions["truncate_strings"] = "true";
    }

    // Determine columns to use: the explicit insert list, or every schema field.
    var recordSchema = tableType.getSchema() as RecordSchema;
    if (recordSchema == null)
    {
        throw new KineticaException($"Table '{_fileInfo.TableName}' schema is not a record schema");
    }

    var insertColumns = _fileInfo.InsertColumns.Count > 0
        ? _fileInfo.InsertColumns
        : recordSchema.Fields.Select(f => f.Name).ToList();

    // Records are built client-side from a Type derived from the table's KineticaType.
    var recordType = CreateTypeFromKineticaType(tableType);
    using var inserter = new BulkInserter<GenericRecord>(_kinetica, _fileInfo.TableName, tableType, bulkOptions);

    // Resolve file paths (supports glob patterns)
    var files = ResolveLocalFiles(_fileInfo.FilePath!);
    if (files.Count == 0)
    {
        throw new KineticaException($"No files found matching pattern: {_fileInfo.FilePath}");
    }

    foreach (var filePath in files)
    {
        await ProcessFileAsync(filePath, recordSchema, recordType, insertColumns, inserter, cancellationToken).ConfigureAwait(false);
    }

    // Flush remaining records
    await inserter.FlushAsync(cancellationToken).ConfigureAwait(false);

    // Wait for all pending batches
    while (inserter.PendingBatches > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();
        await Task.Delay(10, cancellationToken).ConfigureAwait(false);
    }

    // Check for errors reported by the background batches.
    var errors = inserter.DrainErrors();
    if (errors.Count > 0)
    {
        if (options.ErrorMode == FileErrorMode.Abort)
        {
            throw new KineticaException($"File insert failed with {errors.Count} error(s): {string.Join("; ", errors.Select(e => e.Message))}");
        }

        // Skip mode reports row-level failures as warnings; surface batch-level
        // failures the same way instead of dropping them on the floor.
        if (options.ErrorMode == FileErrorMode.Skip)
        {
            foreach (var error in errors)
            {
                Warnings.Add($"Insert error: {error.Message}");
            }
        }
    }

    RecordsInserted = inserter.CountInserted;
    return RecordsInserted;
}
152 
/// <summary>
/// Synchronous version of ReadAndInsertAsync.
/// </summary>
/// <returns>The number of records inserted.</returns>
public long ReadAndInsert()
{
    // The async pipeline is started on the thread pool; this thread blocks until it completes.
    Task<long> pending = Task.Run(() => ReadAndInsertAsync(CancellationToken.None));
    return pending.GetAwaiter().GetResult();
}
160 
/// <summary>
/// Issues a <c>deleteRecords</c> call for the target table on a thread-pool task.
/// </summary>
/// <param name="cancellationToken">
/// Handed to <c>Task.Run</c> only; it is not passed to <c>deleteRecords</c> itself.
/// </param>
161  private async Task ClearTableAsync(CancellationToken cancellationToken)
162  {
163  await Task.Run(() =>
164  {
165  var options = new Dictionary<string, string>
166  {
 // NOTE(review): listing line 167 (the initializer entry) is missing from this listing.
 // Presumably it sets the delete-all-records option to true - confirm against the real source.
168  };
 // The second argument is an empty string list; the options dictionary carries the request settings.
169  _kinetica.deleteRecords(_fileInfo.TableName, new List<string>(), options);
170  }, cancellationToken).ConfigureAwait(false);
171  }
172 
/// <summary>
/// Loads the file(s) through the server's <c>insertRecordsFromFiles</c> endpoint instead of
/// parsing them in this process.
/// </summary>
/// <remarks>
/// Sets <c>RecordsInserted</c> and <c>RecordsSkipped</c> from the response and copies every
/// entry of the response's <c>info</c> map into <c>Warnings</c>.
/// NOTE(review): several statement bodies of this method are missing from this listing
/// (listing lines 205-210, 217-220, 226, 228, 232, 239-240 and 260); consult the real
/// source before changing any code here.
/// </remarks>
/// <param name="cancellationToken">Handed to <c>Task.Run</c> only; the blocking API call itself does not receive it.</param>
/// <returns>The inserted-record count reported by the server.</returns>
/// <exception cref="KineticaException">A local path/pattern was given and no file matches it.</exception>
177  private async Task<long> ImportViaServerApiAsync(CancellationToken cancellationToken)
178  {
179  var options = _fileInfo.Options;
180 
181  // Build file paths list
182  var filePaths = new List<string>();
183  if (_fileInfo.IsKifsPath)
184  {
 // A KiFS path is passed through as-is; no local resolution or glob expansion happens here.
185  filePaths.Add(_fileInfo.KifsPath!);
186  }
187  else
188  {
189  // For local files, they need to be accessible to the server
190  // This typically means they should be on a shared filesystem or uploaded to KiFS first
191  var files = ResolveLocalFiles(_fileInfo.FilePath!);
192  if (files.Count == 0)
193  {
194  throw new KineticaException($"No files found matching pattern: {_fileInfo.FilePath}");
195  }
196  filePaths.AddRange(files);
197  }
198 
199  // Build options dictionary
200  var apiOptions = new Dictionary<string, string>();
201 
202  // Set file type
203  var fileType = options.Format switch
204  {
 // NOTE(review): the switch arms mapping FileFormat to a file-type string are missing from this listing.
211  };
212  apiOptions[InsertRecordsFromFilesRequest.Options.FILE_TYPE] = fileType;
213 
214  // Error handling
215  var errorHandling = options.ErrorMode switch
216  {
 // NOTE(review): the switch arms mapping FileErrorMode to an error-handling string are missing from this listing.
221  };
222  apiOptions[InsertRecordsFromFilesRequest.Options.ERROR_HANDLING] = errorHandling;
223 
224  // PK handling
 // NOTE(review): the statement guarded by each of the next two conditions is missing from this listing.
225  if (options.UpdateOnExistingPk)
227  if (options.IgnoreExistingPk)
229 
230  // Truncate table
 // NOTE(review): the guarded statement is missing from this listing.
231  if (options.InitialClear)
233 
234  // Text-specific options
 // Sent only for delimited text and auto-detected formats.
235  if (options.Format == FileFormat.DelimitedText || options.Format == FileFormat.Auto)
236  {
237  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_DELIMITER] = options.Delimiter.ToString();
238  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_HAS_HEADER] = options.HasHeader
 // NOTE(review): the rest of this expression (listing lines 239-240) is missing from this listing.
241  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_QUOTE_CHARACTER] = options.QuoteChar.ToString();
242 
 // '\0' means "no escape character configured", so the option is omitted in that case.
243  if (options.EscapeChar != '\0')
244  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_ESCAPE_CHARACTER] = options.EscapeChar.ToString();
245 
246  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_NULL_STRING] = options.NullString;
247 
248  if (!string.IsNullOrEmpty(options.CommentPrefix))
249  apiOptions[InsertRecordsFromFilesRequest.Options.TEXT_COMMENT_STRING] = options.CommentPrefix;
250 
251  if (options.Skip > 0)
252  apiOptions[InsertRecordsFromFilesRequest.Options.SKIP_LINES] = options.Skip.ToString();
253  }
254 
255  // Batch size
256  apiOptions[InsertRecordsFromFilesRequest.Options.BATCH_SIZE] = options.BatchSize.ToString();
257 
258  // Dry run
 // NOTE(review): the guarded statement is missing from this listing.
259  if (options.DryRun)
261 
262  // Columns to load
 // Only sent when the statement named explicit insert columns; joined with commas.
263  if (_fileInfo.InsertColumns.Count > 0)
264  {
265  apiOptions[InsertRecordsFromFilesRequest.Options.COLUMNS_TO_LOAD] = string.Join(",", _fileInfo.InsertColumns);
266  }
267 
268  // Create and execute the request
269  var request = new InsertRecordsFromFilesRequest(
270  _fileInfo.TableName,
271  filePaths,
272  null, // modify_columns
273  null, // create_table_options
274  apiOptions);
275 
 // The API call is synchronous, so it is pushed onto the thread pool to keep this method awaitable.
276  var response = await Task.Run(() => _kinetica.insertRecordsFromFiles(request), cancellationToken).ConfigureAwait(false);
277 
278  RecordsInserted = response.count_inserted;
279  RecordsSkipped = response.count_skipped;
280 
281  // Add any info messages as warnings
282  if (response.info != null)
283  {
284  foreach (var kvp in response.info)
285  {
286  Warnings.Add($"{kvp.Key}: {kvp.Value}");
287  }
288  }
289 
290  return RecordsInserted;
291  }
292 
/// <summary>
/// Builds a records-API type named "dynamic_record" whose columns mirror, in order,
/// the columns of <paramref name="ktype"/>.
/// </summary>
/// <param name="ktype">Table type whose columns are copied.</param>
/// <returns>A new type with one column definition per source column.</returns>
private static kinetica.Records.Type CreateTypeFromKineticaType(KineticaType ktype)
{
    List<Column> definitions = ktype.getColumns()
        .Select(source =>
        {
            // Properties are carried over unchanged; the column type is derived from them.
            var properties = source.getProperties();
            var mappedType = DetermineColumnType(source);
            return new Column(source.getName(), mappedType, properties);
        })
        .ToList();

    return new kinetica.Records.Type("dynamic_record", definitions);
}
308 
// Column properties that select a specific column type, listed in the order they are tested.
private static readonly (string Property, ColumnType Type)[] s_propertyColumnTypes =
{
    ("boolean", ColumnType.Boolean),
    ("int8", ColumnType.Int8),
    ("int16", ColumnType.Int16),
    ("timestamp", ColumnType.Timestamp),
    ("date", ColumnType.Date),
    ("datetime", ColumnType.DateTime),
    ("time", ColumnType.Time),
    ("decimal", ColumnType.Decimal),
    ("ipv4", ColumnType.Ipv4),
    ("uuid", ColumnType.Uuid),
    ("char1", ColumnType.Char1),
    ("char2", ColumnType.Char2),
    ("char4", ColumnType.Char4),
    ("char8", ColumnType.Char8),
    ("char16", ColumnType.Char16),
    ("char32", ColumnType.Char32),
    ("char64", ColumnType.Char64),
    ("char128", ColumnType.Char128),
    ("char256", ColumnType.Char256),
};

/// <summary>
/// Maps a table column to a records-API column type.
/// </summary>
/// <remarks>
/// The first matching entry of the property table wins. When no property matches,
/// the column's base type decides, with <c>String</c> as the fallback.
/// </remarks>
/// <param name="col">Column whose properties and base type are inspected.</param>
/// <returns>The column type to use for <paramref name="col"/>.</returns>
private static ColumnType DetermineColumnType(KineticaType.Column col)
{
    var props = col.getProperties();

    foreach (var (property, mapped) in s_propertyColumnTypes)
    {
        if (props.Contains(property))
        {
            return mapped;
        }
    }

    switch (col.getType())
    {
        case KineticaType.Column.ColumnType.INT:
            return ColumnType.Integer;
        case KineticaType.Column.ColumnType.LONG:
            return ColumnType.Long;
        case KineticaType.Column.ColumnType.FLOAT:
            return ColumnType.Float;
        case KineticaType.Column.ColumnType.DOUBLE:
            return ColumnType.Double;
        case KineticaType.Column.ColumnType.BYTES:
            return ColumnType.Bytes;
        case KineticaType.Column.ColumnType.STRING:
        default:
            return ColumnType.String;
    }
}
344 
/// <summary>
/// Parses one local delimited-text file and hands one record per data line to the bulk inserter.
/// </summary>
/// <remarks>
/// Leading lines are skipped per the options, an optional header row maps file columns to
/// insert columns by name (positional mapping otherwise), and empty lines and comment lines
/// are ignored. A row that fails is handled per the configured error mode: abort rethrows
/// with file and line number, any other mode counts the row as skipped, and skip mode also
/// records a warning. The record limit is counted per file.
/// </remarks>
/// <param name="filePath">Path of the file to read; decoded as UTF-8.</param>
/// <param name="recordSchema">Record schema of the target table, used to pick the conversion for each column.</param>
/// <param name="recordType">Type used to create each new record instance.</param>
/// <param name="insertColumns">Names of the table columns to populate, in file order when there is no header.</param>
/// <param name="inserter">Bulk inserter that receives the records.</param>
/// <param name="cancellationToken">Checked before every data line and passed to the inserter.</param>
/// <exception cref="KineticaException">A row failed and the error mode is abort.</exception>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
private async Task ProcessFileAsync(
    string filePath,
    RecordSchema recordSchema,
    kinetica.Records.Type recordType,
    IList<string> insertColumns,
    BulkInserter<GenericRecord> inserter,
    CancellationToken cancellationToken)
{
    var options = _fileInfo.Options;
    var parser = new CsvParser(options.Delimiter, options.QuoteChar, options.EscapeChar);

    using var reader = new StreamReader(filePath, Encoding.UTF8);

    int lineNumber = 0;

    // Skip initial lines
    for (int skipped = 0; skipped < options.Skip && !reader.EndOfStream; skipped++)
    {
        await reader.ReadLineAsync().ConfigureAwait(false);
        lineNumber++;
    }

    // columnMapping[i] is the file column index feeding insertColumns[i], or -1 when absent.
    var columnMapping = new List<int>();

    if (options.HasHeader && !reader.EndOfStream)
    {
        var headerLine = await reader.ReadLineAsync().ConfigureAwait(false);
        lineNumber++;

        if (!string.IsNullOrEmpty(headerLine))
        {
            columnMapping = BuildColumnMapping(parser.ParseLine(headerLine), insertColumns);
        }
    }
    else
    {
        // Use positional mapping
        for (int i = 0; i < insertColumns.Count; i++)
        {
            columnMapping.Add(i);
        }
    }

    // The schema type of each insert column does not change between rows, so resolve it
    // once here instead of searching the schema for every cell. Null = no such field.
    var avroTypes = insertColumns
        .Select(name => recordSchema.Fields
            .FirstOrDefault(f => f.Name.Equals(name, StringComparison.OrdinalIgnoreCase))
            ?.Schema.Tag.ToString())
        .ToArray();

    var commentPrefix = options.CommentPrefix;

    // Process data lines
    long recordCount = 0;
    while (!reader.EndOfStream)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var line = await reader.ReadLineAsync().ConfigureAwait(false);
        lineNumber++;

        if (string.IsNullOrEmpty(line))
        {
            continue;
        }

        // Skip comment lines. The prefix is data, not language text, so compare ordinally.
        if (!string.IsNullOrEmpty(commentPrefix) && line.StartsWith(commentPrefix, StringComparison.Ordinal))
        {
            continue;
        }

        try
        {
            var values = parser.ParseLine(line);
            var record = recordType.NewInstance();

            for (int i = 0; i < insertColumns.Count; i++)
            {
                var fileIndex = i < columnMapping.Count ? columnMapping[i] : -1;
                if (fileIndex < 0 || fileIndex >= values.Length)
                {
                    continue; // column absent from this row; leave the record's default
                }

                var avroType = avroTypes[i];
                if (avroType is null)
                {
                    continue; // insert column does not exist in the table schema
                }

                var value = ConvertValue(values[fileIndex], avroType, options);
                SetRecordValue(record, insertColumns[i], value);
            }

            await inserter.InsertAsync(record, cancellationToken).ConfigureAwait(false);
            recordCount++;

            // Check limit
            if (options.Limit > 0 && recordCount >= options.Limit)
            {
                break;
            }
        }
        // Cancellation is not a bad row: let it propagate instead of wrapping or skipping it.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            if (options.ErrorMode == FileErrorMode.Abort)
            {
                throw new KineticaException($"Error at line {lineNumber} in file '{filePath}': {ex.Message}", ex);
            }

            RecordsSkipped++;
            if (options.ErrorMode == FileErrorMode.Skip)
            {
                Warnings.Add($"Skipped line {lineNumber} in file '{filePath}': {ex.Message}");
            }
        }
    }
}
453 
/// <summary>
/// Maps each insert column to the index of the header cell carrying the same name.
/// </summary>
/// <remarks>
/// Header names are trimmed and compared case-insensitively; when a name occurs more than
/// once the last occurrence is used. A column with no matching header gets -1 and a warning.
/// </remarks>
/// <param name="headers">Cells of the parsed header row.</param>
/// <param name="insertColumns">Table column names to look up.</param>
/// <returns>One file column index (or -1) per entry of <paramref name="insertColumns"/>, in the same order.</returns>
private List<int> BuildColumnMapping(string[] headers, IList<string> insertColumns)
{
    var mapping = new List<int>(insertColumns.Count);

    var positionsByHeader = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
    int position = 0;
    foreach (var header in headers)
    {
        positionsByHeader[header.Trim()] = position;
        position++;
    }

    foreach (var col in insertColumns)
    {
        bool found = positionsByHeader.TryGetValue(col, out int index);
        if (found)
        {
            mapping.Add(index);
            continue;
        }

        // Column not found in file, will use null/default
        mapping.Add(-1);
        Warnings.Add($"Column '{col}' not found in file header");
    }

    return mapping;
}
480 
/// <summary>
/// Stores a converted value in a record under the given column name, using the setter
/// overload that matches the value's runtime type.
/// </summary>
/// <param name="record">Record being populated.</param>
/// <param name="columnName">Name of the column to set.</param>
/// <param name="value">
/// Value to store. <c>null</c> stores a null; types without a dedicated overload are
/// stored as their <c>ToString()</c> text.
/// </param>
private static void SetRecordValue(GenericRecord record, string columnName, object? value)
{
    if (value is null)
    {
        record.PutNull(columnName);
    }
    else if (value is int asInt)
    {
        record.Put(columnName, asInt);
    }
    else if (value is long asLong)
    {
        record.Put(columnName, asLong);
    }
    else if (value is float asFloat)
    {
        record.Put(columnName, asFloat);
    }
    else if (value is double asDouble)
    {
        record.Put(columnName, asDouble);
    }
    else if (value is string asString)
    {
        record.Put(columnName, asString);
    }
    else if (value is bool asBool)
    {
        record.Put(columnName, asBool);
    }
    else if (value is byte[] asBytes)
    {
        record.Put(columnName, asBytes);
    }
    else
    {
        record.Put(columnName, value.ToString());
    }
}
517 
/// <summary>
/// Converts one cell of text to the CLR value matching the column's Avro type name.
/// </summary>
/// <param name="stringValue">Raw cell text.</param>
/// <param name="avroType">Avro type name of the target column; matched case-insensitively.</param>
/// <param name="options">Load options; supplies the text that represents NULL.</param>
/// <returns>
/// <c>null</c> for an empty cell, the configured null string, or numeric/boolean text that
/// does not parse; otherwise the converted value. Unrecognized type names return the text unchanged.
/// </returns>
/// <exception cref="FormatException">A bytes column holds text that is not valid Base64.</exception>
private object? ConvertValue(string stringValue, string avroType, FileInsertOptions options)
{
    // Handle NULL values
    bool isNullText = string.IsNullOrEmpty(stringValue) || stringValue == options.NullString;
    if (isNullText)
    {
        return null;
    }

    const NumberStyles styles = NumberStyles.Any;
    var culture = CultureInfo.InvariantCulture;

    // Convert based on Avro type
    switch (avroType.ToLowerInvariant())
    {
        case "int":
            if (int.TryParse(stringValue, styles, culture, out var asInt))
            {
                return asInt;
            }
            return null;

        case "long":
            if (long.TryParse(stringValue, styles, culture, out var asLong))
            {
                return asLong;
            }
            return null;

        case "float":
            if (float.TryParse(stringValue, styles, culture, out var asFloat))
            {
                return asFloat;
            }
            return null;

        case "double":
            if (double.TryParse(stringValue, styles, culture, out var asDouble))
            {
                return asDouble;
            }
            return null;

        case "boolean":
            return ParseBoolean(stringValue);

        case "bytes":
            return Convert.FromBase64String(stringValue);

        default:
            // Covers "string" and every unrecognized type name.
            return stringValue;
    }
}
539 
/// <summary>
/// Interprets common textual spellings of a boolean.
/// </summary>
/// <param name="value">Text to interpret; surrounding whitespace and letter case are ignored.</param>
/// <returns>
/// <c>true</c> for TRUE/T/YES/Y/1, <c>false</c> for FALSE/F/NO/N/0, and <c>null</c> for
/// null, empty or any other text.
/// </returns>
private static bool? ParseBoolean(string value)
{
    if (string.IsNullOrEmpty(value))
    {
        return null;
    }

    return value.Trim().ToUpperInvariant() switch
    {
        "TRUE" or "T" or "YES" or "Y" or "1" => true,
        "FALSE" or "F" or "NO" or "N" or "0" => false,
        _ => (bool?)null
    };
}
557 
/// <summary>
/// Expands a local path or glob pattern into the full paths of the files it names.
/// </summary>
/// <remarks>
/// A pattern without glob characters is treated as a single file path. Otherwise the
/// pattern is split into a base directory and a glob, and the glob is matched beneath
/// that directory.
/// NOTE(review): '?' and '[' are treated as glob characters here - confirm that the
/// Matcher in use honours them.
/// </remarks>
/// <param name="pattern">File path or glob pattern.</param>
/// <returns>Full paths of the matching files; empty when nothing matches or the base directory does not exist.</returns>
private static IList<string> ResolveLocalFiles(string pattern)
{
    // Plain path: it either exists or it doesn't.
    if (!ContainsGlobCharacters(pattern))
    {
        return File.Exists(pattern)
            ? new List<string> { Path.GetFullPath(pattern) }
            : new List<string>();
    }

    // Glob: match the pattern part beneath the literal directory part.
    var globber = new Matcher();
    var (baseDir, globPattern) = SplitPathAndPattern(pattern);
    globber.AddInclude(globPattern);

    var searchRoot = new DirectoryInfo(baseDir);
    if (!searchRoot.Exists)
    {
        return new List<string>();
    }

    var matches = globber.Execute(new DirectoryInfoWrapper(searchRoot));

    var resolved = new List<string>();
    foreach (var match in matches.Files)
    {
        // Match paths are relative to the base directory.
        resolved.Add(Path.GetFullPath(Path.Combine(baseDir, match.Path)));
    }
    return resolved;
}
587 
/// <summary>
/// Tells whether a path contains any of the characters '*', '?' or '['.
/// </summary>
/// <param name="path">Path or pattern to inspect.</param>
/// <returns><c>true</c> when at least one of those characters occurs in <paramref name="path"/>.</returns>
private static bool ContainsGlobCharacters(string path)
{
    return path.IndexOfAny(new[] { '*', '?', '[' }) >= 0;
}
592 
/// <summary>
/// Splits a glob path into the literal directory that precedes the first glob character
/// and the remaining pattern.
/// </summary>
/// <remarks>
/// Backslashes are normalized to forward slashes first. The split happens at the last
/// separator before the first '*', '?' or '[' (or the last separator overall when there is
/// no glob character). Without such a separator the current directory is the base.
/// </remarks>
/// <param name="path">Path containing a glob pattern.</param>
/// <returns>The base directory and the pattern relative to it.</returns>
private static (string baseDir, string pattern) SplitPathAndPattern(string path)
{
    var normalizedPath = path.Replace('\\', '/');

    // Only separators located before the first glob character belong to the literal part.
    int firstGlob = normalizedPath.IndexOfAny(new[] { '*', '?', '[' });
    int literalEnd = firstGlob < 0 ? normalizedPath.Length : firstGlob;
    int lastSeparator = literalEnd == 0 ? -1 : normalizedPath.LastIndexOf('/', literalEnd - 1);

    if (lastSeparator < 0)
    {
        return (Directory.GetCurrentDirectory(), normalizedPath);
    }

    var baseDir = normalizedPath.Substring(0, lastSeparator);

    // Keep the separator when dropping it would change the meaning of the directory:
    // "/*.csv" must resolve to the root (not an empty path) and "C:/*.csv" to the drive
    // root (not the drive's current directory).
    if (baseDir.Length == 0 || baseDir[baseDir.Length - 1] == ':')
    {
        baseDir += "/";
    }

    return (baseDir, normalizedPath.Substring(lastSeparator + 1));
}
614  }
615 
/// <summary>
/// Simple CSV parser that handles quoted fields and escapes.
/// </summary>
public class CsvParser
{
    private readonly char _delimiter;
    private readonly char _quote;
    private readonly char _escape;

    /// <summary>
    /// Creates a new CSV parser with the specified options.
    /// </summary>
    /// <param name="delimiter">Character separating fields.</param>
    /// <param name="quote">Character that opens and closes a quoted field; doubled inside a quoted field it stands for itself.</param>
    /// <param name="escape">
    /// Character that makes the following character literal, or '\0' for none. When it equals
    /// <paramref name="quote"/> it is not treated as a separate escape; quote doubling applies instead.
    /// </param>
    public CsvParser(char delimiter = ',', char quote = '"', char escape = '\0')
    {
        _delimiter = delimiter;
        _quote = quote;
        _escape = escape;
    }

    /// <summary>
    /// Parses a single CSV line into fields.
    /// </summary>
    /// <param name="line">One line of text without its line terminator.</param>
    /// <returns>The fields of the line, unquoted and unescaped; an empty line yields one empty field.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="line"/> is <c>null</c>.</exception>
    public string[] ParseLine(string line)
    {
        if (line is null)
        {
            throw new ArgumentNullException(nameof(line));
        }

        // An escape character identical to the quote character would swallow every quote and
        // break quoted fields; in that configuration doubled quotes already cover escaping.
        bool escapeEnabled = _escape != '\0' && _escape != _quote;

        var fields = new List<string>();
        var current = new StringBuilder();
        bool inQuotes = false;
        bool escaped = false;

        for (int i = 0; i < line.Length; i++)
        {
            char c = line[i];

            if (escaped)
            {
                // Character after an escape is taken literally, whatever it is.
                current.Append(c);
                escaped = false;
                continue;
            }

            if (escapeEnabled && c == _escape)
            {
                escaped = true;
                continue;
            }

            if (c == _quote)
            {
                // Check for escaped quote (double quote)
                if (inQuotes && i + 1 < line.Length && line[i + 1] == _quote)
                {
                    current.Append(_quote);
                    i++; // Skip next quote
                    continue;
                }

                inQuotes = !inQuotes;
                continue;
            }

            if (c == _delimiter && !inQuotes)
            {
                fields.Add(current.ToString());
                current.Clear();
                continue;
            }

            current.Append(c);
        }

        // An escape character at the very end has nothing to escape; keep it rather than lose data.
        if (escaped)
        {
            current.Append(_escape);
        }

        // Add last field
        fields.Add(current.ToString());

        return fields.ToArray();
    }
}
695 }
const string TRUNCATE_TABLE
If set to TRUE, truncates the table specified by table_name prior to loading the file(s).
List< string > Warnings
Gets any warnings generated during processing.
Class for record schemas
Definition: RecordSchema.cs:31
const string IGNORE_BAD_RECORDS
Malformed records are skipped.
int BatchSize
Number of records per batch before triggering a flush.
const string TEXT_ESCAPE_CHARACTER
Specifies the character that is used to escape other characters in the source data.
const string BATCH_SIZE
Number of records to insert per batch when inserting data.
async Task< long > ReadAndInsertAsync(CancellationToken cancellationToken=default)
Reads the file(s) and inserts records into the table.
FileFormat
Supported file formats for INSERT FROM FILE operations.
A set of string constants for the parameter options.
const string DRY_RUN
Does not load data, but walks through the source data and determines the number of valid records,...
A set of parameters for Kinetica.deleteRecords.
bool PutNull(int index)
Sets a null value by column index.
Information about an INSERT INTO...SELECT FROM FILE statement.
bool Put(int index, int value)
Sets an integer value by column index.
FileErrorMode
Error handling modes for file insert operations.
CsvParser(char delimiter=',', char quote='"', char escape = '\0')
Creates a new CSV parser with the specified options.
Reads CSV/TSV/PSV files and inserts records into Kinetica using bulk insert.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const string DELETE_ALL_RECORDS
If set to TRUE, all records in the table will be deleted.
const string FALSE
Reject new records when primary keys match existing records
const string TEXT_DELIMITER
Specifies the character delimiting field values in the source data and field names in the header (if ...
const string SKIP_LINES
Skip a number of lines from the beginning of the file.
if(args.Length > 0)
Definition: Program.cs:5
CsvFileReader(Kinetica kinetica, InsertFromFileInfo fileInfo, KineticaType? tableType=null)
Creates a new CsvFileReader for the specified table and file info.
const string COLUMNS_TO_LOAD
Specifies a comma-delimited list of columns from the source data to load.
const string FILE_TYPE
Specifies the type of the file(s) whose records will be inserted.
Immutable metadata about a column in a Kinetica type.
Definition: Column.cs:11
long RecordsInserted
Gets the number of records successfully inserted.
async ValueTask InsertAsync(T record, CancellationToken cancellationToken=default)
Inserts a single record with async backpressure control.
const string TEXT_QUOTE_CHARACTER
Specifies the character that should be interpreted as a field value quoting character in the source d...
const string TEXT_NULL_STRING
Specifies the character string that should be interpreted as a null value in the source data.
A set of string constants for the parameter options.
const string TEXT_COMMENT_STRING
Specifies the character string that should be interpreted as a comment line prefix in the source data...
IList< Column > getColumns()
const string PARQUET
Apache Parquet file format
const string TRUE
Upsert new records when primary keys match existing records
Simple CSV parser that handles quoted fields and escapes.
string [] ParseLine(string line)
Parses a single CSV line into fields.
const string INGESTION_MODE
Whether to do a full load, dry run, or perform a type inference on the source data.
const string IGNORE_EXISTING_PK
Specifies the record collision error-suppression policy for inserting into a table with a primary key...
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
const string DELIMITED_TEXT
Delimited text file format; e.g., CSV, TSV, PSV, etc.
long ReadAndInsert()
Synchronous version of ReadAndInsertAsync.
A generic record that can hold values for any Kinetica type.
const string PERMISSIVE
Records with missing columns are populated with nulls if possible; otherwise, the malformed records a...
const string ERROR_HANDLING
Specifies how errors should be handled upon insertion.
List< Field > Fields
List of fields in the record
Definition: RecordSchema.cs:36
A set of parameters for Kinetica.insertRecordsFromFiles.
long RecordsSkipped
Gets the number of records skipped due to errors.
const string ABORT
Stops current insertion and aborts entire operation when an error is encountered.
Configuration options for the BulkInserter<T>.
const string TEXT_HAS_HEADER
Indicates whether the source data contains a header row.