2 using System.Collections.Generic;
3 using System.Globalization;
7 using System.Threading;
8 using System.Threading.Tasks;
12 using Microsoft.Extensions.FileSystemGlobbing;
13 using Microsoft.Extensions.FileSystemGlobbing.Abstractions;
26 private readonly List<string> _columnNames;
27 private readonly Dictionary<string, int> _columnIndexMap;
53 _fileInfo = fileInfo ??
throw new ArgumentNullException(nameof(fileInfo));
54 _tableType = tableType;
55 _columnNames =
new List<string>();
56 _columnIndexMap =
new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
66 if (_fileInfo.IsKifsPath)
68 return await ImportViaServerApiAsync(cancellationToken).ConfigureAwait(
false);
73 if (_fileInfo.Options.Format !=
FileFormat.DelimitedText &&
76 return await ImportViaServerApiAsync(cancellationToken).ConfigureAwait(
false);
80 var tableType = _tableType ?? await Task.Run(
82 cancellationToken).ConfigureAwait(
false);
85 if (_fileInfo.Options.InitialClear)
87 await ClearTableAsync(cancellationToken).ConfigureAwait(
false);
94 InsertOptions =
new Dictionary<string, string>()
97 if (_fileInfo.Options.UpdateOnExistingPk)
98 bulkOptions.InsertOptions[
"update_on_existing_pk"] =
"true";
99 if (_fileInfo.Options.IgnoreExistingPk)
100 bulkOptions.InsertOptions[
"ignore_existing_pk"] =
"true";
101 if (_fileInfo.Options.TruncateStrings)
102 bulkOptions.InsertOptions[
"truncate_strings"] =
"true";
105 var recordSchema = tableType.getSchema() as
RecordSchema;
106 if (recordSchema ==
null)
108 throw new KineticaException($
"Table '{_fileInfo.TableName}' schema is not a record schema");
111 var insertColumns = _fileInfo.InsertColumns.Count > 0
112 ? _fileInfo.InsertColumns
113 : recordSchema.Fields.Select(f => f.Name).ToList();
117 var recordType = CreateTypeFromKineticaType(tableType);
121 var files = ResolveLocalFiles(_fileInfo.FilePath!);
122 if (files.Count == 0)
124 throw new KineticaException($
"No files found matching pattern: {_fileInfo.FilePath}");
127 foreach (var filePath
in files)
129 await ProcessFileAsync(filePath, recordSchema, recordType, insertColumns, inserter, cancellationToken).ConfigureAwait(
false);
133 await inserter.FlushAsync(cancellationToken).ConfigureAwait(
false);
136 while (inserter.PendingBatches > 0)
138 cancellationToken.ThrowIfCancellationRequested();
139 await Task.Delay(10, cancellationToken).ConfigureAwait(
false);
143 var errors = inserter.DrainErrors();
144 if (errors.Count > 0 && _fileInfo.Options.ErrorMode ==
FileErrorMode.Abort)
146 throw new KineticaException($
"File insert failed with {errors.Count} error(s): {string.Join(";
", errors.Select(e => e.Message))}");
158 return Task.Run(() =>
ReadAndInsertAsync(CancellationToken.None)).GetAwaiter().GetResult();
161 private async Task ClearTableAsync(CancellationToken cancellationToken)
165 var options =
new Dictionary<string, string>
169 _kinetica.deleteRecords(_fileInfo.TableName,
new List<string>(), options);
170 }, cancellationToken).ConfigureAwait(
false);
177 private async Task<long> ImportViaServerApiAsync(CancellationToken cancellationToken)
179 var options = _fileInfo.Options;
182 var filePaths =
new List<string>();
183 if (_fileInfo.IsKifsPath)
185 filePaths.Add(_fileInfo.KifsPath!);
191 var files = ResolveLocalFiles(_fileInfo.FilePath!);
192 if (files.Count == 0)
194 throw new KineticaException($
"No files found matching pattern: {_fileInfo.FilePath}");
196 filePaths.AddRange(files);
200 var apiOptions =
new Dictionary<string, string>();
203 var fileType = options.Format
switch 215 var errorHandling = options.ErrorMode
switch 225 if (options.UpdateOnExistingPk)
227 if (options.IgnoreExistingPk)
231 if (options.InitialClear)
243 if (options.EscapeChar !=
'\0')
248 if (!
string.IsNullOrEmpty(options.CommentPrefix))
251 if (options.Skip > 0)
263 if (_fileInfo.InsertColumns.Count > 0)
276 var response = await Task.Run(() => _kinetica.insertRecordsFromFiles(request), cancellationToken).ConfigureAwait(
false);
282 if (response.info !=
null)
284 foreach (var kvp
in response.info)
286 Warnings.Add($
"{kvp.Key}: {kvp.Value}");
296 var columnDefs =
new List<Column>();
298 foreach (var col
in columns)
300 var props = col.getProperties();
301 var colType = DetermineColumnType(col);
303 columnDefs.Add(
new Column(col.getName(), colType, props));
306 return new kinetica.Records.Type(
"dynamic_record", columnDefs);
311 var props = col.getProperties();
313 if (props.Contains(
"boolean"))
return ColumnType.Boolean;
314 if (props.Contains(
"int8"))
return ColumnType.Int8;
315 if (props.Contains(
"int16"))
return ColumnType.Int16;
316 if (props.Contains(
"timestamp"))
return ColumnType.Timestamp;
317 if (props.Contains(
"date"))
return ColumnType.Date;
318 if (props.Contains(
"datetime"))
return ColumnType.DateTime;
319 if (props.Contains(
"time"))
return ColumnType.Time;
320 if (props.Contains(
"decimal"))
return ColumnType.Decimal;
321 if (props.Contains(
"ipv4"))
return ColumnType.Ipv4;
322 if (props.Contains(
"uuid"))
return ColumnType.Uuid;
323 if (props.Contains(
"char1"))
return ColumnType.Char1;
324 if (props.Contains(
"char2"))
return ColumnType.Char2;
325 if (props.Contains(
"char4"))
return ColumnType.Char4;
326 if (props.Contains(
"char8"))
return ColumnType.Char8;
327 if (props.Contains(
"char16"))
return ColumnType.Char16;
328 if (props.Contains(
"char32"))
return ColumnType.Char32;
329 if (props.Contains(
"char64"))
return ColumnType.Char64;
330 if (props.Contains(
"char128"))
return ColumnType.Char128;
331 if (props.Contains(
"char256"))
return ColumnType.Char256;
333 return col.getType()
switch 341 _ => ColumnType.String
345 private async Task ProcessFileAsync(
349 IList<string> insertColumns,
351 CancellationToken cancellationToken)
353 var options = _fileInfo.Options;
354 var parser =
new CsvParser(options.Delimiter, options.QuoteChar, options.EscapeChar);
356 using var reader =
new StreamReader(filePath, Encoding.UTF8);
359 int linesSkipped = 0;
360 var columnMapping =
new List<int>();
363 while (linesSkipped < options.Skip && !reader.EndOfStream)
365 await reader.ReadLineAsync().ConfigureAwait(
false);
371 if (options.HasHeader && !reader.EndOfStream)
373 var headerLine = await reader.ReadLineAsync().ConfigureAwait(
false);
376 if (!
string.IsNullOrEmpty(headerLine))
378 var headers = parser.ParseLine(headerLine);
379 columnMapping = BuildColumnMapping(headers, insertColumns);
385 for (
int i = 0; i < insertColumns.Count; i++)
387 columnMapping.Add(i);
392 long recordCount = 0;
393 while (!reader.EndOfStream)
395 cancellationToken.ThrowIfCancellationRequested();
397 var line = await reader.ReadLineAsync().ConfigureAwait(
false);
400 if (
string.IsNullOrEmpty(line))
404 if (!
string.IsNullOrEmpty(options.CommentPrefix) && line.StartsWith(options.CommentPrefix))
409 var values = parser.ParseLine(line);
410 var record = recordType.NewInstance();
412 for (
int i = 0; i < insertColumns.Count; i++)
414 var fileIndex = i < columnMapping.Count ? columnMapping[i] : -1;
416 if (fileIndex >= 0 && fileIndex < values.Length)
418 var stringValue = values[fileIndex];
419 var columnName = insertColumns[i];
420 var column = recordSchema.
Fields 421 .FirstOrDefault(f => f.Name.Equals(columnName, StringComparison.OrdinalIgnoreCase));
425 var value = ConvertValue(stringValue, column.Schema.Tag.ToString(), options);
426 SetRecordValue(record, columnName, value);
431 await inserter.
InsertAsync(record, cancellationToken).ConfigureAwait(
false);
435 if (options.Limit > 0 && recordCount >= options.Limit)
442 throw new KineticaException($
"Error at line {lineNumber} in file '{filePath}': {ex.Message}", ex);
448 Warnings.Add($
"Skipped line {lineNumber} in file '{filePath}': {ex.Message}");
454 private List<int> BuildColumnMapping(
string[] headers, IList<string> insertColumns)
456 var mapping =
new List<int>(insertColumns.Count);
457 var headerMap =
new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
459 for (
int i = 0; i < headers.Length; i++)
461 headerMap[headers[i].Trim()] = i;
464 foreach (var col
in insertColumns)
466 if (headerMap.TryGetValue(col, out
int index))
474 Warnings.Add($
"Column '{col}' not found in file header");
481 private static void SetRecordValue(
GenericRecord record,
string columnName,
object? value)
492 record.
Put(columnName, intVal);
495 record.
Put(columnName, longVal);
498 record.
Put(columnName, floatVal);
500 case double doubleVal:
501 record.
Put(columnName, doubleVal);
504 record.
Put(columnName, strVal);
507 record.
Put(columnName, boolVal);
509 case byte[] bytesVal:
510 record.
Put(columnName, bytesVal);
513 record.
Put(columnName, value.ToString());
518 private object? ConvertValue(
string stringValue,
string avroType, FileInsertOptions options)
521 if (
string.IsNullOrEmpty(stringValue) || stringValue == options.NullString)
527 return avroType.ToLowerInvariant()
switch 529 "string" => stringValue,
530 "int" =>
int.TryParse(stringValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var i) ? i :
null,
531 "long" =>
long.TryParse(stringValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var l) ? l :
null,
532 "float" =>
float.TryParse(stringValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var f) ? f :
null,
533 "double" =>
double.TryParse(stringValue, NumberStyles.Any, CultureInfo.InvariantCulture, out var d) ? d :
null,
534 "boolean" => ParseBoolean(stringValue),
535 "bytes" => Convert.FromBase64String(stringValue),
540 private static bool? ParseBoolean(
string value)
542 if (
string.IsNullOrEmpty(value))
545 value = value.Trim().ToUpperInvariant();
548 if (value ==
"TRUE" || value ==
"T" || value ==
"YES" || value ==
"Y" || value ==
"1")
552 if (value ==
"FALSE" || value ==
"F" || value ==
"NO" || value ==
"N" || value ==
"0")
558 private static IList<string> ResolveLocalFiles(
string pattern)
561 if (!ContainsGlobCharacters(pattern))
563 if (File.Exists(pattern))
565 return new List<string> { Path.GetFullPath(pattern) };
567 return new List<string>();
571 var matcher =
new Matcher();
574 var (baseDir, globPattern) = SplitPathAndPattern(pattern);
576 matcher.AddInclude(globPattern);
578 var directoryInfo =
new DirectoryInfo(baseDir);
579 if (!directoryInfo.Exists)
581 return new List<string>();
584 var result = matcher.Execute(
new DirectoryInfoWrapper(directoryInfo));
585 return result.Files.Select(f => Path.GetFullPath(Path.Combine(baseDir, f.Path))).ToList();
588 private static bool ContainsGlobCharacters(
string path)
590 return path.Contains(
'*') || path.Contains(
'?') || path.Contains(
'[');
593 private static (
string baseDir,
string pattern) SplitPathAndPattern(
string path)
595 var normalizedPath = path.Replace(
'\\',
'/');
596 var lastSeparatorBeforeGlob = -1;
598 for (
int i = 0; i < normalizedPath.Length; i++)
600 char c = normalizedPath[i];
601 if (c ==
'*' || c ==
'?' || c ==
'[')
604 lastSeparatorBeforeGlob = i;
607 if (lastSeparatorBeforeGlob < 0)
609 return (Directory.GetCurrentDirectory(), normalizedPath);
612 return (normalizedPath.Substring(0, lastSeparatorBeforeGlob), normalizedPath.Substring(lastSeparatorBeforeGlob + 1));
621 private readonly
char _delimiter;
622 private readonly
char _quote;
623 private readonly
char _escape;
631 public CsvParser(
char delimiter =
',',
char quote =
'"',
char escape =
'\0')
633 _delimiter = delimiter;
643 var fields =
new List<string>();
644 var current =
new StringBuilder();
645 bool inQuotes =
false;
646 bool escaped =
false;
648 for (
int i = 0; i < line.Length; i++)
659 if (_escape !=
'\0' && c == _escape)
668 if (inQuotes && i + 1 < line.Length && line[i + 1] == _quote)
670 current.Append(_quote);
675 inQuotes = !inQuotes;
679 if (c == _delimiter && !inQuotes)
681 fields.Add(current.ToString());
690 fields.Add(current.ToString());
692 return fields.ToArray();
const string TRUNCATE_TABLE
If set to TRUE, truncates the table specified by table_name prior to loading the file(s).
List< string > Warnings
Gets any warnings generated during processing.
const string SHAPEFILE
ShapeFile file format
const string IGNORE_BAD_RECORDS
Malformed records are skipped.
int BatchSize
Number of records per batch before triggering a flush.
const string TEXT_ESCAPE_CHARACTER
Specifies the character that is used to escape other characters in the source data.
const string BATCH_SIZE
Number of records to insert per batch when inserting data.
async Task< long > ReadAndInsertAsync(CancellationToken cancellationToken=default)
Reads the file(s) and inserts records into the table.
FileFormat
Supported file formats for INSERT FROM FILE operations.
A set of string constants for the parameter options.
const string DRY_RUN
Does not load data, but walks through the source data and determines the number of valid records, taking into account the current mode of error_handling.
A set of parameters for Kinetica.deleteRecords.
bool PutNull(int index)
Sets a null value by column index.
Information about an INSERT INTO...SELECT FROM FILE statement.
bool Put(int index, int value)
Sets an integer value by column index.
FileErrorMode
Error handling modes for file insert operations.
CsvParser(char delimiter=',', char quote='"', char escape = '\0')
Creates a new CSV parser with the specified options.
Reads CSV/TSV/PSV files and inserts records into Kinetica using bulk insert.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
const string DELETE_ALL_RECORDS
If set to TRUE, all records in the table will be deleted.
const string FALSE
Reject new records when primary keys match existing records
const string TEXT_DELIMITER
Specifies the character delimiting field values in the source data and field names in the header (if present).
const string SKIP_LINES
Skip a number of lines from the beginning of the file.
CsvFileReader(Kinetica kinetica, InsertFromFileInfo fileInfo, KineticaType? tableType=null)
Creates a new CsvFileReader for the specified table and file info.
const string COLUMNS_TO_LOAD
Specifies a comma-delimited list of columns from the source data to load.
const string FILE_TYPE
Specifies the type of the file(s) whose records will be inserted.
const string AVRO
Avro file format
Immutable metadata about a column in a Kinetica type.
long RecordsInserted
Gets the number of records successfully inserted.
async ValueTask InsertAsync(T record, CancellationToken cancellationToken=default)
Inserts a single record with async backpressure control.
const string TEXT_QUOTE_CHARACTER
Specifies the character that should be interpreted as a field value quoting character in the source data.
const string TEXT_NULL_STRING
Specifies the character string that should be interpreted as a null value in the source data.
A set of string constants for the parameter options.
const string TEXT_COMMENT_STRING
Specifies the character string that should be interpreted as a comment line prefix in the source data.
IList< Column > getColumns()
const string PARQUET
Apache Parquet file format
const string TRUE
Upsert new records when primary keys match existing records
Simple CSV parser that handles quoted fields and escapes.
const string JSON
JSON file format
string [] ParseLine(string line)
Parses a single CSV line into fields.
const string INGESTION_MODE
Whether to do a full load, dry run, or perform a type inference on the source data.
const string IGNORE_EXISTING_PK
Specifies the record collision error-suppression policy for inserting into a table with a primary key, only used when not in upsert mode (upsert mode is disabled when update_on_existing_pk is false).
const string UPDATE_ON_EXISTING_PK
Specifies the record collision policy for inserting into a table with a primary key.
const string DELIMITED_TEXT
Delimited text file format; e.g., CSV, TSV, PSV, etc.
long ReadAndInsert()
Synchronous version of ReadAndInsertAsync.
A generic record that can hold values for any Kinetica type.
const string PERMISSIVE
Records with missing columns are populated with nulls if possible; otherwise, the malformed records are skipped.
const string ERROR_HANDLING
Specifies how errors should be handled upon insertion.
List< Field > Fields
List of fields in the record
A set of parameters for Kinetica.insertRecordsFromFiles.
long RecordsSkipped
Gets the number of records skipped due to errors.
const string ABORT
Stops current insertion and aborts entire operation when an error is encountered.
Configuration options for the BulkInserter<T>.
const string TEXT_HAS_HEADER
Indicates whether the source data contains a header row.