1 using System.Collections.Concurrent;
3 using System.Data.Common;
4 using System.Text.RegularExpressions;
14 private string? _connectionString;
15 private ConnectionState _state = ConnectionState.Closed;
16 private string _database =
"";
18 private bool _pooled =
true;
22 private bool _batchInsertMode =
false;
26 private string? _currentSchema;
27 private readonly Stack<string> _userStack =
new Stack<string>();
28 private string? _currentImpersonatedUser;
37 [System.Diagnostics.CodeAnalysis.AllowNull]
40 get => _connectionString ??
string.Empty;
43 _connectionString = value ??
string.Empty;
45 _pooled = _connectionStringBuilder.Pooling;
48 _batchInsertMode = _connectionStringBuilder.BatchInsertMode;
49 _batchOptions.
BatchSize = _connectionStringBuilder.BatchSize;
/// <summary>
/// Gets the connection state currently stored in the <c>_state</c> field.
/// </summary>
public override ConnectionState State
{
    get { return _state; }
}
65 get => _currentSchema ?? _connectionStringBuilder?.
Schema;
66 set => _currentSchema = value;
76 _database = databaseName;
84 _currentSchema = schemaName;
93 _currentImpersonatedUser = username;
102 if (_currentImpersonatedUser !=
null)
104 _userStack.Push(_currentImpersonatedUser);
106 _currentImpersonatedUser = username;
114 if (_userStack.Count > 0)
116 _currentImpersonatedUser = _userStack.Pop();
120 _currentImpersonatedUser =
null;
126 if (_state == ConnectionState.Open)
129 if (_batchManager !=
null)
139 _batchManager =
null;
142 if (_pooled && _connectionString !=
null && _kineticaClient !=
null)
144 _connectionPool.ReturnConnection(_connectionString, _kineticaClient);
148 _kineticaClient =
null;
149 _state = ConnectionState.Closed;
156 Task.Run(async () => await
OpenAsync(CancellationToken.None).ConfigureAwait(
false))
157 .ConfigureAwait(
false)
162 public override async Task
OpenAsync(CancellationToken cancellationToken)
164 if (_state == ConnectionState.Open)
169 _state = ConnectionState.Connecting;
173 _kineticaClient = await _connectionPool.GetConnectionAsync(_connectionString ??
string.Empty, cancellationToken).ConfigureAwait(
false);
177 _kineticaClient = await CreateKineticaClientAsync(_connectionStringBuilder ??
new KineticaConnectionStringBuilder(), cancellationToken).ConfigureAwait(
false);
179 _database = _connectionStringBuilder?.
Database ??
"";
180 _state = ConnectionState.Open;
183 if (_batchInsertMode && _kineticaClient !=
null)
190 _state = ConnectionState.Closed;
200 if (!
string.IsNullOrEmpty(builder.
Username) || !
string.IsNullOrEmpty(builder.
Password))
204 Username = builder.
Username ??
string.Empty,
205 Password = builder.
Password ??
string.Empty
210 if (!
string.IsNullOrEmpty(builder.
OAuthToken))
219 await TestConnectionAsync(client, cancellationToken).ConfigureAwait(
false);
223 private async Task TestConnectionAsync(
Kinetica client, CancellationToken cancellationToken)
228 await client.ShowSystemStatusAsync(
new Dictionary<string, string>(), cancellationToken).ConfigureAwait(
false);
246 private async Task<string> GetServerVersionAsync()
250 if (_kineticaClient ==
null)
253 var response = await _kineticaClient.ShowSystemStatusAsync(
new Dictionary<string, string>()).ConfigureAwait(
false);
254 return response?.status_map[
"status"] ??
"Unknown";
/// <summary>
/// Returns the client instance held by this connection.
/// </summary>
/// <returns>The non-null value of <c>_kineticaClient</c>.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when no client is currently assigned.
/// </exception>
internal Kinetica GetKineticaClient()
{
    if (_kineticaClient is null)
    {
        throw new InvalidOperationException("Connection is not open");
    }

    return _kineticaClient;
}
264 #region Batch Insert Support 272 get => _batchInsertMode;
275 if (_batchInsertMode == value)
return;
277 _batchInsertMode = value;
279 if (_batchInsertMode && _state == ConnectionState.Open && _kineticaClient !=
null)
284 else if (!_batchInsertMode && _batchManager !=
null)
288 _batchManager =
null;
328 return Task.Run(async () => await
FlushBatchAsync(CancellationToken.None).ConfigureAwait(
false))
329 .ConfigureAwait(
false)
339 public async Task<long>
FlushBatchAsync(CancellationToken cancellationToken =
default)
341 if (_batchManager ==
null)
346 return await _batchManager.
FlushAllAsync(cancellationToken).ConfigureAwait(
false);
357 public override DataTable
GetSchema(
string collectionName)
362 public override DataTable
GetSchema(
string collectionName,
string?[]? restrictionValues)
365 return schemaProvider.GetSchema(collectionName, restrictionValues);
368 protected override void Dispose(
bool disposing)
374 base.Dispose(disposing);
383 private string _commandText =
string.Empty;
385 private int _commandTimeout = 30;
386 private int _fetchSize = 0;
388 private CancellationTokenSource? _cancellationTokenSource;
398 _connection = connection;
403 _commandText = commandText;
/// <summary>
/// Gets or sets the SQL text of this command.
/// Assigning <c>null</c> stores an empty string, so the getter never returns null.
/// </summary>
[System.Diagnostics.CodeAnalysis.AllowNull]
public override string CommandText
{
    get { return _commandText; }
    set { _commandText = value is null ? string.Empty : value; }
}
/// <summary>
/// Gets or sets the command timeout, in seconds.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when the assigned value is negative. The execute paths feed this value to
/// <c>TimeSpan.FromSeconds</c> and then to a <c>CancellationTokenSource</c> constructor,
/// which rejects negative delays; failing here reports the mistake at the point of
/// assignment instead of at execution time.
/// </exception>
public override int CommandTimeout
{
    get => _commandTimeout;
    set
    {
        if (value < 0)
        {
            throw new ArgumentException("CommandTimeout must be greater than or equal to zero.", nameof(value));
        }

        _commandTimeout = value;
    }
}
422 set => _fetchSize = Math.Max(0, value);
436 _cancellationTokenSource?.Cancel();
442 return Task.Run(async () => await
ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(
false))
443 .ConfigureAwait(
false)
450 var connection = ValidateCommand();
452 using (_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
454 var timeoutToken =
new CancellationTokenSource(TimeSpan.FromSeconds(_commandTimeout));
455 using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
456 _cancellationTokenSource.Token, timeoutToken.Token);
460 var client = connection.GetKineticaClient();
463 var statements = SplitStatements(_commandText);
464 long totalAffected = 0;
466 foreach (var statement
in statements)
468 if (
string.IsNullOrWhiteSpace(statement))
471 var parsedCommand = _sqlParser.Parse(statement, _parameters);
472 totalAffected += await ExecuteParsedCommandAsync(client, parsedCommand, combinedToken.Token).ConfigureAwait(
false);
475 return (
int)totalAffected;
477 catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
481 catch (OperationCanceledException)
483 throw new TimeoutException($
"Command timeout ({_commandTimeout}s) exceeded");
/// <summary>
/// Splits a SQL script into individual statements on <c>;</c> separators.
/// </summary>
/// <remarks>
/// Semicolons inside single-quoted strings, double-quoted identifiers, <c>--</c> line
/// comments and <c>/* */</c> block comments do not split. Comment text is kept in the
/// statement it belongs to. Each statement is trimmed and empty statements are dropped.
/// </remarks>
/// <param name="sql">The script text to split.</param>
/// <returns>The non-empty, trimmed statements in source order.</returns>
private static IList<string> SplitStatements(string sql)
{
    var statements = new List<string>();
    var currentStatement = new System.Text.StringBuilder();
    bool inSingleQuote = false;
    bool inDoubleQuote = false;
    bool inLineComment = false;
    bool inBlockComment = false;

    for (int i = 0; i < sql.Length; i++)
    {
        char c = sql[i];
        char nextChar = i + 1 < sql.Length ? sql[i + 1] : '\0';

        // Start of a "--" line comment (only outside quotes and block comments).
        if (!inSingleQuote && !inDoubleQuote && !inBlockComment && c == '-' && nextChar == '-')
        {
            inLineComment = true;
            currentStatement.Append(c);
            continue;
        }

        // A line comment ends at the first CR or LF.
        if (inLineComment && (c == '\n' || c == '\r'))
        {
            inLineComment = false;
            currentStatement.Append(c);
            continue;
        }

        // Start of a "/*" block comment. Both characters are consumed here so that the
        // '*' of the opener cannot pair with a following '/' and close the comment
        // immediately (as in "/*/ ... */"). The !inBlockComment guard keeps a "/*" that
        // appears inside a comment from swallowing the '*' of a real "*/" terminator.
        if (!inSingleQuote && !inDoubleQuote && !inLineComment && !inBlockComment && c == '/' && nextChar == '*')
        {
            inBlockComment = true;
            currentStatement.Append(c);
            currentStatement.Append(nextChar);
            i++;
            continue;
        }

        // End of a block comment: consume both characters of "*/".
        if (inBlockComment && c == '*' && nextChar == '/')
        {
            inBlockComment = false;
            currentStatement.Append(c);
            currentStatement.Append(nextChar);
            i++;
            continue;
        }

        // Anything else inside a comment is copied verbatim and never interpreted.
        if (inLineComment || inBlockComment)
        {
            currentStatement.Append(c);
            continue;
        }

        if (c == '\'' && !inDoubleQuote)
        {
            // A doubled quote inside a string literal is an escaped quote, not a terminator.
            if (inSingleQuote && nextChar == '\'')
            {
                currentStatement.Append(c);
                currentStatement.Append(nextChar);
                i++;
                continue;
            }

            inSingleQuote = !inSingleQuote;
        }
        else if (c == '"' && !inSingleQuote)
        {
            inDoubleQuote = !inDoubleQuote;
        }

        // An unquoted semicolon terminates the current statement.
        if (c == ';' && !inSingleQuote && !inDoubleQuote)
        {
            var stmt = currentStatement.ToString().Trim();
            if (!string.IsNullOrEmpty(stmt))
            {
                statements.Add(stmt);
            }

            currentStatement.Clear();
            continue;
        }

        currentStatement.Append(c);
    }

    // Whatever follows the last semicolon is the final statement.
    var lastStmt = currentStatement.ToString().Trim();
    if (!string.IsNullOrEmpty(lastStmt))
    {
        statements.Add(lastStmt);
    }

    return statements;
}
596 return Task.Run(async () => await
ExecuteScalarAsync(CancellationToken.None).ConfigureAwait(
false))
597 .ConfigureAwait(
false)
604 using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(
false))
606 if (await reader.ReadAsync(cancellationToken).ConfigureAwait(
false))
608 return reader.GetValue(0);
617 return Task.Run(async () => await ExecuteReaderAsync(behavior, CancellationToken.None).ConfigureAwait(
false))
618 .ConfigureAwait(
false)
623 protected override async Task<DbDataReader>
ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
625 var connection = ValidateCommand();
627 using (_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
629 var timeoutToken =
new CancellationTokenSource(TimeSpan.FromSeconds(_commandTimeout));
630 using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
631 _cancellationTokenSource.Token, timeoutToken.Token);
635 var client = connection.GetKineticaClient();
636 var parsedCommand = _sqlParser.Parse(_commandText, _parameters);
643 parsedCommand.FinalSql,
646 combinedToken.Token);
652 response = await client.ExecuteSqlAsync(parsedCommand.FinalSql, 0, -9999, options:
null, cancellationToken: combinedToken.Token).ConfigureAwait(
false);
656 response = await ExecuteParsedQueryAsync(client, parsedCommand, combinedToken.Token).ConfigureAwait(
false);
661 catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
665 catch (OperationCanceledException)
667 throw new TimeoutException($
"Command timeout ({_commandTimeout}s) exceeded");
676 private async Task<long> ExecuteParsedCommandAsync(
Kinetica client,
ParsedCommand parsedCommand, CancellationToken cancellationToken)
680 ParsedCommandType.Insert => await ExecuteInsertAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
681 ParsedCommandType.InsertFromFile => await ExecuteInsertFromFileAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
682 ParsedCommandType.Update => await ExecuteUpdateAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
683 ParsedCommandType.Delete => await ExecuteDeleteAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
684 ParsedCommandType.CreateTable => await ExecuteCreateTableAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
685 ParsedCommandType.DropTable => await ExecuteDropTableAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false),
690 _ => await ExecuteGenericSqlAsync(client, parsedCommand, cancellationToken).ConfigureAwait(
false)
694 private long ExecuteSetSchema(ParsedCommand command)
696 if (_connection ==
null)
697 throw new InvalidOperationException(
"Connection is not open");
699 var schemaName = _sqlParser.ExtractSchemaName(command.OriginalSql);
700 if (!
string.IsNullOrEmpty(schemaName))
707 private long ExecuteSetUser(ParsedCommand command)
709 if (_connection ==
null)
710 throw new InvalidOperationException(
"Connection is not open");
712 if (command.UserImpersonation?.Username !=
null)
714 _connection.
SetUser(command.UserImpersonation.Username);
719 private long ExecuteAsUser(ParsedCommand command)
721 if (_connection ==
null)
722 throw new InvalidOperationException(
"Connection is not open");
724 if (command.UserImpersonation?.Username !=
null)
726 _connection.
ExecuteAsUser(command.UserImpersonation.Username);
731 private long ExecuteRevert()
733 if (_connection ==
null)
734 throw new InvalidOperationException(
"Connection is not open");
/// <summary>
/// Sends the command's final SQL text to the client and returns the raw response.
/// </summary>
/// <param name="client">Client used to execute the SQL.</param>
/// <param name="parsedCommand">Parsed command whose <c>FinalSql</c> is executed.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The response produced by <c>ExecuteSqlAsync</c>.</returns>
private async Task<ExecuteSqlResponse> ExecuteParsedQueryAsync(Kinetica client, ParsedCommand parsedCommand, CancellationToken cancellationToken)
{
    // NOTE(review): 0 and -9999 are passed positionally; presumably an offset and an
    // "all rows" sentinel - confirm against the client API.
    var pendingCall = client.ExecuteSqlAsync(
        parsedCommand.FinalSql,
        0,
        -9999,
        options: null,
        cancellationToken: cancellationToken);

    var sqlResponse = await pendingCall.ConfigureAwait(false);
    return sqlResponse;
}
745 private async Task<long> ExecuteInsertAsync(
Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
748 if (_connection?.BatchInsertMode ==
true && _connection.BatchManager !=
null)
751 if (InsertStatementParser.TryParse(command.FinalSql, out var parsedInsert) && parsedInsert !=
null)
755 parsedInsert.TableName,
756 parsedInsert.ColumnNames,
758 cancellationToken).ConfigureAwait(
false);
765 var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options:
null, cancellationToken: cancellationToken).ConfigureAwait(
false);
766 return response.count_affected;
769 private async Task<long> ExecuteInsertFromFileAsync(
Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
771 if (command.InsertFromFile ==
null)
773 throw new KineticaException(
"INSERT FROM FILE command is missing file information");
776 var fileInfo = command.InsertFromFile;
779 if (command.Hints.BatchSize.HasValue)
780 fileInfo.Options.BatchSize = command.Hints.BatchSize.Value;
781 if (command.Hints.TruncateStrings)
782 fileInfo.Options.TruncateStrings =
true;
783 if (command.Hints.UpdateOnExistingPk)
784 fileInfo.Options.UpdateOnExistingPk =
true;
785 if (command.Hints.IgnoreExistingPk)
786 fileInfo.Options.IgnoreExistingPk =
true;
789 var reader =
new CsvFileReader(client, fileInfo);
790 var recordCount = await reader.ReadAndInsertAsync(cancellationToken).ConfigureAwait(
false);
/// <summary>
/// Executes an UPDATE command by sending its final SQL text to the client.
/// </summary>
/// <param name="client">Client used to execute the SQL.</param>
/// <param name="command">Parsed command whose <c>FinalSql</c> is executed.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The <c>count_affected</c> value reported by the response.</returns>
private async Task<long> ExecuteUpdateAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
{
    var pendingCall = client.ExecuteSqlAsync(
        command.FinalSql,
        0,
        -9999,
        options: null,
        cancellationToken: cancellationToken);

    var updateResult = await pendingCall.ConfigureAwait(false);
    return updateResult.count_affected;
}
/// <summary>
/// Executes a DELETE command by sending its final SQL text to the client.
/// </summary>
/// <param name="client">Client used to execute the SQL.</param>
/// <param name="command">Parsed command whose <c>FinalSql</c> is executed.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The <c>count_affected</c> value reported by the response.</returns>
private async Task<long> ExecuteDeleteAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
{
    var pendingCall = client.ExecuteSqlAsync(
        command.FinalSql,
        0,
        -9999,
        options: null,
        cancellationToken: cancellationToken);

    var deleteResult = await pendingCall.ConfigureAwait(false);
    return deleteResult.count_affected;
}
807 private async Task<int> ExecuteCreateTableAsync(
Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
809 var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options:
null, cancellationToken: cancellationToken).ConfigureAwait(
false);
813 private async Task<int> ExecuteDropTableAsync(
Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
815 var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options:
null, cancellationToken: cancellationToken).ConfigureAwait(
false);
/// <summary>
/// Executes a command that has no dedicated handler by sending its final SQL text
/// to the client as-is.
/// </summary>
/// <param name="client">Client used to execute the SQL.</param>
/// <param name="command">Parsed command whose <c>FinalSql</c> is executed.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The <c>count_affected</c> value reported by the response.</returns>
private async Task<long> ExecuteGenericSqlAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
{
    var pendingCall = client.ExecuteSqlAsync(
        command.FinalSql,
        0,
        -9999,
        options: null,
        cancellationToken: cancellationToken);

    var sqlResult = await pendingCall.ConfigureAwait(false);
    return sqlResult.count_affected;
}
836 _sqlParser.Parse(_commandText, _parameters);
846 if (_connection ==
null || _connection.
State != ConnectionState.Open)
847 throw new InvalidOperationException(
"Connection is not open");
849 if (
string.IsNullOrEmpty(_commandText))
850 throw new InvalidOperationException(
"Command text is not set");
855 protected override void Dispose(
bool disposing)
859 _cancellationTokenSource?.Dispose();
861 base.Dispose(disposing);
869 private readonly CommandBehavior _behavior;
870 private readonly IList<string> _columnNames;
871 private readonly IList<string> _columnTypes;
872 private int _currentRow = -1;
873 private bool _closed =
false;
877 _response = response;
878 _behavior = behavior;
881 _columnNames =
new List<string>();
882 _columnTypes =
new List<string>();
884 if (_response.data !=
null && _response.data.Count > 0)
886 var firstRecord = _response.data[0];
887 if (firstRecord?.Schema?.Fields !=
null)
889 foreach (var field
in firstRecord.Schema.Fields)
891 _columnNames.Add(field.Name);
892 _columnTypes.Add(GetAvroFieldType(field.Schema));
898 private static string GetAvroFieldType(
Avro.
Schema schema)
903 foreach (var s
in unionSchema.Schemas)
906 return GetAvroFieldType(s);
910 return schema.Tag
switch 923 public override bool HasRows => _response.total_number_of_records > 0;
/// <summary>
/// Gets the value of the column at <paramref name="ordinal"/> by delegating to <c>GetValue</c>.
/// </summary>
public override object this[int ordinal]
{
    get { return GetValue(ordinal); }
}
934 if (_closed || _response.data ==
null)
938 return _currentRow < _response.data.Count;
941 public override async Task<bool>
ReadAsync(CancellationToken cancellationToken)
946 if (_closed || _response.data ==
null)
950 return _currentRow < _response.data.Count;
/// <summary>Returns the column value converted with <c>Convert.ToByte</c>.</summary>
public override byte GetByte(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToByte(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToChar</c>.</summary>
public override char GetChar(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToChar(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToDecimal</c>.</summary>
public override decimal GetDecimal(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToDecimal(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToDouble</c>.</summary>
public override double GetDouble(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToDouble(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToSingle</c>.</summary>
public override float GetFloat(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToSingle(raw);
}
/// <summary>
/// Returns the column value as a <see cref="Guid"/> by parsing its string form.
/// A null value or null string form is parsed as an empty string.
/// </summary>
public override Guid GetGuid(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    if (text is null)
    {
        text = "";
    }

    return Guid.Parse(text);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt16</c>.</summary>
public override short GetInt16(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt16(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt32</c>.</summary>
public override int GetInt32(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt32(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt64</c>.</summary>
public override long GetInt64(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt64(raw);
}
/// <summary>
/// Returns the string form of the column value, or an empty string when the value
/// or its string form is null.
/// </summary>
public override string GetString(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    return text is null ? "" : text;
}
983 public override long GetBytes(
int ordinal,
long dataOffset,
byte[]? buffer,
int bufferOffset,
int length)
986 if (value ==
null || value == DBNull.Value)
990 if (value is
byte[] byteArray)
994 else if (value is
string str)
996 bytes = System.Text.Encoding.UTF8.GetBytes(str);
1000 bytes = System.Text.Encoding.UTF8.GetBytes(value.ToString() ??
"");
1005 return bytes.Length;
1008 long availableBytes = bytes.Length - dataOffset;
1009 if (availableBytes <= 0)
1012 int bytesToCopy = (int)Math.Min(availableBytes, length);
1013 Array.Copy(bytes, dataOffset, buffer, bufferOffset, bytesToCopy);
1017 public override long GetChars(
int ordinal,
long dataOffset,
char[]? buffer,
int bufferOffset,
int length)
1020 if (value ==
null || value == DBNull.Value)
1023 string str = value.ToString() ??
"";
1030 long availableChars = str.Length - dataOffset;
1031 if (availableChars <= 0)
1034 int charsToCopy = (int)Math.Min(availableChars, length);
1035 str.CopyTo((
int)dataOffset, buffer, bufferOffset, charsToCopy);
1041 if (ordinal >= 0 && ordinal < _columnTypes.Count)
1042 return _columnTypes[ordinal];
1049 return dataType
switch 1052 "int" or
"integer" or
"int32" => typeof(
int),
1053 "int8" or
"tinyint" => typeof(sbyte),
1054 "int16" or
"smallint" => typeof(
short),
1055 "long" or
"bigint" or
"int64" => typeof(
long),
1056 "ulong" or
"uint64" => typeof(ulong),
1059 "float" or
"real" => typeof(
float),
1060 "double" or
"float8" => typeof(
double),
1061 "decimal" or
"numeric" => typeof(decimal),
1064 "bool" or
"boolean" => typeof(
bool),
1067 "string" or
"varchar" or
"text" => typeof(
string),
1068 var s when s.StartsWith(
"char") => typeof(
string),
1072 "time" => typeof(TimeSpan),
1073 "datetime" or
"timestamp" => typeof(
DateTime),
1076 "uuid" or
"guid" => typeof(Guid),
1077 "ipv4" => typeof(
string),
1078 "json" => typeof(
string),
1079 "wkt" => typeof(
string),
1082 "bytes" or
"binary" or
"varbinary" => typeof(
byte[]),
1085 "vector" => typeof(
float[]),
1086 var a when a.StartsWith(
"array") => typeof(
object),
1095 if (ordinal >= 0 && ordinal < _columnNames.Count)
1096 return _columnNames[ordinal];
1097 return $
"Column{ordinal}";
1102 for (
int i = 0; i < _columnNames.Count; i++)
1104 if (
string.Equals(_columnNames[i], name, StringComparison.OrdinalIgnoreCase))
1107 throw new ArgumentException($
"Column '{name}' not found");
1112 if (_currentRow < 0 || _response.data == null || _currentRow >= _response.data.Count)
1113 throw new InvalidOperationException(
"No current row");
1115 if (ordinal < 0 || ordinal >= _columnNames.Count)
1116 throw new ArgumentOutOfRangeException(nameof(ordinal));
1118 var record = _response.data[_currentRow];
1119 var fieldName = _columnNames[ordinal];
1121 if (record.TryGetValue(fieldName, out var value))
1123 return value ?? DBNull.Value;
1126 return DBNull.Value;
1131 int count = Math.Min(values.Length,
FieldCount);
1132 for (
int i = 0; i < count; i++)
1142 return value ==
null || value == DBNull.Value;
1161 private readonly
string _originalSql;
1162 private readonly
int _fetchSize;
1163 private readonly CommandBehavior _behavior;
1164 private readonly CancellationToken _cancellationToken;
1167 private IList<string>? _columnNames;
1168 private IList<string>? _columnTypes;
1169 private int _currentRowInPage = -1;
1170 private long _totalRowsRead = 0;
1171 private long _offset = 0;
1172 private bool _closed =
false;
1173 private bool _hasMore =
true;
1174 private long _totalRecordCount = -1;
1180 CommandBehavior behavior,
1181 CancellationToken cancellationToken =
default)
1183 _client = client ??
throw new ArgumentNullException(nameof(client));
1184 _originalSql = sql ??
throw new ArgumentNullException(nameof(sql));
1185 _fetchSize = fetchSize;
1186 _behavior = behavior;
1187 _cancellationToken = cancellationToken;
1193 private void FetchNextPage()
1197 Task.Run(async () => await FetchNextPageAsync(_cancellationToken).ConfigureAwait(
false))
1198 .ConfigureAwait(
false)
1203 private async Task FetchNextPageAsync(CancellationToken cancellationToken =
default)
1205 if (!_hasMore || _closed)
1209 var pagedSql = BuildPagedQuery(_originalSql, _offset, _fetchSize);
1211 var response = await _client.ExecuteSqlAsync(pagedSql, 0, -9999, options:
null, cancellationToken: cancellationToken).ConfigureAwait(
false);
1212 _currentPage = response;
1213 _currentRowInPage = -1;
1216 if (_columnNames ==
null && response.data !=
null && response.data.Count > 0)
1218 _columnNames =
new List<string>();
1219 _columnTypes =
new List<string>();
1221 var firstRecord = response.data[0];
1222 if (firstRecord?.Schema?.Fields !=
null)
1224 foreach (var field
in firstRecord.Schema.Fields)
1226 _columnNames.Add(field.Name);
1227 _columnTypes!.Add(GetAvroFieldType(field.Schema));
1233 if (_totalRecordCount < 0)
1235 _totalRecordCount = response.total_number_of_records;
1239 var recordsInPage = response.data?.Count ?? 0;
1240 _offset += recordsInPage;
1241 _hasMore = recordsInPage == _fetchSize && _offset < _totalRecordCount;
1244 private static string BuildPagedQuery(
string sql,
long offset,
int limit)
1247 var upperSql = sql.ToUpperInvariant();
1248 if (upperSql.Contains(
" LIMIT ") || upperSql.Contains(
" OFFSET "))
1255 return $
"{sql.TrimEnd(';', ' ')} LIMIT {limit} OFFSET {offset}";
1258 private static string GetAvroFieldType(
Avro.
Schema schema)
1262 foreach (var s
in unionSchema.Schemas)
1265 return GetAvroFieldType(s);
1269 return schema.Tag
switch 1282 public override bool HasRows => _totalRecordCount > 0;
/// <summary>
/// Gets the value of the column at <paramref name="ordinal"/> by delegating to <c>GetValue</c>.
/// </summary>
public override object this[int ordinal]
{
    get { return GetValue(ordinal); }
}
1302 if (_closed || _currentPage?.data ==
null)
1305 _currentRowInPage++;
1308 if (_currentRowInPage >= _currentPage.
data.Count)
1315 if (_currentPage?.data ==
null || _currentPage.
data.Count == 0)
1318 _currentRowInPage = 0;
1325 public override async Task<bool>
ReadAsync(CancellationToken cancellationToken)
1327 if (_closed || _currentPage?.data ==
null)
1330 _currentRowInPage++;
1333 if (_currentRowInPage >= _currentPage.
data.Count)
1338 await FetchNextPageAsync(cancellationToken).ConfigureAwait(
false);
1340 if (_currentPage?.data ==
null || _currentPage.
data.Count == 0)
1343 _currentRowInPage = 0;
1355 _currentPage =
null;
1360 var schemaTable =
new DataTable(
"SchemaTable");
1361 schemaTable.Columns.Add(
"ColumnName", typeof(
string));
1362 schemaTable.Columns.Add(
"ColumnOrdinal", typeof(
int));
1363 schemaTable.Columns.Add(
"DataType", typeof(
Type));
1364 schemaTable.Columns.Add(
"ColumnSize", typeof(
int));
1366 if (_columnNames !=
null)
1368 for (
int i = 0; i < _columnNames.Count; i++)
1370 var row = schemaTable.NewRow();
1371 row[
"ColumnName"] = _columnNames[i];
1372 row[
"ColumnOrdinal"] = i;
1374 row[
"ColumnSize"] = -1;
1375 schemaTable.Rows.Add(row);
/// <summary>Returns the column value converted with <c>Convert.ToBoolean</c>.</summary>
public override bool GetBoolean(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToBoolean(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToByte</c>.</summary>
public override byte GetByte(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToByte(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToChar</c>.</summary>
public override char GetChar(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToChar(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToDecimal</c>.</summary>
public override decimal GetDecimal(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToDecimal(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToDouble</c>.</summary>
public override double GetDouble(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToDouble(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToSingle</c>.</summary>
public override float GetFloat(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToSingle(raw);
}
/// <summary>
/// Returns the column value as a <see cref="Guid"/> by parsing its string form.
/// A null value or null string form is parsed as the text of <c>Guid.Empty</c>.
/// </summary>
public override Guid GetGuid(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    if (text is null)
    {
        text = Guid.Empty.ToString();
    }

    return Guid.Parse(text);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt16</c>.</summary>
public override short GetInt16(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt16(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt32</c>.</summary>
public override int GetInt32(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt32(raw);
}
/// <summary>Returns the column value converted with <c>Convert.ToInt64</c>.</summary>
public override long GetInt64(int ordinal)
{
    object raw = GetValue(ordinal);
    return Convert.ToInt64(raw);
}
/// <summary>
/// Returns the string form of the column value, or an empty string when the value
/// or its string form is null.
/// </summary>
public override string GetString(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    return text is null ? string.Empty : text;
}
/// <summary>
/// Copies bytes from a binary column into <paramref name="buffer"/>.
/// </summary>
/// <param name="ordinal">Zero-based column ordinal.</param>
/// <param name="dataOffset">Index within the column value at which to start reading.</param>
/// <param name="buffer">Destination buffer, or <c>null</c> to query the value's length.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of bytes to copy.</param>
/// <returns>
/// 0 when the column value is not a byte array or <paramref name="dataOffset"/> is at or
/// past the end of the value; the total length when <paramref name="buffer"/> is
/// <c>null</c>; otherwise the number of bytes copied.
/// </returns>
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
    var value = GetValue(ordinal);
    if (value is not byte[] bytes)
    {
        return 0;
    }

    if (buffer == null)
    {
        return bytes.Length;
    }

    // Computed in 64-bit so a large dataOffset is not truncated by an int cast, and
    // guarded so an offset at or past the end yields 0 instead of a negative copy
    // count (which Array.Copy rejects with an exception).
    long availableBytes = bytes.Length - dataOffset;
    if (availableBytes <= 0)
    {
        return 0;
    }

    int bytesToCopy = (int)Math.Min(availableBytes, length);
    Array.Copy(bytes, dataOffset, buffer, bufferOffset, bytesToCopy);
    return bytesToCopy;
}
/// <summary>
/// Copies characters from the string form of a column value into <paramref name="buffer"/>.
/// </summary>
/// <param name="ordinal">Zero-based column ordinal.</param>
/// <param name="dataOffset">Index within the string at which to start reading.</param>
/// <param name="buffer">Destination buffer, or <c>null</c> to query the string's length.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of characters to copy.</param>
/// <returns>
/// The string length when <paramref name="buffer"/> is <c>null</c>; 0 when
/// <paramref name="dataOffset"/> is at or past the end of the string; otherwise the
/// number of characters copied.
/// </returns>
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
    // NOTE(review): the original prologue of this method was not visible; the string is
    // derived the same way GetString does it - confirm against the full file.
    string str = GetValue(ordinal)?.ToString() ?? string.Empty;

    if (buffer == null)
    {
        return str.Length;
    }

    // Guard against an offset at or past the end: without it the copy count goes
    // negative and string.CopyTo throws.
    long availableChars = str.Length - dataOffset;
    if (availableChars <= 0)
    {
        return 0;
    }

    int charsToCopy = (int)Math.Min(availableChars, length);
    str.CopyTo((int)dataOffset, buffer, bufferOffset, charsToCopy);
    return charsToCopy;
}
1423 if (_columnTypes !=
null && ordinal >= 0 && ordinal < _columnTypes.Count)
1424 return _columnTypes[ordinal];
1431 return dataType
switch 1433 "int" or
"integer" or
"int32" => typeof(
int),
1434 "int8" or
"tinyint" => typeof(sbyte),
1435 "int16" or
"smallint" => typeof(
short),
1436 "long" or
"bigint" or
"int64" => typeof(
long),
1437 "float" or
"real" => typeof(
float),
1438 "double" or
"float8" => typeof(
double),
1439 "decimal" or
"numeric" => typeof(decimal),
1440 "bool" or
"boolean" => typeof(
bool),
1441 "string" or
"varchar" or
"text" => typeof(
string),
1442 var s when s.StartsWith(
"char") => typeof(
string),
1444 "time" => typeof(TimeSpan),
1445 "datetime" or
"timestamp" => typeof(
DateTime),
1446 "uuid" or
"guid" => typeof(Guid),
1447 "bytes" or
"binary" => typeof(
byte[]),
1454 if (_columnNames !=
null && ordinal >= 0 && ordinal < _columnNames.Count)
1455 return _columnNames[ordinal];
1456 return $
"Column{ordinal}";
1461 if (_columnNames !=
null)
1463 for (
int i = 0; i < _columnNames.Count; i++)
1465 if (
string.Equals(_columnNames[i], name, StringComparison.OrdinalIgnoreCase))
1469 throw new ArgumentException($
"Column '{name}' not found");
1474 if (_currentRowInPage < 0 || _currentPage?.data == null || _currentRowInPage >= _currentPage.
data.Count)
1475 throw new InvalidOperationException(
"No current row");
1478 throw new ArgumentOutOfRangeException(nameof(ordinal));
1480 var record = _currentPage.
data[_currentRowInPage];
1482 return DBNull.Value;
1484 var fieldName =
GetName(ordinal);
1485 if (record.TryGetValue(fieldName, out var value))
1487 return value ?? DBNull.Value;
1490 return DBNull.Value;
1495 int count = Math.Min(values.Length,
FieldCount);
1496 for (
int i = 0; i < count; i++)
1506 return value ==
null || value == DBNull.Value;
1525 private readonly Dictionary<string, object> _properties;
1532 public const string Url =
"URL";
1573 public const string Ttl =
"TTL";
1622 _properties =
new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
1623 ParseConnectionString(connectionString);
1626 #region Connection Properties 1706 #region Timeout Properties 1746 #region SSL/TLS Properties 1786 #region Network Properties 1835 #region Query Optimization Properties 1911 #region Query Control Properties 1980 #region Insertion Properties 2056 #region File I/O Properties 2097 public string FileReadQuoteChar
2099 get => GetProperty<string>(PropertyKeys.FileReadQuoteChar,
"\"");
2100 set => SetProperty(PropertyKeys.FileReadQuoteChar, value);
/// <summary>
/// Gets or sets the <c>FileReadComment</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>"#"</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public string FileReadComment
{
    get
    {
        return GetProperty<string>(PropertyKeys.FileReadComment, "#");
    }
    set
    {
        SetProperty(PropertyKeys.FileReadComment, value);
    }
}
/// <summary>
/// Gets or sets the <c>FileReadInitialClear</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>false</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public bool FileReadInitialClear
{
    get
    {
        return GetProperty<bool>(PropertyKeys.FileReadInitialClear, false);
    }
    set
    {
        SetProperty(PropertyKeys.FileReadInitialClear, value);
    }
}
/// <summary>
/// Gets or sets the <c>FileReadLimit</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <see cref="int.MaxValue"/> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public int FileReadLimit
{
    get
    {
        return GetProperty<int>(PropertyKeys.FileReadLimit, int.MaxValue);
    }
    set
    {
        SetProperty(PropertyKeys.FileReadLimit, value);
    }
}
/// <summary>
/// Gets or sets the <c>FileReadSkip</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>0</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public int FileReadSkip
{
    get
    {
        return GetProperty<int>(PropertyKeys.FileReadSkip, 0);
    }
    set
    {
        SetProperty(PropertyKeys.FileReadSkip, value);
    }
}
2141 #region Connection Pooling Properties 2148 get => GetProperty<bool>(PropertyKeys.Pooling,
true);
2149 set => SetProperty(PropertyKeys.Pooling, value);
/// <summary>
/// Gets or sets the <c>MaxPoolSize</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>100</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public int MaxPoolSize
{
    get
    {
        return GetProperty<int>(PropertyKeys.MaxPoolSize, 100);
    }
    set
    {
        SetProperty(PropertyKeys.MaxPoolSize, value);
    }
}
/// <summary>
/// Gets or sets the <c>MinPoolSize</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>0</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public int MinPoolSize
{
    get
    {
        return GetProperty<int>(PropertyKeys.MinPoolSize, 0);
    }
    set
    {
        SetProperty(PropertyKeys.MinPoolSize, value);
    }
}
2172 #region Batch Insert Properties 2178 public bool BatchInsertMode
2180 get => GetProperty<bool>(PropertyKeys.BatchInsertMode,
false);
2181 set => SetProperty(PropertyKeys.BatchInsertMode, value);
/// <summary>
/// Gets or sets the <c>BatchSize</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>10000</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public int BatchSize
{
    get
    {
        return GetProperty<int>(PropertyKeys.BatchSize, 10000);
    }
    set
    {
        SetProperty(PropertyKeys.BatchSize, value);
    }
}
/// <summary>
/// Gets or sets the <c>BatchUpdateOnExistingPk</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>false</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public bool BatchUpdateOnExistingPk
{
    get
    {
        return GetProperty<bool>(PropertyKeys.BatchUpdateOnExistingPk, false);
    }
    set
    {
        SetProperty(PropertyKeys.BatchUpdateOnExistingPk, value);
    }
}
2204 #region Misc Properties 2209 public string? TimeZoneOverride
2211 get => GetProperty<string?>(PropertyKeys.TimeZoneOverride,
null);
2212 set => SetProperty(PropertyKeys.TimeZoneOverride, value);
/// <summary>
/// Gets or sets the <c>TokenNameClaim</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>"sub"</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public string TokenNameClaim
{
    get
    {
        return GetProperty<string>(PropertyKeys.TokenNameClaim, "sub");
    }
    set
    {
        SetProperty(PropertyKeys.TokenNameClaim, value);
    }
}
/// <summary>
/// Gets or sets the <c>FakeTransactions</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>false</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public bool FakeTransactions
{
    get
    {
        return GetProperty<bool>(PropertyKeys.FakeTransactions, false);
    }
    set
    {
        SetProperty(PropertyKeys.FakeTransactions, value);
    }
}
/// <summary>
/// Gets or sets the <c>LogLevel</c> connection property.
/// The getter reads it through <c>GetProperty</c> with <c>null</c> as the default;
/// the setter writes through <c>SetProperty</c>.
/// </summary>
public string? LogLevel
{
    get
    {
        return GetProperty<string?>(PropertyKeys.LogLevel, null);
    }
    set
    {
        SetProperty(PropertyKeys.LogLevel, value);
    }
}
2244 #region Property Access Methods 2246 private T GetProperty<T>(
string key, T defaultValue)
2248 if (_properties.TryGetValue(key, out var value))
2251 return defaultValue;
2254 if (typeof(T) == typeof(
bool) && value is
string strVal)
2256 if (
bool.TryParse(strVal, out var boolResult))
2257 return (T)(object)boolResult;
2259 if (strVal ==
"1" || strVal.Equals(
"true", StringComparison.OrdinalIgnoreCase))
2260 return (T)(object)
true;
2261 if (strVal ==
"0" || strVal.Equals(
"false", StringComparison.OrdinalIgnoreCase))
2262 return (T)(object)
false;
2266 if (typeof(T) == typeof(
int) && value is
string strIntVal)
2268 if (
int.TryParse(strIntVal, out var intResult))
2269 return (T)(object)intResult;
2272 return (T)Convert.ChangeType(value, typeof(T));
2274 return defaultValue;
2277 private void SetProperty(
string key,
object? value)
2280 _properties[key] = value;
2282 _properties.Remove(key);
/// <summary>
/// Exposes the backing property dictionary through a read-only interface.
/// The same dictionary instance is returned on every call (no copy is made).
/// </summary>
public IReadOnlyDictionary<string, object> Properties
{
    get { return _properties; }
}
/// <summary>
/// Gets or sets a raw property value by key.
/// </summary>
/// <param name="key">The property key to look up or assign.</param>
/// <returns>The stored value, or <c>null</c> when the key is not present.</returns>
public object? this[string key]
{
    get
    {
        if (_properties.TryGetValue(key, out var stored))
        {
            return stored;
        }

        return null;
    }
    set
    {
        // Writes are routed through SetProperty rather than touching the dictionary directly.
        SetProperty(key, value);
    }
}
/// <summary>
/// Splits a <c>key=value;key=value</c> connection string and stores each pair in the
/// property dictionary under its normalized key.
/// </summary>
/// <param name="connectionString">The raw connection string; a null or empty string stores nothing.</param>
private void ParseConnectionString(string connectionString)
{
    if (string.IsNullOrEmpty(connectionString))
    {
        return;
    }

    foreach (var segment in connectionString.Split(';'))
    {
        // Only the first '=' separates key from value, so values may themselves contain '='.
        int separator = segment.IndexOf('=');
        if (separator < 0)
        {
            // Segments without '=' (including empty ones from trailing ';') are ignored.
            continue;
        }

        string rawKey = segment.Substring(0, separator).Trim();
        string setting = segment.Substring(separator + 1).Trim();

        // Later occurrences of the same normalized key overwrite earlier ones.
        _properties[NormalizePropertyKey(rawKey)] = setting;
    }
}
/// <summary>
/// Maps alternative connection-string key spellings onto the canonical
/// <c>PropertyKeys</c> constant; the comparison is done on the upper-invariant form of the key.
/// </summary>
/// <param name="key">The key as it appeared in the connection string.</param>
/// <returns>The canonical key for a known alias; otherwise <paramref name="key"/> unchanged.</returns>
private static string NormalizePropertyKey(string key)
{
    switch (key.ToUpperInvariant())
    {
        case "URL":
        case "HOST":
        case "DATA SOURCE":
            return PropertyKeys.Server;

        case "UID":
        case "USER":
        case "USER ID":
            return PropertyKeys.Username;

        case "PWD":
            return PropertyKeys.Password;

        case "INITIAL CATALOG":
            return PropertyKeys.Database;

        case "OAUTHTOKEN":
        case "OAUTH TOKEN":
        case "OAUTH_TOKEN":
            return PropertyKeys.OAuthToken;

        default:
            return key;
    }
}
/// <summary>
/// Serializes every stored property as <c>key=value</c>, joined with <c>;</c>.
/// </summary>
/// <returns>The joined string; empty when no properties are stored.</returns>
public override string ToString()
{
    // NOTE(review): every entry is emitted, so any credential stored in the dictionary appears in the output.
    var segments = new List<string>(_properties.Count);
    foreach (var kvp in _properties)
    {
        segments.Add($"{kvp.Key}={kvp.Value}");
    }

    return string.Join(";", segments);
}
2345 #region Query Options Builder 2350 public IDictionary<string, string> BuildQueryOptions()
2352 var options =
new Dictionary<string, string>();
2356 options[
"cost_based_optimization"] =
"false";
2358 options[
"distributed_joins"] =
"false";
2360 options[
"parallel_execution"] =
"false";
2362 options[
"plan_cache"] =
"false";
2364 options[
"results_caching"] =
"false";
2366 options[
"rule_based_optimizations"] =
"false";
2368 options[
"ssq_optimization"] =
"false";
2370 options[
"use_approx_count_distinct"] =
"true";
2374 options[
"ttl"] =
Ttl.ToString();
2378 options[
"limit"] =
Limit.ToString();
2386 public IDictionary<string, string> BuildInsertOptions()
2388 var options =
new Dictionary<string, string>();
2391 options[
"update_on_existing_pk"] =
"true";
2393 options[
"ignore_existing_pk"] =
"true";
2395 options[
"truncate_strings"] =
"true";
2397 options[
"no_sync"] =
"true";
2412 private readonly ConcurrentDictionary<string, ConnectionPoolEntry> _pools =
new();
2413 private readonly Timer _cleanupTimer;
2417 _cleanupTimer =
new Timer(CleanupExpiredConnections!,
null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
/// <summary>
/// Obtains a client from the pool entry keyed by <paramref name="connectionString"/>,
/// creating that entry on first use.
/// </summary>
/// <param name="connectionString">The connection string that identifies the pool entry.</param>
/// <param name="cancellationToken">Token forwarded to the pool entry's acquisition call.</param>
/// <returns>The client handed out by the pool entry.</returns>
public async Task<Kinetica> GetConnectionAsync(string connectionString, CancellationToken cancellationToken)
{
    ConnectionPoolEntry entry = _pools.GetOrAdd(connectionString, key => new ConnectionPoolEntry(key));
    Kinetica client = await entry.GetConnectionAsync(cancellationToken).ConfigureAwait(false);
    return client;
}
2428 if (_pools.TryGetValue(connectionString, out var pool))
2430 pool.ReturnConnection(connection);
/// <summary>
/// Timer callback: asks every pool entry to drop its expired connections.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
private void CleanupExpiredConnections(object state)
{
    var entries = _pools.Values;
    foreach (ConnectionPoolEntry entry in entries)
    {
        entry.CleanupExpiredConnections();
    }
}
2442 private class ConnectionPoolEntry
2444 private readonly KineticaConnectionStringBuilder _connectionStringBuilder;
2445 private readonly ConcurrentQueue<PooledConnection> _connections =
new();
2446 private readonly SemaphoreSlim _semaphore;
2447 private int _currentCount = 0;
2449 public ConnectionPoolEntry(
string connectionString)
2451 _connectionStringBuilder =
new KineticaConnectionStringBuilder(connectionString);
2452 _semaphore =
new SemaphoreSlim(_connectionStringBuilder.MaxPoolSize, _connectionStringBuilder.MaxPoolSize);
2457 await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(
false);
2462 while (_connections.TryDequeue(out var pooledConnection))
2464 if (pooledConnection.IsValid && await ValidateConnectionAsync(pooledConnection.Connection, cancellationToken).ConfigureAwait(
false))
2466 return pooledConnection.Connection;
2469 Interlocked.Decrement(ref _currentCount);
2475 if (!
string.IsNullOrEmpty(_connectionStringBuilder.Username) || !
string.IsNullOrEmpty(_connectionStringBuilder.Password))
2479 Username = _connectionStringBuilder.Username ??
string.Empty,
2480 Password = _connectionStringBuilder.Password ??
string.Empty
2484 if (!
string.IsNullOrEmpty(_connectionStringBuilder.OAuthToken))
2486 options ??=
new Kinetica.Options();
2487 options.OauthToken = _connectionStringBuilder.OAuthToken;
2490 var client =
new Kinetica(_connectionStringBuilder.Server, options);
2493 await client.ShowSystemStatusAsync(
new Dictionary<string, string>(), cancellationToken).ConfigureAwait(
false);
2495 Interlocked.Increment(ref _currentCount);
2500 _semaphore.Release();
2505 private async Task<bool> ValidateConnectionAsync(
Kinetica connection, CancellationToken cancellationToken)
2510 await connection.ShowSystemStatusAsync(
new Dictionary<string, string>(), cancellationToken).ConfigureAwait(
false);
2523 _connections.Enqueue(
new PooledConnection(connection));
2527 _semaphore.Release();
2531 public void CleanupExpiredConnections()
2533 var activeConnections =
new List<PooledConnection>();
2535 while (_connections.TryDequeue(out var pooledConnection))
2537 if (pooledConnection.IsValid)
2539 activeConnections.Add(pooledConnection);
2544 Interlocked.Decrement(ref _currentCount);
2548 foreach (var connection
in activeConnections)
2550 _connections.Enqueue(connection);
2554 private class PooledConnection
2556 public Kinetica Connection {
get; }
2559 public PooledConnection(
Kinetica connection)
2561 Connection = connection;
2565 public bool IsValid =>
DateTime.UtcNow - CreatedAt < TimeSpan.FromMinutes(30);
2581 private static readonly Regex _selectRegex =
new Regex(
@"^\s*SELECT\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2582 private static readonly Regex _insertRegex =
new Regex(
@"^\s*INSERT\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2583 private static readonly Regex _updateRegex =
new Regex(
@"^\s*UPDATE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2584 private static readonly Regex _deleteRegex =
new Regex(
@"^\s*DELETE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2585 private static readonly Regex _createTableRegex =
new Regex(
@"^\s*CREATE\s+TABLE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2586 private static readonly Regex _dropTableRegex =
new Regex(
@"^\s*DROP\s+TABLE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2587 private static readonly Regex _parameterRegex =
new Regex(
@"@(\w+)", RegexOptions.Compiled);
2589 private static readonly Regex _positionalParamRegex =
new Regex(
@"\?", RegexOptions.Compiled);
2592 private static readonly Regex _setUserRegex =
new Regex(
2593 @"^\s*SET\s+USER\s+['""]?(\w+)['""]?\s*$",
2594 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2595 private static readonly Regex _executeAsUserRegex =
new Regex(
2596 @"^\s*EXECUTE\s+AS\s+USER\s+['""]?(\w+)['""]?\s*$",
2597 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2598 private static readonly Regex _revertRegex =
new Regex(
2600 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2603 private static readonly Regex _setSchemaRegex =
new Regex(
2604 @"^\s*SET\s+(?:SCHEMA|SQLID)\s+['""]?([.\w]+)['""]?\s*$",
2605 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2609 private static readonly Regex _hintPattern =
new Regex(
2610 @"/\*\+?\s*(?:KI_HINT_\w+(?:\([^)]*\))?\s*)+\*/",
2611 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2612 private static readonly Regex _batchSizeHint =
new Regex(
2613 @"KI_HINT_BATCH_SIZE\s*\(\s*(\d+)\s*\)",
2614 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2615 private static readonly Regex _truncateStringsHint =
new Regex(
2616 @"KI_HINT_TRUNCATE_STRINGS",
2617 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2618 private static readonly Regex _updateOnPkHint =
new Regex(
2619 @"KI_HINT_UPDATE_ON_EXISTING_PK",
2620 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2621 private static readonly Regex _disableMultiheadHint =
new Regex(
2622 @"KI_HINT_DISABLE_MULTIHEAD",
2623 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2624 private static readonly Regex _keyLookupHint =
new Regex(
2625 @"KI_HINT_KEY_LOOKUP",
2626 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2627 private static readonly Regex _replSyncHint =
new Regex(
2628 @"KI_HINT_REPL_SYNC(?:_PARALLEL)?",
2629 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2630 private static readonly Regex _ignoreExistingPkHint =
new Regex(
2631 @"KI_HINT_IGNORE_EXISTING_PK",
2632 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2633 private static readonly Regex _serverSideInsertHint =
new Regex(
2634 @"KI_HINT_SERVER_SIDE_INSERT",
2635 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2637 private static readonly Regex _pkConflictPredicateLowerHint =
new Regex(
2638 @"KI_HINT_PK_CONFLICT_PREDICATE_LOWER\s*\(\s*(\w+)\s*\)",
2639 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2640 private static readonly Regex _pkConflictPredicateHigherHint =
new Regex(
2641 @"KI_HINT_PK_CONFLICT_PREDICATE_HIGHER\s*\(\s*(\w+)\s*\)",
2642 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2647 private static readonly Regex _insertFromFileRegex =
new Regex(
2648 @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s*(?:\(\s*(?<insertColumns>[\w\s,""]+)\s*\))?\s*SELECT\s+(?:(?<selectAll>\*)|(?<selectColumns>[\w\s,""]+))\s+FROM\s+(?:FILE\s*\.\s*""(?<filePath>[^""]+)""|'(?<kifsPath>kifs://[^']+)')(?:\s+WITH\s+OPTIONS\s*\(\s*(?<options>[^)]*)\s*\))?\s*$",
2649 RegexOptions.IgnoreCase | RegexOptions.Compiled);
2653 var commandType = DetermineCommandType(sql);
2654 var hints = ExtractHints(sql);
2657 var strippedSql = StripHints(sql);
2658 var finalSql = SubstituteParameters(strippedSql, parameters);
2661 var userImpersonation = ExtractUserImpersonation(sql);
2667 insertFromFileInfo = ExtractInsertFromFileInfo(sql);
2673 FinalSql = finalSql,
2674 CommandType = commandType,
2675 Parameters = parameters,
2677 UserImpersonation = userImpersonation,
/// <summary>
/// Removes every match of the hint-comment pattern from the SQL text and trims
/// surrounding whitespace from the result.
/// </summary>
/// <param name="sql">The SQL text that may contain hint comments.</param>
/// <returns>The SQL text without hint comments.</returns>
private string StripHints(string sql)
{
    string withoutHints = _hintPattern.Replace(sql, "");
    return withoutHints.Trim();
}
/// <summary>
/// Scans the SQL text for <c>KI_HINT_*</c> markers and records them in a new <c>QueryHints</c>.
/// </summary>
/// <param name="sql">The SQL text to scan; each hint regex is applied to the whole string.</param>
/// <returns>The populated hints object.</returns>
private QueryHints ExtractHints(string sql)
{
    var parsed = new QueryHints();

    // The batch size is only assigned when the hint is present and its argument parses as an int.
    if (_batchSizeHint.Match(sql) is { Success: true } sizeHint &&
        int.TryParse(sizeHint.Groups[1].Value, out int requestedSize))
    {
        parsed.BatchSize = requestedSize;
    }

    // Flag-style hints: presence anywhere in the text switches the flag on.
    parsed.TruncateStrings = _truncateStringsHint.IsMatch(sql);
    parsed.UpdateOnExistingPk = _updateOnPkHint.IsMatch(sql);
    parsed.IgnoreExistingPk = _ignoreExistingPkHint.IsMatch(sql);
    parsed.DisableMultihead = _disableMultiheadHint.IsMatch(sql);
    parsed.UseKeyLookup = _keyLookupHint.IsMatch(sql);
    parsed.ReplicationSync = _replSyncHint.IsMatch(sql);
    parsed.ServerSideInsert = _serverSideInsertHint.IsMatch(sql);

    // Conflict-predicate hints carry a column name as their single argument.
    if (_pkConflictPredicateLowerHint.Match(sql) is { Success: true } lowerHint)
    {
        parsed.PkConflictPredicateLowerColumn = lowerHint.Groups[1].Value;
    }

    if (_pkConflictPredicateHigherHint.Match(sql) is { Success: true } higherHint)
    {
        parsed.PkConflictPredicateHigherColumn = higherHint.Groups[1].Value;
    }

    return parsed;
}
2755 var match = _setSchemaRegex.Match(sql);
2756 return match.Success ? match.Groups[1].Value :
null;
2764 var setUserMatch = _setUserRegex.Match(sql);
2765 if (setUserMatch.Success)
2770 Username = setUserMatch.Groups[1].Value
2774 var executeAsMatch = _executeAsUserRegex.Match(sql);
2775 if (executeAsMatch.Success)
2777 return new UserImpersonationInfo
2780 Username = executeAsMatch.Groups[1].Value
2784 if (_revertRegex.IsMatch(sql))
2786 return new UserImpersonationInfo
2799 private InsertFromFileInfo? ExtractInsertFromFileInfo(
string sql)
2801 var match = _insertFromFileRegex.Match(sql);
2805 var info =
new InsertFromFileInfo
2807 TableName = match.Groups[
"table"].Value.Trim(),
2808 SelectAll = match.Groups[
"selectAll"].Success && match.Groups[
"selectAll"].Value ==
"*",
2809 FilePath = match.Groups[
"filePath"].Success ? match.Groups[
"filePath"].Value :
null,
2810 KifsPath = match.Groups[
"kifsPath"].Success ? match.Groups[
"kifsPath"].Value :
null 2814 if (match.Groups[
"insertColumns"].Success && !
string.IsNullOrEmpty(match.Groups[
"insertColumns"].Value))
2816 info.InsertColumns = ParseColumnList(match.Groups[
"insertColumns"].Value);
2820 if (!info.SelectAll && match.Groups[
"selectColumns"].Success && !
string.IsNullOrEmpty(match.Groups[
"selectColumns"].Value))
2822 info.SelectColumns = ParseColumnList(match.Groups[
"selectColumns"].Value);
2826 if (match.Groups[
"options"].Success && !
string.IsNullOrEmpty(match.Groups[
"options"].Value))
2828 ParseFileOptions(match.Groups[
"options"].Value, info.Options);
2832 var path = info.ActualPath.ToLowerInvariant();
2835 if (path.EndsWith(
".parquet") || path.EndsWith(
".pqt"))
2837 else if (path.EndsWith(
".json") || path.EndsWith(
".jsonl"))
2839 else if (path.EndsWith(
".avro"))
2841 else if (path.EndsWith(
".shp"))
2844 info.Options.Format =
FileFormat.DelimitedText;
2848 if (info.Options.Format ==
FileFormat.DelimitedText)
2850 if (path.EndsWith(
".psv"))
2851 info.Options.Delimiter =
'|';
2852 else if (path.EndsWith(
".tsv"))
2853 info.Options.Delimiter =
'\t';
2862 private static List<string> ParseColumnList(
string columns)
2864 var result =
new List<string>();
2865 var current =
new System.Text.StringBuilder();
2866 bool inQuote =
false;
2868 foreach (
char c
in columns)
2874 else if (c ==
',' && !inQuote)
2876 var col = current.ToString().Trim().Trim(
'"');
2877 if (!
string.IsNullOrEmpty(col))
2888 var lastCol = current.ToString().Trim().Trim(
'"');
2889 if (!
string.IsNullOrEmpty(lastCol))
2890 result.Add(lastCol);
2898 private static void ParseFileOptions(
string optionsStr, FileInsertOptions options)
2901 var optionPattern =
new Regex(
2902 @"(\w+)\s*=\s*(?:'([^']*)'|""([^""]*)""|(\d+)|(\w+))",
2903 RegexOptions.IgnoreCase);
2905 var matches = optionPattern.Matches(optionsStr);
2906 foreach (Match m
in matches)
2908 var name = m.Groups[1].Value.ToUpperInvariant().Replace(
"_",
"");
2909 var value = m.Groups[2].Success ? m.Groups[2].Value :
2910 m.Groups[3].Success ? m.Groups[3].Value :
2911 m.Groups[4].Success ? m.Groups[4].Value :
2917 if (
int.TryParse(value, out var batchSize))
2918 options.BatchSize = batchSize;
2921 if (!
string.IsNullOrEmpty(value))
2922 options.Delimiter = value[0];
2925 if (!
string.IsNullOrEmpty(value))
2926 options.QuoteChar = value[0];
2929 if (!
string.IsNullOrEmpty(value))
2930 options.EscapeChar = value[0];
2934 options.NullString = value;
2937 options.CommentPrefix = value;
2940 if (
int.TryParse(value, out var skip))
2941 options.Skip = skip;
2944 if (
int.TryParse(value, out var limit))
2945 options.Limit = limit;
2949 options.HasHeader = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2950 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2953 case "INITIALCLEAR":
2954 options.InitialClear = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2955 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2960 if (Enum.TryParse<
FileErrorMode>(value,
true, out var errorMode))
2961 options.ErrorMode = errorMode;
2963 case "IGNOREEXISTINGPK":
2964 options.IgnoreExistingPk = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2965 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2968 case "UPDATEONEXISTINGPK":
2969 options.UpdateOnExistingPk = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2970 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2973 case "TRUNCATESTRINGS":
2974 options.TruncateStrings = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2975 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2979 options.DryRun = value.Equals(
"TRUE", StringComparison.OrdinalIgnoreCase) ||
2980 value.Equals(
"YES", StringComparison.OrdinalIgnoreCase) ||
2985 if (Enum.TryParse<
FileFormat>(value,
true, out var format))
2986 options.Format = format;
2987 else if (value.Equals(
"DELIMITED_TEXT", StringComparison.OrdinalIgnoreCase) ||
2988 value.Equals(
"CSV", StringComparison.OrdinalIgnoreCase) ||
2989 value.Equals(
"TSV", StringComparison.OrdinalIgnoreCase) ||
2990 value.Equals(
"PSV", StringComparison.OrdinalIgnoreCase))
2997 private string SubstituteParameters(
string sql, DbParameterCollection parameters)
3000 var result = _parameterRegex.Replace(sql, match =>
3002 var paramName = match.Groups[1].Value;
3003 var param = parameters.Cast<DbParameter>().FirstOrDefault(p =>
3004 p.ParameterName.Equals($
"@{paramName}", StringComparison.OrdinalIgnoreCase) ||
3005 p.ParameterName.Equals(paramName, StringComparison.OrdinalIgnoreCase));
3007 return FormatParameterValue(param);
3011 if (_positionalParamRegex.IsMatch(result))
3013 var positionalParams = parameters.Cast<DbParameter>()
3014 .Where(p =>
string.IsNullOrEmpty(p.ParameterName) || p.ParameterName.StartsWith(
"?"))
3018 var numberedParams = parameters.Cast<DbParameter>()
3019 .Where(p =>
int.TryParse(p.ParameterName?.TrimStart(
'@'), out _))
3020 .OrderBy(p =>
int.
Parse(p.ParameterName!.TrimStart(
'@')))
3023 var allPositional = positionalParams.Count > 0 ? positionalParams : numberedParams;
3026 result = _positionalParamRegex.Replace(result, match =>
3028 if (paramIndex < allPositional.Count)
3030 return FormatParameterValue(allPositional[paramIndex++]);
/// <summary>
/// Renders a parameter as SQL literal text, choosing the formatter from the parameter's
/// declared <see cref="DbType"/> and falling back to the runtime type of the value.
/// </summary>
/// <param name="param">The parameter to render; may be <c>null</c>.</param>
/// <returns>
/// <c>NULL</c> when the parameter or its value is null or <see cref="DBNull.Value"/>;
/// otherwise the literal text for the value.
/// </returns>
private string FormatParameterValue(DbParameter? param)
{
    if (param is null)
    {
        return "NULL";
    }

    object? raw = param.Value;
    if (raw == null || raw == DBNull.Value)
    {
        return "NULL";
    }

    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    switch (param.DbType)
    {
        // Text types: quoted, with embedded quotes escaped.
        case DbType.String:
        case DbType.AnsiString:
        case DbType.StringFixedLength:
        case DbType.AnsiStringFixedLength:
            return $"'{EscapeString(raw.ToString())}'";

        // Temporal types.
        case DbType.DateTime:
        case DbType.DateTime2:
            return FormatDateTime(raw);
        case DbType.Date:
            return FormatDate(raw);
        case DbType.Time:
            return FormatTime(raw);
        case DbType.DateTimeOffset:
            return FormatDateTimeOffset(raw);

        case DbType.Boolean:
            return FormatBoolean(raw);

        case DbType.Guid:
            return $"'{FormatGuid(raw)}'";

        case DbType.Binary:
            return FormatBinary(raw);

        // Fractional numerics are written with the invariant culture so the decimal separator is always '.'.
        case DbType.Decimal:
        case DbType.Currency:
        case DbType.VarNumeric:
            return Convert.ToDecimal(raw).ToString(invariant);
        case DbType.Double:
            return Convert.ToDouble(raw).ToString(invariant);
        case DbType.Single:
            return Convert.ToSingle(raw).ToString(invariant);

        // Everything else is formatted by the value's runtime type.
        default:
            return FormatValueByType(raw);
    }
}
3078 private static string FormatValueByType(
object value)
3082 string s => $
"'{EscapeString(s)}'",
3083 DateTime dt => $
"'{dt:yyyy-MM-dd HH:mm:ss.fff}'",
3084 DateTimeOffset dto => $
"'{dto:yyyy-MM-dd HH:mm:ss.fff}'",
3085 DateOnly d => $
"'{d:yyyy-MM-dd}'",
3086 TimeOnly t => $
"'{t:HH:mm:ss.fff}'",
3087 TimeSpan ts => $
"'{ts:hh\\:mm\\:ss\\.fff}'",
3088 bool b => b ?
"TRUE" :
"FALSE",
3090 byte[] bytes =>
"0x" + BitConverter.ToString(bytes).Replace(
"-",
""),
3091 decimal dec => dec.ToString(System.Globalization.CultureInfo.InvariantCulture),
3092 double dbl => dbl.ToString(System.Globalization.CultureInfo.InvariantCulture),
3093 float flt => flt.ToString(System.Globalization.CultureInfo.InvariantCulture),
3094 _ => value.ToString() ??
"NULL" 3098 private static string EscapeString(
string? value)
3100 if (value ==
null)
return "";
3101 return value.Replace(
"'",
"''");
/// <summary>
/// Renders a date/time value as a single-quoted SQL literal in the form
/// <c>'yyyy-MM-dd HH:mm:ss.fff'</c>.
/// </summary>
/// <param name="value">
/// The value to render. <see cref="DateTime"/> and <see cref="DateTimeOffset"/> are formatted
/// with the pattern above; any other value is emitted as its text form.
/// </param>
/// <returns>The quoted literal.</returns>
private static string FormatDateTime(object value)
{
    const string Pattern = "yyyy-MM-dd HH:mm:ss.fff";

    // Format with the invariant culture: string interpolation would use the current culture,
    // whose time separator and default calendar can differ and change the emitted literal.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    return value switch
    {
        DateTime dt => $"'{dt.ToString(Pattern, invariant)}'",
        DateTimeOffset dto => $"'{dto.ToString(Pattern, invariant)}'",

        // Unrecognized types keep their text form, but embedded quotes are doubled so the
        // value cannot terminate the literal early.
        _ => $"'{EscapeString(value?.ToString())}'",
    };
}
/// <summary>
/// Renders a date value as a single-quoted SQL literal in the form <c>'yyyy-MM-dd'</c>.
/// </summary>
/// <param name="value">
/// The value to render. <see cref="DateTime"/>, <see cref="DateTimeOffset"/> and
/// <see cref="DateOnly"/> are formatted with the pattern above; any other value is emitted
/// as its text form.
/// </param>
/// <returns>The quoted literal.</returns>
private static string FormatDate(object value)
{
    const string Pattern = "yyyy-MM-dd";

    // Format with the invariant culture: string interpolation would use the current culture,
    // whose default calendar can differ and change the emitted year/month/day.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    return value switch
    {
        DateTime dt => $"'{dt.ToString(Pattern, invariant)}'",
        DateTimeOffset dto => $"'{dto.ToString(Pattern, invariant)}'",
        DateOnly d => $"'{d.ToString(Pattern, invariant)}'",

        // Unrecognized types keep their text form, but embedded quotes are doubled so the
        // value cannot terminate the literal early.
        _ => $"'{EscapeString(value?.ToString())}'",
    };
}
/// <summary>
/// Renders a time-of-day value as a single-quoted SQL literal in the form <c>'HH:mm:ss.fff'</c>.
/// </summary>
/// <param name="value">
/// The value to render. <see cref="DateTime"/> (time part only), <see cref="TimeSpan"/> and
/// <see cref="TimeOnly"/> are formatted with the pattern above; any other value is emitted
/// as its text form.
/// </param>
/// <returns>The quoted literal.</returns>
private static string FormatTime(object value)
{
    const string ClockPattern = "HH:mm:ss.fff";

    // Format with the invariant culture: string interpolation would use the current culture,
    // whose time separator may not be ':'.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    return value switch
    {
        DateTime dt => $"'{dt.ToString(ClockPattern, invariant)}'",

        // TimeSpan uses its own pattern syntax in which separators must be escaped.
        // NOTE(review): "hh" is the hours component only, so whole days and a negative sign
        // are not emitted (unchanged from the previous behavior) — confirm this is intended.
        TimeSpan ts => $"'{ts.ToString(@"hh\:mm\:ss\.fff", invariant)}'",

        TimeOnly t => $"'{t.ToString(ClockPattern, invariant)}'",

        // Unrecognized types keep their text form, but embedded quotes are doubled so the
        // value cannot terminate the literal early.
        _ => $"'{EscapeString(value?.ToString())}'",
    };
}
/// <summary>
/// Renders a <see cref="DateTimeOffset"/> as a single-quoted SQL literal in the form
/// <c>'yyyy-MM-dd HH:mm:ss.fffzzz'</c> (the offset is appended as <c>+hh:mm</c> / <c>-hh:mm</c>).
/// </summary>
/// <param name="value">
/// The value to render. Anything that is not a <see cref="DateTimeOffset"/> is delegated to
/// <c>FormatDateTime</c>.
/// </param>
/// <returns>The quoted literal.</returns>
private static string FormatDateTimeOffset(object value)
{
    if (value is DateTimeOffset dto)
    {
        // Format with the invariant culture: string interpolation would use the current culture,
        // whose time separator and default calendar can differ and change the emitted literal.
        string text = dto.ToString(
            "yyyy-MM-dd HH:mm:ss.fffzzz",
            System.Globalization.CultureInfo.InvariantCulture);
        return $"'{text}'";
    }

    return FormatDateTime(value);
}
3142 private static string FormatBoolean(
object value)
3144 if (value is
bool b)
3145 return b ?
"TRUE" :
"FALSE";
3148 return i != 0 ?
"TRUE" :
"FALSE";
3149 if (value is
string s)
3150 return s.Equals(
"true", StringComparison.OrdinalIgnoreCase) || s ==
"1" ?
"TRUE" :
"FALSE";
3151 return Convert.ToBoolean(value) ?
"TRUE" :
"FALSE";
/// <summary>
/// Produces the text placed between single quotes for a GUID-typed parameter.
/// </summary>
/// <param name="value">The parameter value; normally a <see cref="Guid"/>.</param>
/// <returns>
/// The default <see cref="Guid.ToString()"/> form for a <see cref="Guid"/>; for any other
/// value, its text form with single quotes doubled (empty when the text form is null).
/// </returns>
private static string FormatGuid(object value)
{
    if (value is Guid g)
    {
        return g.ToString();
    }

    // The caller wraps the result in single quotes, so a non-Guid value (for example a string
    // supplied with DbType.Guid) must have its quotes escaped or it could close the literal early.
    return EscapeString(value.ToString());
}
3161 private static string FormatBinary(
object value)
3163 if (value is
byte[] bytes)
3166 return "0x" + BitConverter.ToString(bytes).Replace(
"-",
"");
3268 var options =
new Dictionary<string, string>();
3271 options[
"truncate_strings"] =
"true";
3273 options[
"update_on_existing_pk"] =
"true";
3275 options[
"ignore_existing_pk"] =
"true";
3277 options[
"replication_mode"] =
"sync";
3291 var options =
new Dictionary<string, string>();
3294 options[
"key_lookup"] =
"true";
3449 _connection = connection;
/// <summary>
/// Returns the schema table for the named metadata collection.
/// </summary>
/// <param name="collectionName">
/// The collection name, matched case-insensitively against the supported collections
/// (MetaDataCollections, Tables, Columns, Views, Indexes, Procedures, Users, Roles, DataTypes).
/// </param>
/// <param name="restrictionValues">Optional restriction values forwarded to the collection-specific reader.</param>
/// <returns>The <see cref="DataTable"/> produced by the matching collection reader.</returns>
/// <exception cref="ArgumentNullException"><paramref name="collectionName"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">The collection name is not supported.</exception>
public DataTable GetSchema(string collectionName, string?[]? restrictionValues)
{
    ArgumentNullException.ThrowIfNull(collectionName);

    // Upper-case with the invariant culture: culture-sensitive casing (e.g. Turkish, where
    // 'i' upper-cases to a dotted capital I) would make names such as "Indexes" or "Views" fail to match.
    return collectionName.ToUpperInvariant() switch
    {
        "METADATACOLLECTIONS" => GetMetaDataCollections(),
        "TABLES" => GetTables(restrictionValues),
        "COLUMNS" => GetColumns(restrictionValues),
        "VIEWS" => GetViews(restrictionValues),
        "INDEXES" => GetIndexes(restrictionValues),
        "PROCEDURES" => GetProcedures(restrictionValues),
        "USERS" => GetUsers(restrictionValues),
        "ROLES" => GetRoles(restrictionValues),
        "DATATYPES" => GetDataTypes(),
        _ => throw new ArgumentException($"Unsupported schema collection: {collectionName}")
    };
}
/// <summary>
/// Builds the <c>MetaDataCollections</c> table: one row per supported schema collection with
/// its restriction count and identifier-part count.
/// </summary>
/// <returns>A table with columns CollectionName, NumberOfRestrictions and NumberOfIdentifierParts.</returns>
private DataTable GetMetaDataCollections()
{
    var collections = new DataTable("MetaDataCollections");
    collections.Columns.Add("CollectionName", typeof(string));
    collections.Columns.Add("NumberOfRestrictions", typeof(int));
    collections.Columns.Add("NumberOfIdentifierParts", typeof(int));

    // (collection name, number of restrictions, number of identifier parts)
    (string Name, int Restrictions, int IdentifierParts)[] entries =
    {
        ("MetaDataCollections", 0, 0),
        ("Tables", 4, 3),
        ("Columns", 4, 4),
        ("Views", 3, 3),
        ("Indexes", 4, 3),
        ("Procedures", 4, 3),
        ("Users", 1, 1),
        ("Roles", 1, 1),
        ("DataTypes", 0, 0),
    };

    foreach (var (name, restrictions, identifierParts) in entries)
    {
        collections.Rows.Add(name, restrictions, identifierParts);
    }

    return collections;
}
/// <summary>
/// Builds the <c>Tables</c> schema table from the names returned by the server's
/// <c>showTable</c> call (issued with <c>show_children = true</c>).
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; only index 2 is read here and, when non-empty, it is compared
/// case-insensitively against each returned table name.
/// </param>
/// <returns>
/// A table with columns TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME and TABLE_TYPE. Every row is
/// written with a null catalog, schema <c>"public"</c> and type <c>"BASE TABLE"</c>.
/// </returns>
/// <exception cref="KineticaException">Any failure while querying or filling the table is wrapped and rethrown.</exception>
private DataTable GetTables(string?[]? restrictionValues)
{
    var result = new DataTable("Tables");
    foreach (var columnName in new[] { "TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE" })
    {
        result.Columns.Add(columnName, typeof(string));
    }

    try
    {
        var client = _connection.GetKineticaClient();
        var options = new Dictionary<string, string> { ["show_children"] = "true" };
        var listing = client.showTable("", options);

        // The name filter does not change per row, so it is resolved once before the loop.
        string? nameFilter = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;
        bool filterByName = !string.IsNullOrEmpty(nameFilter);

        foreach (var name in listing.table_names ?? new List<string>())
        {
            if (filterByName && !name.Equals(nameFilter, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            result.Rows.Add(null, "public", name, "BASE TABLE");
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException($"Failed to retrieve table schema: {ex.Message}", ex);
    }

    return result;
}
3525 private DataTable GetColumns(
string?[]? restrictionValues)
3527 var table =
new DataTable(
"Columns");
3528 table.Columns.Add(
"TABLE_CATALOG", typeof(
string));
3529 table.Columns.Add(
"TABLE_SCHEMA", typeof(
string));
3530 table.Columns.Add(
"TABLE_NAME", typeof(
string));
3531 table.Columns.Add(
"COLUMN_NAME", typeof(
string));
3532 table.Columns.Add(
"ORDINAL_POSITION", typeof(
int));
3533 table.Columns.Add(
"COLUMN_DEFAULT", typeof(
string));
3534 table.Columns.Add(
"IS_NULLABLE", typeof(
string));
3535 table.Columns.Add(
"DATA_TYPE", typeof(
string));
3536 table.Columns.Add(
"CHARACTER_MAXIMUM_LENGTH", typeof(
int));
3537 table.Columns.Add(
"NUMERIC_PRECISION", typeof(
int));
3538 table.Columns.Add(
"NUMERIC_SCALE", typeof(
int));
3542 var client = _connection.GetKineticaClient();
3543 var tablesResponse = client.showTable(
"",
new Dictionary<string, string>
3545 {
"show_children",
"true" }
3548 foreach (var tableName
in tablesResponse.table_names ??
new List<string>())
3551 if (restrictionValues !=
null && restrictionValues.Length > 2 &&
3552 !
string.IsNullOrEmpty(restrictionValues[2]) &&
3553 !tableName.Equals(restrictionValues[2], StringComparison.OrdinalIgnoreCase))
3558 var tableInfo = client.showTable(tableName);
3559 var typeSchemas = tableInfo.type_schemas;
3560 var properties = tableInfo.properties;
3562 if (typeSchemas !=
null && typeSchemas.Count > 0)
3565 var schemaJson = typeSchemas[0];
3569 if (avroSchema !=
null)
3572 IDictionary<string, IList<string>>? columnProperties =
null;
3573 if (properties !=
null && properties.Count > 0)
3575 columnProperties = properties[0];
3577 _ = columnProperties;
3580 foreach (var field
in avroSchema.Fields)
3582 var columnName = field.
Name;
3583 var dataType = GetKineticaTypeFromAvro(field.Schema);
3586 if (restrictionValues !=
null && restrictionValues.Length > 3 &&
3587 !
string.IsNullOrEmpty(restrictionValues[3]) &&
3588 !columnName.Equals(restrictionValues[3], StringComparison.OrdinalIgnoreCase))
3594 var isNullable = IsNullableField(field.Schema) ?
"YES" :
"NO";
3595 var maxLength = GetMaxLength(dataType);
3596 var (precision, scale) = GetPrecisionAndScale(dataType);
3606 MapKineticaTypeToSqlType(dataType),
3616 catch (Exception ex)
3619 System.Diagnostics.Debug.WriteLine($
"Failed to get schema for table {tableName}: {ex.Message}");
3623 catch (Exception ex)
3625 throw new KineticaException($
"Failed to retrieve column schema: {ex.Message}", ex);
3631 private DataTable GetViews(
string?[]? restrictionValues)
3633 var table =
new DataTable(
"Views");
3634 table.Columns.Add(
"TABLE_CATALOG", typeof(
string));
3635 table.Columns.Add(
"TABLE_SCHEMA", typeof(
string));
3636 table.Columns.Add(
"TABLE_NAME", typeof(
string));
3637 table.Columns.Add(
"VIEW_DEFINITION", typeof(
string));
3638 table.Columns.Add(
"VIEW_TYPE", typeof(
string));
3639 table.Columns.Add(
"IS_UPDATABLE", typeof(
string));
3643 var client = _connection.GetKineticaClient();
3645 var response = client.showTable(
"",
new Dictionary<string, string>
3647 {
"show_children",
"true" }
3650 for (
int i = 0; i < response.table_names.Count; i++)
3652 var viewName = response.table_names[i];
3653 var descriptions = response.table_descriptions[i];
3656 bool isView = descriptions.Any(d =>
3657 d ==
"LOGICAL_VIEW" ||
3658 d ==
"MATERIALIZED_VIEW" ||
3660 d ==
"MATERIALIZED_VIEW_MEMBER");
3666 if (restrictionValues !=
null && restrictionValues.Length > 2 &&
3667 !
string.IsNullOrEmpty(restrictionValues[2]) &&
3668 !viewName.Equals(restrictionValues[2], StringComparison.OrdinalIgnoreCase))
3672 string schemaName =
"public";
3673 string viewDefinition =
"";
3674 if (response.additional_info !=
null && i < response.additional_info.Count)
3676 var info = response.additional_info[i];
3677 if (info.TryGetValue(
"schema_name", out var schema))
3678 schemaName = schema;
3679 if (info.TryGetValue(
"request_avro_json", out var definition))
3680 viewDefinition = definition;
3684 string viewType = descriptions.Contains(
"MATERIALIZED_VIEW") ?
"MATERIALIZED" :
"LOGICAL";
3696 catch (Exception ex)
/// <summary>
/// Builds the "Indexes" schema collection from the attribute_indexes metadata that
/// showTable returns for each table.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; element 2 is compared against the table name and element 3
/// against the generated index name, both case-insensitively. Elements 0 and 1 are not
/// read in the lines visible here.
/// </param>
// NOTE(review): this listing omits several interior lines (braces, try, the bodies of the
// skip conditions, the row inserts and the return), so comments below describe only what is shown.
3704 private DataTable GetIndexes(
string?[]? restrictionValues)
3706 var table =
new DataTable(
"Indexes");
3707 table.Columns.Add(
"TABLE_CATALOG", typeof(
string));
3708 table.Columns.Add(
"TABLE_SCHEMA", typeof(
string));
3709 table.Columns.Add(
"TABLE_NAME", typeof(
string));
3710 table.Columns.Add(
"INDEX_NAME", typeof(
string));
3711 table.Columns.Add(
"INDEX_TYPE", typeof(
string));
3712 table.Columns.Add(
"COLUMN_NAME", typeof(
string));
3713 table.Columns.Add(
"ORDINAL_POSITION", typeof(
int));
3714 table.Columns.Add(
"IS_UNIQUE", typeof(
bool));
3718 var client = _connection.GetKineticaClient();
// An empty table name combined with show_children=true is used to enumerate entries;
// table_names, table_descriptions and additional_info are then read by the same index i.
3719 var tablesResponse = client.showTable(
"",
new Dictionary<string, string>
3721 {
"show_children",
"true" }
3724 for (
int i = 0; i < tablesResponse.table_names.Count; i++)
3726 var tableName = tablesResponse.table_names[i];
3727 var descriptions = tablesResponse.table_descriptions[i];
// Schemas and views are singled out here; the body is not visible, presumably it skips the entry.
3730 if (descriptions.Contains(
"SCHEMA") ||
3731 descriptions.Contains(
"LOGICAL_VIEW") ||
3732 descriptions.Contains(
"MATERIALIZED_VIEW"))
// Table-name restriction (element 2), matched with OrdinalIgnoreCase.
3736 if (restrictionValues !=
null && restrictionValues.Length > 2 &&
3737 !
string.IsNullOrEmpty(restrictionValues[2]) &&
3738 !tableName.Equals(restrictionValues[2], StringComparison.OrdinalIgnoreCase))
// Defaults used when additional_info has no entry for this table.
3742 string schemaName =
"public";
3743 string attributeIndexes =
"";
3745 if (tablesResponse.additional_info !=
null && i < tablesResponse.additional_info.Count)
3747 var info = tablesResponse.additional_info[i];
3748 if (info.TryGetValue(
"schema_name", out var schema))
3749 schemaName = schema;
3750 if (info.TryGetValue(
"attribute_indexes", out var indexes))
3751 attributeIndexes = indexes;
3756 if (!
string.IsNullOrEmpty(attributeIndexes))
// Index entries are separated by semicolons; empty entries are dropped.
3758 var indexList = attributeIndexes.Split(
';', StringSplitOptions.RemoveEmptyEntries);
3761 foreach (var indexEntry
in indexList)
// An entry containing an at-sign is split into a type part and a column part.
3767 if (indexEntry.Contains(
'@'))
3770 var parts = indexEntry.Split(
'@');
3771 indexType = parts[0];
// NOTE(review): after the Contains check, Split always yields at least two parts, so the
// fallback to indexEntry looks unreachable. parts[0] and parts[1] are not trimmed, unlike
// the plain-column branch below - confirm whether that difference is intended.
3772 columnName = parts.Length > 1 ? parts[1] : indexEntry;
3773 indexName = $
"{indexType}_{columnName.Replace(",
", "_
")}";
// Entry without an at-sign: treated as a plain column index named IX_<table>_<column>.
3778 indexType =
"COLUMN";
3779 columnName = indexEntry.Trim();
3780 indexName = $
"IX_{tableName}_{columnName}";
// Index-name restriction (element 3), matched with OrdinalIgnoreCase against the generated name.
3784 if (restrictionValues !=
null && restrictionValues.Length > 3 &&
3785 !
string.IsNullOrEmpty(restrictionValues[3]) &&
3786 !indexName.Equals(restrictionValues[3], StringComparison.OrdinalIgnoreCase))
// The column part may list several comma-separated columns; each one is iterated below.
3790 var columns = columnName.Split(
',', StringSplitOptions.RemoveEmptyEntries);
3792 foreach (var col
in columns)
// The handler body is not visible in this listing.
3811 catch (Exception ex)
/// <summary>
/// Builds the "Procedures" schema collection from the result of showProc.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; element 2, when non-empty, is passed to showProc as the
/// procedure-name filter. Other elements are not read in the lines visible here.
/// </param>
// NOTE(review): the listing omits interior lines (braces, try, row inserts, return).
3819 private DataTable GetProcedures(
string?[]? restrictionValues)
3821 var table =
new DataTable(
"Procedures");
3822 table.Columns.Add(
"PROCEDURE_CATALOG", typeof(
string));
3823 table.Columns.Add(
"PROCEDURE_SCHEMA", typeof(
string));
3824 table.Columns.Add(
"PROCEDURE_NAME", typeof(
string));
3825 table.Columns.Add(
"PROCEDURE_TYPE", typeof(
string));
3826 table.Columns.Add(
"EXECUTION_MODE", typeof(
string));
3827 table.Columns.Add(
"COMMAND", typeof(
string));
3831 var client = _connection.GetKineticaClient();
// An empty filter string is what gets passed when no name restriction is supplied.
3834 string procNameFilter =
"";
3835 if (restrictionValues !=
null && restrictionValues.Length > 2 &&
3836 !
string.IsNullOrEmpty(restrictionValues[2]))
// The null-coalescing fallback only satisfies the nullable annotation; the value was
// already checked with IsNullOrEmpty above.
3838 procNameFilter = restrictionValues[2] ??
"";
3841 var response = client.showProc(procNameFilter);
3843 for (
int i = 0; i < response.proc_names.Count; i++)
3845 var procName = response.proc_names[i];
// execution_modes and commands are read by the same index but bounds-checked, falling
// back to a placeholder when they are shorter than proc_names.
3846 var executionMode = i < response.execution_modes.Count ? response.execution_modes[i] :
"unknown";
3847 var command = i < response.commands.Count ? response.commands[i] :
"";
// Failures are written to the debug output; what else the handler does is not visible here.
3859 catch (Exception ex)
3862 System.Diagnostics.Debug.WriteLine($
"Failed to retrieve procedures: {ex.Message}");
/// <summary>
/// Builds the "Users" schema collection from the result of showSecurity.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; element 0, when non-empty, is the single name passed to showSecurity.
/// </param>
// NOTE(review): the listing omits interior lines (braces, try, row inserts, return).
3868 private DataTable GetUsers(
string?[]? restrictionValues)
3870 var table =
new DataTable(
"Users");
3871 table.Columns.Add(
"USER_NAME", typeof(
string));
3872 table.Columns.Add(
"USER_TYPE", typeof(
string));
3873 table.Columns.Add(
"RESOURCE_GROUP", typeof(
string));
3877 var client = _connection.GetKineticaClient();
// The name list stays empty when no restriction is given.
3880 var names =
new List<string>();
3881 if (restrictionValues !=
null && restrictionValues.Length > 0 &&
3882 !
string.IsNullOrEmpty(restrictionValues[0]))
3884 names.Add(restrictionValues[0]!);
3887 var response = client.showSecurity(names);
// response.types maps a principal name (key) to its type (value).
3889 foreach (var kvp
in response.types)
3891 var userName = kvp.Key;
3892 var userType = kvp.Value;
// Entries of type role are singled out; the body is not visible, presumably it skips them
// since roles have their own collection.
3895 if (userType ==
"role")
3899 string resourceGroup =
"";
3900 if (response.resource_groups.TryGetValue(userName, out var rg))
// Failures are written to the debug output; what else the handler does is not visible here.
3910 catch (Exception ex)
3913 System.Diagnostics.Debug.WriteLine($
"Failed to retrieve users: {ex.Message}");
/// <summary>
/// Builds the "Roles" schema collection from the result of showSecurity.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; element 0, when non-empty, is the single name passed to showSecurity.
/// </param>
// NOTE(review): the listing omits interior lines (braces, try, the declaration of the
// local called name, the type filter, row inserts, return).
3919 private DataTable GetRoles(
string?[]? restrictionValues)
3921 var table =
new DataTable(
"Roles");
3922 table.Columns.Add(
"ROLE_NAME", typeof(
string));
3923 table.Columns.Add(
"MEMBER_ROLES", typeof(
string));
3927 var client = _connection.GetKineticaClient();
// The name list stays empty when no restriction is given.
3930 var names =
new List<string>();
3931 if (restrictionValues !=
null && restrictionValues.Length > 0 &&
3932 !
string.IsNullOrEmpty(restrictionValues[0]))
3934 names.Add(restrictionValues[0]!);
3937 var response = client.showSecurity(names);
3939 foreach (var kvp
in response.types)
3942 var type = kvp.Value;
// MEMBER_ROLES is the comma-and-space joined list from response.roles, or empty when the
// entry is missing or null.
3949 string memberRoles =
"";
3950 if (response.roles.TryGetValue(name, out var roles) && roles !=
null)
3951 memberRoles =
string.Join(
", ", roles);
// Failures are written to the debug output; what else the handler does is not visible here.
3959 catch (Exception ex)
3962 System.Diagnostics.Debug.WriteLine($
"Failed to retrieve roles: {ex.Message}");
/// <summary>
/// Builds the static "DataTypes" schema collection: one hard-coded row per Kinetica type name.
/// </summary>
// Each Rows.Add call passes its values positionally in the column order declared below:
// TYPE_NAME, PROVIDER_TYPE, COLUMN_SIZE, LITERAL_PREFIX, LITERAL_SUFFIX, IS_NULLABLE,
// IS_CASE_SENSITIVE, IS_SEARCHABLE, IS_UNSIGNED, IS_FIXED_PRECISION_SCALE,
// IS_AUTO_INCREMENT, MINIMUM_SCALE, MAXIMUM_SCALE.
// NOTE(review): braces and the return statement are not visible in this listing.
3968 private DataTable GetDataTypes()
3970 var table =
new DataTable(
"DataTypes");
3971 table.Columns.Add(
"TYPE_NAME", typeof(
string));
// PROVIDER_TYPE holds a System.Data.DbType value cast to int.
3972 table.Columns.Add(
"PROVIDER_TYPE", typeof(
int));
3973 table.Columns.Add(
"COLUMN_SIZE", typeof(
int));
3974 table.Columns.Add(
"LITERAL_PREFIX", typeof(
string));
3975 table.Columns.Add(
"LITERAL_SUFFIX", typeof(
string));
3976 table.Columns.Add(
"IS_NULLABLE", typeof(
bool));
3977 table.Columns.Add(
"IS_CASE_SENSITIVE", typeof(
bool));
3978 table.Columns.Add(
"IS_SEARCHABLE", typeof(
bool));
3979 table.Columns.Add(
"IS_UNSIGNED", typeof(
bool));
3980 table.Columns.Add(
"IS_FIXED_PRECISION_SCALE", typeof(
bool));
3981 table.Columns.Add(
"IS_AUTO_INCREMENT", typeof(
bool));
3982 table.Columns.Add(
"MINIMUM_SCALE", typeof(
short));
3983 table.Columns.Add(
"MAXIMUM_SCALE", typeof(
short));
// Integer types: no literal delimiters, fixed precision/scale, scale 0. Only ulong is flagged unsigned.
3989 table.Rows.Add(
"int", (
int)DbType.Int32, 10,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)0);
3990 table.Rows.Add(
"int8", (
int)DbType.SByte, 3,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)0);
3991 table.Rows.Add(
"int16", (
int)DbType.Int16, 5,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)0);
3992 table.Rows.Add(
"long", (
int)DbType.Int64, 19,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)0);
3993 table.Rows.Add(
"ulong", (
int)DbType.UInt64, 20,
null,
null,
true,
false,
true,
true,
true,
false, (
short)0, (
short)0);
// Floating-point and decimal types: maximum scale equals the reported column size.
3996 table.Rows.Add(
"float", (
int)DbType.Single, 7,
null,
null,
true,
false,
true,
false,
false,
false, (
short)0, (
short)7);
3997 table.Rows.Add(
"double", (
int)DbType.Double, 15,
null,
null,
true,
false,
true,
false,
false,
false, (
short)0, (
short)15);
3998 table.Rows.Add(
"decimal", (
int)DbType.Decimal, 38,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)38);
// Boolean.
4001 table.Rows.Add(
"boolean", (
int)DbType.Boolean, 1,
null,
null,
true,
false,
true,
false,
true,
false, (
short)0, (
short)0);
// Unbounded string: single-quote delimited, case sensitive, reported size 8000.
4004 table.Rows.Add(
"string", (
int)DbType.String, 8000,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
// Fixed-width charN types: the column size equals N.
4006 table.Rows.Add(
"char1", (
int)DbType.StringFixedLength, 1,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4007 table.Rows.Add(
"char2", (
int)DbType.StringFixedLength, 2,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4008 table.Rows.Add(
"char4", (
int)DbType.StringFixedLength, 4,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4009 table.Rows.Add(
"char8", (
int)DbType.StringFixedLength, 8,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4010 table.Rows.Add(
"char16", (
int)DbType.StringFixedLength, 16,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4011 table.Rows.Add(
"char32", (
int)DbType.StringFixedLength, 32,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4012 table.Rows.Add(
"char64", (
int)DbType.StringFixedLength, 64,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4013 table.Rows.Add(
"char128", (
int)DbType.StringFixedLength, 128,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
4014 table.Rows.Add(
"char256", (
int)DbType.StringFixedLength, 256,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
// Binary: 0x prefix with no suffix, flagged as not searchable.
4017 table.Rows.Add(
"bytes", (
int)DbType.Binary, 8000,
"0x",
null,
true,
false,
false,
false,
false,
false, (
short)0, (
short)0);
// Date and time types: quoted literals; datetime reports a maximum scale of 3.
4020 table.Rows.Add(
"date", (
int)DbType.Date, 10,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
4021 table.Rows.Add(
"time", (
int)DbType.Time, 12,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
4022 table.Rows.Add(
"datetime", (
int)DbType.DateTime, 23,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)3);
// timestamp is reported as DbType.Int64 with no literal delimiters.
// NOTE(review): presumably an epoch-based integer - confirm against the server type documentation.
4023 table.Rows.Add(
"timestamp", (
int)DbType.Int64, 19,
null,
null,
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
// String-backed special types. A COLUMN_SIZE of -1 is used from json onward.
// NOTE(review): -1 presumably means no fixed maximum - confirm with consumers of this table.
4026 table.Rows.Add(
"uuid", (
int)DbType.Guid, 36,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
4027 table.Rows.Add(
"ipv4", (
int)DbType.String, 15,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
4028 table.Rows.Add(
"json", (
int)DbType.String, -1,
"'",
"'",
true,
true,
true,
false,
false,
false, (
short)0, (
short)0);
// Geospatial text.
4031 table.Rows.Add(
"wkt", (
int)DbType.String, -1,
"'",
"'",
true,
false,
true,
false,
false,
false, (
short)0, (
short)0);
// vector and array are reported as DbType.Object, without literal delimiters and not searchable.
4034 table.Rows.Add(
"vector", (
int)DbType.Object, -1,
null,
null,
true,
false,
false,
false,
false,
false, (
short)0, (
short)0);
4037 table.Rows.Add(
"array", (
int)DbType.Object, -1,
null,
null,
true,
false,
false,
false,
false,
false, (
short)0, (
short)0);
// GetKineticaTypeFromAvro: returns a type name for an Avro schema. For a union it iterates the
// member schemas and recurses into one of them (the condition selecting the member is not
// visible in this listing); otherwise it switches on schema.Tag (the switch arms are not visible).
//
// IsNullableField (its header is fused onto the switch line below by the listing): returns
// whether any member of the union schema has the Avro Null tag. How unionSchema is obtained,
// and the result for non-union schemas, are not visible here.
4042 private static string GetKineticaTypeFromAvro(
Avro.
Schema schema)
4047 foreach (var s
in unionSchema.Schemas)
4050 return GetKineticaTypeFromAvro(s);
4054 return schema.Tag
switch 4068 private static bool IsNullableField(
Avro.
Schema schema)
4072 return unionSchema.Schemas.Any(s => s.Tag ==
Avro.
Schema.
Type.Null);
/// <summary>
/// Maps a Kinetica type name (matched case-insensitively via lower-casing) to the SQL type
/// name reported by the schema collections.
/// </summary>
/// <param name="kineticaType">Kinetica type name; aliases listed in the switch are accepted.</param>
// NOTE(review): the bodies of the three prefix checks and the default switch arm are not
// visible in this listing.
4077 private string MapKineticaTypeToSqlType(
string kineticaType)
// NOTE(review): ToLower() and the StartsWith(string) overloads below are culture-sensitive.
// Under a Turkish culture an upper-case I lower-cases to a dotless i, so a name such as INT
// would miss every arm. ToLowerInvariant() with StringComparison.Ordinal is the usual choice
// for identifiers - confirm and change in the full source.
4079 var typeLower = kineticaType.ToLower();
// charN family: a name longer than the bare prefix char.
4082 if (typeLower.StartsWith(
"char") && typeLower.Length > 4)
4086 if (typeLower.StartsWith(
"array"))
4090 if (typeLower.StartsWith(
"vector"))
// Exact-name mapping; several aliases collapse onto one SQL name.
4093 return typeLower
switch 4096 "int" or
"integer" or
"int32" =>
"int",
4097 "int8" or
"tinyint" =>
"tinyint",
4098 "int16" or
"smallint" =>
"smallint",
4099 "long" or
"bigint" or
"int64" =>
"bigint",
4100 "ulong" or
"uint64" =>
"bigint unsigned",
// Kinetica float maps to SQL real, and Kinetica double maps to SQL float.
4103 "float" or
"real" =>
"real",
4104 "double" or
"float8" =>
"float",
4105 "decimal" or
"numeric" =>
"decimal",
4108 "bool" or
"boolean" =>
"boolean",
4111 "string" or
"varchar" or
"text" =>
"varchar",
4117 "datetime" =>
"datetime",
4118 "timestamp" =>
"timestamp",
4122 "ipv4" =>
"varchar",
4124 "wkt" =>
"geometry",
4127 "bytes" or
"binary" =>
"varbinary",
/// <summary>
/// Returns the maximum length reported for a data type name; string and varchar report 8000.
/// </summary>
// NOTE(review): the remaining switch arms, including the default, are not visible in this
// listing. ToLower() is culture-sensitive; ToLowerInvariant() is the usual choice for type names.
4133 private int? GetMaxLength(
string dataType)
4135 return dataType.ToLower()
switch 4137 "string" or
"varchar" => 8000,
/// <summary>
/// Returns the (precision, scale) pair reported for a numeric data type name.
/// </summary>
// NOTE(review): the default switch arm is not visible in this listing. ToLower() is
// culture-sensitive; ToLowerInvariant() is the usual choice for type names.
4143 private (
int? precision,
int? scale) GetPrecisionAndScale(
string dataType)
4145 return dataType.ToLower()
switch 4147 "int" or
"integer" => (10, 0),
4148 "long" or
"bigint" => (19, 0),
// float and double report a precision of 24 and 53 with no scale.
// NOTE(review): these look like binary mantissa widths rather than decimal digits - confirm.
4149 "float" => (24,
null),
4150 "double" => (53,
null),
// NOTE(review): GetDataTypes in this file lists decimal with a column size and maximum
// scale of 38, while this reports (18, 2) - confirm which is intended.
4151 "decimal" => (18, 2),
4171 ErrorCode = MapExceptionToErrorCode(innerException);
4172 SqlState = MapExceptionToSqlState(innerException);
/// <summary>
/// Maps an exception to a negative integer error code by its runtime type.
/// </summary>
// Type patterns are tested top to bottom, so derived exception types match the arm of their
// nearest listed base (for example ArgumentNullException takes the ArgumentException arm).
// NOTE(review): the default arm is not visible in this listing.
4181 private int MapExceptionToErrorCode(Exception exception)
4183 return exception
switch 4185 TimeoutException => -2,
4186 UnauthorizedAccessException => -3,
4187 ArgumentException => -4,
4188 InvalidOperationException => -5,
/// <summary>
/// Maps an exception to a five-character SQLSTATE string by its runtime type.
/// </summary>
// The arms test the same exception types, in the same order, as MapExceptionToErrorCode.
// NOTE(review): the default arm is not visible in this listing.
4193 private string MapExceptionToSqlState(Exception exception)
4195 return exception
switch 4197 TimeoutException =>
"HYT00",
4198 UnauthorizedAccessException =>
"28000",
4199 ArgumentException =>
"22000",
4200 InvalidOperationException =>
"24000",
/// <summary>Direction of the parameter; defaults to <see cref="ParameterDirection.Input"/>.</summary>
4222 public override ParameterDirection
Direction {
get;
set; } = ParameterDirection.Input;
// NOTE(review): the member this AllowNull attribute decorates is not visible in this listing.
4224 [System.Diagnostics.CodeAnalysis.AllowNull]
/// <summary>Declared size of the parameter value; plain auto-property with no validation.</summary>
4226 public override int Size {
get;
set; }
// NOTE(review): the member this AllowNull attribute decorates is not visible in this listing.
4227 [System.Diagnostics.CodeAnalysis.AllowNull]
/// <summary>Parameter value; may be null.</summary>
4230 public override object?
Value {
get;
set; }
4240 private readonly List<DbParameter> _parameters =
new List<DbParameter>();
/// <summary>Number of parameters currently held by the collection.</summary>
public override int Count
{
    get { return _parameters.Count; }
}
/// <summary>
/// Appends a parameter to the end of the collection.
/// </summary>
/// <param name="value">The parameter to add; it is cast to <see cref="DbParameter"/>.</param>
/// <returns>The zero-based index of the appended parameter.</returns>
/// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
4245 public override int Add(
object value)
4247 _parameters.Add((DbParameter)value);
// The element was appended, so its index is the new last position.
4248 return _parameters.Count - 1;
4253 foreach (DbParameter param
in values)
4255 _parameters.Add(param);
/// <summary>Removes every parameter from the collection.</summary>
public override void Clear()
{
    _parameters.Clear();
}
/// <summary>Reports whether the given parameter instance is in the collection.</summary>
/// <param name="value">The parameter to look for; it is cast to <see cref="DbParameter"/>.</param>
/// <returns><c>true</c> when the instance is present.</returns>
/// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
public override bool Contains(object value)
{
    var candidate = (DbParameter)value;
    return _parameters.Contains(candidate);
}
/// <summary>Reports whether a parameter with exactly the given name is in the collection.</summary>
/// <param name="value">The parameter name, compared with ordinal string equality.</param>
/// <returns><c>true</c> when at least one parameter has that name.</returns>
public override bool Contains(string value)
{
    foreach (var parameter in _parameters)
    {
        if (parameter.ParameterName == value)
        {
            return true;
        }
    }

    return false;
}
/// <summary>
/// Copies the parameters into <paramref name="array"/>, starting at <paramref name="index"/>.
/// </summary>
/// <param name="array">Destination array; any one-dimensional array whose element type can hold a <see cref="DbParameter"/>.</param>
/// <param name="index">Zero-based position in <paramref name="array"/> at which copying begins.</param>
/// <remarks>
/// The copy goes through the non-generic <see cref="System.Collections.ICollection.CopyTo"/>
/// of the backing list. A hard cast of the destination to <c>DbParameter[]</c> throws
/// <see cref="InvalidCastException"/> for legitimate destinations such as <c>object[]</c>,
/// which is what callers like <c>new ArrayList(collection)</c> pass in.
/// </remarks>
public override void CopyTo(Array array, int index) =>
    ((System.Collections.ICollection)_parameters).CopyTo(array, index);
/// <summary>Returns an enumerator over the parameters in insertion order.</summary>
public override System.Collections.IEnumerator GetEnumerator()
{
    return _parameters.GetEnumerator();
}
/// <summary>Finds the position of the given parameter instance.</summary>
/// <param name="value">The parameter to look for; it is cast to <see cref="DbParameter"/>.</param>
/// <returns>The zero-based index of the instance, or -1 when it is not present.</returns>
/// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
public override int IndexOf(object value)
{
    var candidate = (DbParameter)value;
    return _parameters.IndexOf(candidate);
}
/// <summary>Finds the position of the first parameter with exactly the given name.</summary>
/// <param name="parameterName">The parameter name, compared with ordinal string equality.</param>
/// <returns>The zero-based index of the first match, or -1 when no parameter has that name.</returns>
public override int IndexOf(string parameterName)
{
    for (int position = 0; position < _parameters.Count; position++)
    {
        if (_parameters[position].ParameterName == parameterName)
        {
            return position;
        }
    }

    return -1;
}
/// <summary>Inserts a parameter at the given position.</summary>
/// <param name="index">Zero-based position at which to insert.</param>
/// <param name="value">The parameter to insert; it is cast to <see cref="DbParameter"/>.</param>
/// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
public override void Insert(int index, object value)
{
    var parameter = (DbParameter)value;
    _parameters.Insert(index, parameter);
}
/// <summary>Removes the first occurrence of the given parameter instance, if present.</summary>
/// <param name="value">The parameter to remove; it is cast to <see cref="DbParameter"/>.</param>
/// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
public override void Remove(object value)
{
    var parameter = (DbParameter)value;

    // Whether anything was actually removed is intentionally not reported.
    _parameters.Remove(parameter);
}
/// <summary>Removes the parameter at the given position.</summary>
/// <param name="index">Zero-based position of the parameter to remove.</param>
public override void RemoveAt(int index)
{
    _parameters.RemoveAt(index);
}
4272 var index =
IndexOf(parameterName);
/// <summary>Returns the parameter stored at the given position.</summary>
/// <param name="index">Zero-based position of the parameter.</param>
protected override DbParameter GetParameter(int index)
{
    return _parameters[index];
}
4279 var index =
IndexOf(parameterName);
4280 return index >= 0 ? _parameters[index] :
throw new ArgumentException($
"Parameter '{parameterName}' not found");
/// <summary>Replaces the parameter stored at the given position.</summary>
/// <param name="index">Zero-based position to overwrite.</param>
/// <param name="value">The parameter to store there.</param>
protected override void SetParameter(int index, DbParameter value)
{
    _parameters[index] = value;
}
/// <summary>
/// Stores <paramref name="value"/> under the given parameter name.
/// </summary>
/// <param name="parameterName">Name used to look up an existing parameter via IndexOf.</param>
/// <param name="value">The parameter to store.</param>
// NOTE(review): the branch lines are not visible in this listing; from the two statements
// shown, the parameter presumably replaces the existing entry when the name is found and
// is appended otherwise.
4284 protected override void SetParameter(
string parameterName, DbParameter value)
4286 var index =
IndexOf(parameterName);
4288 _parameters[index] = value;
4290 _parameters.Add(value);
4329 private bool _completed =
false;
4333 _connection = connection;
4359 Task.Run(async () => await
CommitAsync(CancellationToken.None).ConfigureAwait(
false))
4360 .ConfigureAwait(
false)
/// <summary>
/// Asynchronously commits the transaction by flushing any pending batch inserts on the connection.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the batch flush.</param>
/// <exception cref="InvalidOperationException">The transaction has already been completed.</exception>
/// <exception cref="KineticaException">Flushing the pending batch inserts failed; the original exception is the inner exception.</exception>
// NOTE(review): the guard condition, the try statement and the completion bookkeeping are
// not visible in this listing.
4374 public override async Task
CommitAsync(CancellationToken cancellationToken =
default)
4377 throw new InvalidOperationException(
"Transaction already completed");
// Only flush when the connection actually has a batch manager.
4382 if (_connection.BatchManager !=
null)
4384 await _connection.FlushBatchAsync(cancellationToken).ConfigureAwait(
false);
// NOTE(review): catching Exception presumably also wraps OperationCanceledException, so a
// cancelled commit would surface as KineticaException - confirm that this is intended.
4389 catch (Exception ex)
4391 throw new KineticaException($
"Failed to flush batch inserts during commit: {ex.Message}", ex);
4410 throw new InvalidOperationException(
"Transaction already completed");
4419 if (disposing && !_completed)
4424 base.Dispose(disposing);
4438 new DbConnectionStringBuilder();
4442 throw new NotSupportedException(
"Data source enumeration not supported");
4448 private static bool _registered =
false;
4449 private static readonly
object _lock =
new object();
4453 if (_registered)
return;
4457 if (_registered)
return;
4463 var invariantName =
"KineticaAdo";
4467 DbProviderFactories.RegisterFactory(invariantName, factoryType);
4471 catch (Exception ex)
4473 throw new InvalidOperationException($
"Failed to register Kinetica provider: {ex.Message}", ex);
override async Task CommitAsync(CancellationToken cancellationToken=default)
Asynchronously commits the transaction.
const string FileReadInitialClear
int Limit
Gets or sets the result row limit.
override async Task< bool > NextResultAsync(CancellationToken cancellationToken)
override void Dispose(bool disposing)
int Timeout
Gets or sets the query timeout in minutes.
const string DistributedJoins
Manages bulk insert operations for the ADO.NET driver.
const string SsqOptimizations
ImpersonationType
Types of user impersonation commands.
override void RemoveAt(int index)
KineticaSqlException(string message)
const string DisableFailover
override? DbConnection DbConnection
bool UpdateOnExistingPk
Gets or sets whether to update on existing primary key.
const string FileReadHasHeader
const string FailoverOrder
bool UpdateOnExistingPk
If true, updates existing records with matching primary keys.
override object GetValue(int ordinal)
bool PlanCache
Gets or sets whether plan caching is enabled.
override string GetDataTypeName(int ordinal)
Options for INSERT FROM FILE operations.
UserImpersonationInfo? UserImpersonation
override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
override void AddRange(Array values)
const string FakeTransactions
bool ServerSideInsert
Whether to force server-side insert execution (KI_HINT_SERVER_SIDE_INSERT).
override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
bool UseApproxCountDistinct
Gets or sets whether to use approximate count distinct.
KineticaSchemaProvider(KineticaConnection connection)
override double GetDouble(int ordinal)
string PrimaryUrl
Gets or sets the primary URL for HA configurations.
int FetchSize
Gets or sets the default fetch size for paged result sets.
override bool GetBoolean(int ordinal)
const string SslCertPassword
List< string > InsertColumns
Column names specified in the INSERT INTO clause (optional).
int PendingBatchCount
Gets the number of records currently pending in the batch buffer.
override char GetChar(int ordinal)
KineticaPagingDataReader(Kinetica client, string sql, int fetchSize, CommandBehavior behavior, CancellationToken cancellationToken=default)
override void Dispose(bool disposing)
string FileReadEscapeChar
Gets or sets the file escape character.
const string RowsPerFetch
FileFormat
Supported file formats for INSERT FROM FILE operations.
override void SetParameter(int index, DbParameter value)
bool DryRun
Whether this is a dry run (validate without inserting). Default: false.
override DataTable GetSchema()
override float GetFloat(int ordinal)
const string ResultsCaching
override Guid GetGuid(int ordinal)
string? KifsPath
KiFS path (for kifs:// syntax).
Skip problematic rows and continue.
FileFormat Format
File format. Default: Auto (detected from extension).
bool CostBasedOptimization
Gets or sets whether cost-based optimization is enabled.
FileErrorMode ErrorMode
Error handling mode.
override? object ExecuteScalar()
User impersonation information for SET USER/EXECUTE AS commands.
override bool SourceColumnNullMapping
override int GetValues(object[] values)
bool FileReadHasHeader
Gets or sets whether the file has a header row.
const string BatchInsertMode
Base class for all schema types
override IEnumerator< IDataRecord > GetEnumerator()
string ActualPath
The actual path to use (either FilePath or KifsPath).
const string FileReadNullString
override void Dispose(bool disposing)
KineticaConnectionStringBuilder(string connectionString)
override DateTime GetDateTime(int ordinal)
override int IndexOf(object value)
IDictionary< string, string > ToQueryOptions()
Converts hints to query options dictionary.
override DbCommand CreateCommand()
bool ResultsCaching
Gets or sets whether results caching is enabled.
string? FilePath
File path (local file path for FILE."path" syntax).
string Server
Gets or sets the server URL (e.g., "http://localhost:9191").
int Limit
Maximum number of rows to read. Default: unlimited (0).
const string FailbackPollInterval
override string?? CommandText
override DateTime GetDateTime(int ordinal)
override DbConnection DbConnection
Information about an INSERT INTO...SELECT FROM FILE statement.
const string SslAllowHostMismatch
const string InitialConnectionTimeout
const string BypassSslCertCheck
override string GetName(int ordinal)
override short GetInt16(int ordinal)
const string ConnectionTimeout
override string Name
Name of the schema
void RevertUser()
Reverts to previous user context from ExecuteAsUser.
long TotalRecordCount
Gets the total number of records in the result set.
override DbParameter GetParameter(string parameterName)
override CommandType CommandType
override string GetString(int ordinal)
string OAuthToken
Gets or sets the OAuth token for authentication.
string?? Database
Gets or sets the database name.
override string ServerVersion
void SetSchema(string schemaName)
Sets the current schema (equivalent to SET SCHEMA command).
string FileReadDelimiter
Gets or sets the file read delimiter.
override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
override int GetOrdinal(string name)
override string DataSource
string Password
Gets or sets the password for authentication.
override DbDataSourceEnumerator CreateDataSourceEnumerator()
const string DisableAutoDiscovery
const string FileReadComment
override void Insert(int index, object value)
int FailbackPollInterval
Gets or sets the failback poll interval in seconds.
async Task< long > FlushAllAsync(CancellationToken cancellationToken=default)
Flushes all pending records for all tables.
const string TruncateStrings
string?? ImpersonateUser
Gets or sets the user to impersonate.
FileErrorMode
Error handling modes for file insert operations.
int ServerConnectionTimeout
Gets or sets the server connection timeout in seconds.
bool DisableMultihead
Whether to disable multi-head insert (KI_HINT_DISABLE_MULTIHEAD).
const string BatchUpdateOnExistingPk
override bool NextResult()
KineticaException(string message, Exception innerException)
SQL parser with support for JDBC-compatible features including:
override bool Contains(object value)
Type
Enum for schema types
async Task< bool > InsertAsync(string tableName, IList< string > columnNames, IList< object?> values, CancellationToken cancellationToken=default)
Inserts a record into the specified table using batch processing.
string? CommentPrefix
Comment line prefix (lines starting with this are skipped).
override object GetValue(int ordinal)
override async Task< bool > ReadAsync(CancellationToken cancellationToken)
override DbParameter CreateDbParameter()
override int GetInt32(int ordinal)
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
const string UpdateOnExistingPk
KineticaConnectionException(string message, Exception innerException)
bool ParallelExecution
Gets or sets whether parallel execution is enabled.
bool NoSync
Gets or sets whether to disable sync mode.
override short GetInt16(int ordinal)
void ExecuteAsUser(string username)
Executes as a specific user, pushing current context to stack.
IList< KineticaRecord > data
Avro binary encoded response.
override byte GetByte(int ordinal)
bool ReadOnly
Gets or sets whether the connection is read-only.
int GetTotalPendingCount()
Gets the total number of pending records across all tables.
bool InitialClear
Whether to clear the table before inserting. Default: false.
const string TokenNameClaim
KineticaSqlException(string message, Exception innerException)
override float GetFloat(int ordinal)
KineticaConnection(string connectionString)
bool IgnoreExistingPk
Gets or sets whether to ignore existing primary keys on insert.
const string PagingTableTtl
override ConnectionState State
override bool GetBoolean(int ordinal)
override DataTable GetSchema(string collectionName)
bool UseKeyLookup
Gets or sets whether to use key lookup optimization.
bool DisableAutoDiscovery
Gets or sets whether to disable auto-discovery of cluster nodes.
const string UseKeyLookup
int ConnectionTimeout
Gets or sets the connection timeout in seconds.
Provides a comprehensive connection string builder with JDBC-compatible properties.
override int GetValues(object[] values)
override void Rollback()
Rolls back the transaction.
KineticaCommand(string commandText, KineticaConnection connection)
ParsedCommandType CommandType
override IsolationLevel IsolationLevel
Gets the isolation level for this transaction.
int FetchSize
Gets or sets the fetch size (number of records to retrieve per batch).
string TableName
Target table name (may include schema).
string?? Schema
Gets or sets the default schema.
ParsedCommand Parse(string sql, DbParameterCollection parameters)
const string ImpersonateUser
KineticaConnectionException(string message)
string?? SslCaCertPath
Gets or sets the path to the SSL CA certificate.
string Username
Gets or sets the username for authentication.
override DbConnection CreateConnection()
bool BatchUpdateOnExistingPk
Gets or sets whether to update existing records with matching primary keys during batch inserts.
override int Add(object value)
DataTable GetSchema(string collectionName, string?[]? restrictionValues)
override async Task OpenAsync(CancellationToken cancellationToken)
override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
string FileReadNullString
Gets or sets the null string representation in files.
int InitialConnectionTimeout
Gets or sets the initial connection timeout in seconds.
bool HasHeader
Whether the file has a header row.
bool IgnoreExistingPk
Whether to ignore records with duplicate primary keys. Default: false.
bool TruncateStrings
Whether to truncate strings that exceed column length (KI_HINT_TRUNCATE_STRINGS).
IDictionary< string, string > ToInsertOptions()
Converts hints to insert options dictionary.
override Guid GetGuid(int ordinal)
KineticaCommand(KineticaConnection connection)
List< string > SelectColumns
Column names specified in the SELECT clause (optional, * means all).
long FlushBatch()
Flushes all pending batch inserts to the database.
static readonly KineticaProviderFactory Instance
override int CommandTimeout
KineticaTransaction(KineticaConnection connection, IsolationLevel isolationLevel)
override decimal GetDecimal(int ordinal)
override Type GetFieldType(int ordinal)
override long GetInt64(int ordinal)
override bool CanCreateDataSourceEnumerator
override void ResetDbType()
const string FileReadSkip
static void RegisterProvider()
string? CurrentSchema
Gets or sets the current schema for this session.
KineticaConnectionStringBuilder()
FileInsertOptions Options
File insert options parsed from WITH OPTIONS clause.
bool DistributedJoins
Gets or sets whether distributed joins are enabled.
override bool DesignTimeVisible
bool SsqOptimizations
Gets or sets whether SSQ optimizations are enabled.
override int RecordsAffected
EXECUTE AS USER "username" - Temporary context switch (can be reverted).
const string RuleBasedOptimizations
override bool NextResult()
override async Task< bool > ReadAsync(CancellationToken cancellationToken)
void ReturnConnection(string connectionString, Kinetica connection)
string?? Replication
Gets or sets the replication mode.
const string RowsPerInsertion
override DbParameter CreateParameter()
const string DisableSnappy
override void ChangeDatabase(string databaseName)
override int GetInt32(int ordinal)
override void RemoveAt(string parameterName)
override int GetOrdinal(string name)
const string ServerConnectionTimeout
string? PkConflictPredicateLowerColumn
Column name for lower predicate PK conflict resolution (KI_HINT_PK_CONFLICT_PREDICATE_LOWER).
int Ttl
Gets or sets the time-to-live in minutes for query results.
bool SelectAll
Whether SELECT * was used.
Query optimization hints extracted from SQL.
const string ValidateChange
override void Commit()
Commits the transaction.
override int RecordsAffected
string? PkConflictPredicateHigherColumn
Column name for higher predicate PK conflict resolution (KI_HINT_PK_CONFLICT_PREDICATE_HIGHER).
override void Remove(object value)
int Skip
Number of lines to skip from beginning. Default: 0.
int RowsPerInsertion
Gets or sets the number of rows per insertion batch.
bool SslAllowHostMismatch
Gets or sets whether to allow SSL host mismatch.
bool BypassSslCertCheck
Gets or sets whether to bypass SSL certificate validation.
override string GetName(int ordinal)
ParsedCommandType
Types of parsed SQL commands.
override double GetDouble(int ordinal)
bool UseKeyLookup
Whether to use key lookup optimization (KI_HINT_KEY_LOOKUP).
override byte GetByte(int ordinal)
override void SetParameter(string parameterName, DbParameter value)
char Delimiter
Field delimiter character. Default: comma for CSV, auto-detected from extension.
char QuoteChar
Quote character for string fields. Default: double-quote.
override string ConnectionString
bool ReplicationSync
Whether to use synchronous replication (KI_HINT_REPL_SYNC).
bool TruncateStrings
Whether to truncate strings that exceed column length. Default: false.
override DbParameter GetParameter(int index)
bool DisableSnappy
Gets or sets whether to disable Snappy compression.
int RowsPerFetch
Gets or sets the number of rows per fetch.
string? ExtractSchemaName(string sql)
Extracts schema name from SET SCHEMA command.
Represents a parsed SQL command with extracted metadata.
Provides ADO.NET transaction API compatibility for Kinetica.
override async Task< int > ExecuteNonQueryAsync(CancellationToken cancellationToken)
override DbParameterCollection DbParameterCollection
void SetUser(string username)
Sets the impersonated user context (equivalent to SET USER command).
Options for batch insert operations.
Delimited text (CSV, TSV, PSV).
override void CopyTo(Array array, int index)
override string SourceColumn
string? ImpersonatedUser
Gets the current impersonated user, if any.
override bool IsDBNull(int ordinal)
string?? ErrorMode
Gets or sets the error mode for insertions.
bool IsKifsPath
Whether this is a KiFS file operation.
Immutable collection of metadata about a Kinetica type.
const string FileReadDelimiter
KineticaException(string message)
bool IgnoreExistingPk
Whether to ignore existing primary key (KI_HINT_IGNORE_EXISTING_PK).
string?? SslCertPassword
Gets or sets the SSL certificate password.
override System.Collections.IEnumerator GetEnumerator()
A paging data reader that fetches records in batches for large result sets.
DbParameterCollection? Parameters
string NullString
String that represents NULL values. Default: \N.
int BatchSize
The number of records to batch before flushing.
InsertFromFileInfo? InsertFromFile
const string TimeZoneOverride
override long GetInt64(int ordinal)
Auto-detect format from file extension.
override async Task< object?> ExecuteScalarAsync(CancellationToken cancellationToken)
override IEnumerator< IDataRecord > GetEnumerator()
override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
override DataTable GetSchemaTable()
async Task< long > FlushBatchAsync(CancellationToken cancellationToken=default)
Flushes all pending batch inserts to the database asynchronously.
bool BatchInsertMode
Gets or sets whether batch insert mode is enabled.
char EscapeChar
Escape character for special chars. Default: none.
const string CostBasedOptimization
override string GetDataTypeName(int ordinal)
override DbCommand CreateDbCommand()
long TotalRowsRead
Gets the total number of records read so far.
const string IgnoreExistingPk
override string ParameterName
int BatchSize
Batch size for bulk insert operations. Default: 10000.
override async Task< DbDataReader > ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
const string FileReadLimit
override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
const string DisableMultiheadInsert
bool DisableMultiheadInsert
Gets or sets whether to disable multi-head insert.
static Schema Parse(string json)
Parses a given JSON string to create a new schema object
bool UpdateOnExistingPk
Whether to update records with duplicate primary keys. Default: false.
int? BatchSize
Batch size hint for bulk operations (KI_HINT_BATCH_SIZE).
override ParameterDirection Direction
KineticaDataReader(ExecuteSqlResponse response, CommandBehavior behavior)
bool RuleBasedOptimizations
Gets or sets whether rule-based optimizations are enabled.
int BatchSize
Gets or sets the batch size for batch insert operations.
const string SslCaCertPath
override decimal GetDecimal(int ordinal)
override char GetChar(int ordinal)
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
override DataTable GetSchema(string collectionName, string?[]? restrictionValues)
async Task< Kinetica > GetConnectionAsync(string connectionString, CancellationToken cancellationToken)
const string UseApproxCountDistinct
Attempt to insert partial records.
string?? FailoverOrder
Gets or sets the HA failover order.
override UpdateRowSource UpdatedRowSource
const string FileReadQuoteChar
const string FileReadEscapeChar
const string ParallelExecution
KineticaException(string message, int errorCode, string sqlState)
override DbConnectionStringBuilder CreateConnectionStringBuilder()
override string GetString(int ordinal)
int PagingTableTtl
Gets or sets the TTL for paging tables in minutes.
override bool IsDBNull(int ordinal)
override int ExecuteNonQuery()
override? DbTransaction DbTransaction
bool TruncateStrings
Gets or sets whether to truncate strings that exceed column length.
bool DisableFailover
Gets or sets whether to disable automatic failover.
bool UpdateOnExistingPk
Whether to update on existing primary key (KI_HINT_UPDATE_ON_EXISTING_PK).
override Type GetFieldType(int ordinal)
A set of results returned by Kinetica.executeSql.