// Kinetica C# API, version 7.2.3.1
// File: KineticaAdo.cs
// (Listing extracted from the generated API documentation for this file.)
1 using System.Collections.Concurrent;
2 using System.Data;
3 using System.Data.Common;
4 using System.Text.RegularExpressions;
5 using kinetica;
6 
7 namespace KineticaAdo
8 {
9  // 1. Enhanced Connection with Connection Pooling and Async Support
10  public class KineticaConnection : DbConnection
11  {
12  private static readonly KineticaConnectionPool _connectionPool = new KineticaConnectionPool();
13  private Kinetica? _kineticaClient;
14  private string? _connectionString;
15  private ConnectionState _state = ConnectionState.Closed;
16  private string _database = "";
17  private KineticaConnectionStringBuilder? _connectionStringBuilder;
18  private bool _pooled = true;
19 
20  // Batch insert support
21  private InsertBatchManager? _batchManager;
22  private bool _batchInsertMode = false;
23  private InsertBatchOptions _batchOptions = new InsertBatchOptions();
24 
25  // Session state tracking
26  private string? _currentSchema;
27  private readonly Stack<string> _userStack = new Stack<string>();
28  private string? _currentImpersonatedUser;
29 
30  public KineticaConnection() { }
31 
32  public KineticaConnection(string connectionString)
33  {
34  ConnectionString = connectionString;
35  }
36 
37  [System.Diagnostics.CodeAnalysis.AllowNull]
38  public override string ConnectionString
39  {
40  get => _connectionString ?? string.Empty;
41  set
42  {
43  _connectionString = value ?? string.Empty;
44  _connectionStringBuilder = new KineticaConnectionStringBuilder(_connectionString);
45  _pooled = _connectionStringBuilder.Pooling;
46 
47  // Apply batch settings from connection string
48  _batchInsertMode = _connectionStringBuilder.BatchInsertMode;
49  _batchOptions.BatchSize = _connectionStringBuilder.BatchSize;
50  _batchOptions.UpdateOnExistingPk = _connectionStringBuilder.BatchUpdateOnExistingPk;
51  }
52  }
53 
54  public override string Database => _database;
55  public override string DataSource => _connectionStringBuilder?.Server ?? string.Empty;
56  public override string ServerVersion => "7.2"; // Default version
57  public override ConnectionState State => _state;
58 
63  public string? CurrentSchema
64  {
65  get => _currentSchema ?? _connectionStringBuilder?.Schema;
66  set => _currentSchema = value;
67  }
68 
72  public string? ImpersonatedUser => _currentImpersonatedUser;
73 
74  public override void ChangeDatabase(string databaseName)
75  {
76  _database = databaseName;
77  }
78 
82  public void SetSchema(string schemaName)
83  {
84  _currentSchema = schemaName;
85  }
86 
91  public void SetUser(string username)
92  {
93  _currentImpersonatedUser = username;
94  }
95 
100  public void ExecuteAsUser(string username)
101  {
102  if (_currentImpersonatedUser != null)
103  {
104  _userStack.Push(_currentImpersonatedUser);
105  }
106  _currentImpersonatedUser = username;
107  }
108 
112  public void RevertUser()
113  {
114  if (_userStack.Count > 0)
115  {
116  _currentImpersonatedUser = _userStack.Pop();
117  }
118  else
119  {
120  _currentImpersonatedUser = null;
121  }
122  }
123 
124  public override void Close()
125  {
126  if (_state == ConnectionState.Open)
127  {
128  // Flush and dispose batch manager before closing
129  if (_batchManager != null)
130  {
131  try
132  {
133  _batchManager.Dispose();
134  }
135  catch
136  {
137  // Ignore errors during close
138  }
139  _batchManager = null;
140  }
141 
142  if (_pooled && _connectionString != null && _kineticaClient != null)
143  {
144  _connectionPool.ReturnConnection(_connectionString, _kineticaClient);
145  }
146  // Note: Kinetica client doesn't implement IDisposable, so we just release the reference
147  // The client will be garbage collected when no longer referenced
148  _kineticaClient = null;
149  _state = ConnectionState.Closed;
150  }
151  }
152 
153  public override void Open()
154  {
155  // Use Task.Run to avoid capturing synchronization context which can cause deadlocks
156  Task.Run(async () => await OpenAsync(CancellationToken.None).ConfigureAwait(false))
157  .ConfigureAwait(false)
158  .GetAwaiter()
159  .GetResult();
160  }
161 
162  public override async Task OpenAsync(CancellationToken cancellationToken)
163  {
164  if (_state == ConnectionState.Open)
165  return;
166 
167  try
168  {
169  _state = ConnectionState.Connecting;
170 
171  if (_pooled)
172  {
173  _kineticaClient = await _connectionPool.GetConnectionAsync(_connectionString ?? string.Empty, cancellationToken).ConfigureAwait(false);
174  }
175  else
176  {
177  _kineticaClient = await CreateKineticaClientAsync(_connectionStringBuilder ?? new KineticaConnectionStringBuilder(), cancellationToken).ConfigureAwait(false);
178  }
179  _database = _connectionStringBuilder?.Database ?? "";
180  _state = ConnectionState.Open;
181 
182  // Initialize batch manager if batch mode is enabled
183  if (_batchInsertMode && _kineticaClient != null)
184  {
185  _batchManager = new InsertBatchManager(_kineticaClient, _batchOptions);
186  }
187  }
188  catch (Exception ex)
189  {
190  _state = ConnectionState.Closed;
191  throw new KineticaException($"Failed to connect to Kinetica: {ex.Message}", ex);
192  }
193  }
194 
195  private async Task<Kinetica> CreateKineticaClientAsync(KineticaConnectionStringBuilder builder, CancellationToken cancellationToken)
196  {
197  Kinetica.Options? options = null;
198 
199  // Configure authentication if credentials are provided
200  if (!string.IsNullOrEmpty(builder.Username) || !string.IsNullOrEmpty(builder.Password))
201  {
202  options = new Kinetica.Options
203  {
204  Username = builder.Username ?? string.Empty,
205  Password = builder.Password ?? string.Empty
206  };
207  }
208 
209  // Support OAuth token if provided
210  if (!string.IsNullOrEmpty(builder.OAuthToken))
211  {
212  options ??= new Kinetica.Options();
213  options.OauthToken = builder.OAuthToken;
214  }
215 
216  var client = new Kinetica(builder.Server, options);
217 
218  // Test connection
219  await TestConnectionAsync(client, cancellationToken).ConfigureAwait(false);
220  return client;
221  }
222 
223  private async Task TestConnectionAsync(Kinetica client, CancellationToken cancellationToken)
224  {
225  try
226  {
227  // Use a simple operation to test connectivity
228  await client.ShowSystemStatusAsync(new Dictionary<string, string>(), cancellationToken).ConfigureAwait(false);
229  }
230  catch (Exception ex)
231  {
232  throw new KineticaException("Connection test failed", ex);
233  }
234  }
235 
236  protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
237  {
238  return new KineticaTransaction(this, isolationLevel);
239  }
240 
241  protected override DbCommand CreateDbCommand()
242  {
243  return new KineticaCommand(this);
244  }
245 
246  private async Task<string> GetServerVersionAsync()
247  {
248  try
249  {
250  if (_kineticaClient == null)
251  return "Unknown";
252 
253  var response = await _kineticaClient.ShowSystemStatusAsync(new Dictionary<string, string>()).ConfigureAwait(false);
254  return response?.status_map["status"] ?? "Unknown";
255  }
256  catch
257  {
258  return "Unknown";
259  }
260  }
261 
262  internal Kinetica GetKineticaClient() => _kineticaClient ?? throw new InvalidOperationException("Connection is not open");
263 
264  #region Batch Insert Support
265 
270  public bool BatchInsertMode
271  {
272  get => _batchInsertMode;
273  set
274  {
275  if (_batchInsertMode == value) return;
276 
277  _batchInsertMode = value;
278 
279  if (_batchInsertMode && _state == ConnectionState.Open && _kineticaClient != null)
280  {
281  // Create batch manager when enabling batch mode on open connection
282  _batchManager ??= new InsertBatchManager(_kineticaClient, _batchOptions);
283  }
284  else if (!_batchInsertMode && _batchManager != null)
285  {
286  // Flush and dispose when disabling batch mode
287  _batchManager.Dispose();
288  _batchManager = null;
289  }
290  }
291  }
292 
296  public int BatchSize
297  {
298  get => _batchOptions.BatchSize;
299  set => _batchOptions.BatchSize = value;
300  }
301 
305  public bool BatchUpdateOnExistingPk
306  {
307  get => _batchOptions.UpdateOnExistingPk;
308  set => _batchOptions.UpdateOnExistingPk = value;
309  }
310 
314  internal InsertBatchManager? BatchManager => _batchManager;
315 
319  public int PendingBatchCount => _batchManager?.GetTotalPendingCount() ?? 0;
320 
325  public long FlushBatch()
326  {
327  // Use Task.Run to avoid capturing synchronization context which can cause deadlocks
328  return Task.Run(async () => await FlushBatchAsync(CancellationToken.None).ConfigureAwait(false))
329  .ConfigureAwait(false)
330  .GetAwaiter()
331  .GetResult();
332  }
333 
339  public async Task<long> FlushBatchAsync(CancellationToken cancellationToken = default)
340  {
341  if (_batchManager == null)
342  {
343  return 0;
344  }
345 
346  return await _batchManager.FlushAllAsync(cancellationToken).ConfigureAwait(false);
347  }
348 
349  #endregion
350 
351  // Schema Support
352  public override DataTable GetSchema()
353  {
354  return GetSchema("MetaDataCollections");
355  }
356 
357  public override DataTable GetSchema(string collectionName)
358  {
359  return GetSchema(collectionName, Array.Empty<string>());
360  }
361 
362  public override DataTable GetSchema(string collectionName, string?[]? restrictionValues)
363  {
364  var schemaProvider = new KineticaSchemaProvider(this);
365  return schemaProvider.GetSchema(collectionName, restrictionValues);
366  }
367 
368  protected override void Dispose(bool disposing)
369  {
370  if (disposing)
371  {
372  Close();
373  }
374  base.Dispose(disposing);
375  }
376  }
377 
378  // 2. Enhanced Command with SQL Parsing and Async Support
379  public class KineticaCommand : DbCommand
380  {
381  private static readonly SQLParser _sqlParser = new SQLParser();
382  private KineticaConnection? _connection;
383  private string _commandText = string.Empty;
384  private CommandType _commandType = CommandType.Text;
385  private int _commandTimeout = 30;
386  private int _fetchSize = 0; // 0 means fetch all at once
387  private DbParameterCollection _parameters;
388  private CancellationTokenSource? _cancellationTokenSource;
389  private DbTransaction? _dbTransaction;
390 
392  {
393  _parameters = new KineticaParameterCollection();
394  }
395 
396  public KineticaCommand(KineticaConnection connection) : this()
397  {
398  _connection = connection;
399  }
400 
401  public KineticaCommand(string commandText, KineticaConnection connection) : this(connection)
402  {
403  _commandText = commandText;
404  }
405 
406  [System.Diagnostics.CodeAnalysis.AllowNull]
407  public override string CommandText { get => _commandText; set => _commandText = value ?? string.Empty; }
408  public override int CommandTimeout { get => _commandTimeout; set => _commandTimeout = value; }
409  public override CommandType CommandType { get => _commandType; set => _commandType = value; }
410  public override bool DesignTimeVisible { get; set; }
411  public override UpdateRowSource UpdatedRowSource { get; set; }
412 
419  public int FetchSize
420  {
421  get => _fetchSize;
422  set => _fetchSize = Math.Max(0, value);
423  }
424 
425  protected override DbConnection? DbConnection
426  {
427  get => _connection;
428  set => _connection = value as KineticaConnection;
429  }
430 
431  protected override DbParameterCollection DbParameterCollection => _parameters;
432  protected override DbTransaction? DbTransaction { get => _dbTransaction; set => _dbTransaction = value; }
433 
434  public override void Cancel()
435  {
436  _cancellationTokenSource?.Cancel();
437  }
438 
439  public override int ExecuteNonQuery()
440  {
441  // Use Task.Run to avoid capturing synchronization context which can cause deadlocks
442  return Task.Run(async () => await ExecuteNonQueryAsync(CancellationToken.None).ConfigureAwait(false))
443  .ConfigureAwait(false)
444  .GetAwaiter()
445  .GetResult();
446  }
447 
448  public override async Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken)
449  {
450  var connection = ValidateCommand();
451 
452  using (_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
453  {
454  var timeoutToken = new CancellationTokenSource(TimeSpan.FromSeconds(_commandTimeout));
455  using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
456  _cancellationTokenSource.Token, timeoutToken.Token);
457 
458  try
459  {
460  var client = connection.GetKineticaClient();
461 
462  // Support multi-statement execution (statements separated by semicolons)
463  var statements = SplitStatements(_commandText);
464  long totalAffected = 0;
465 
466  foreach (var statement in statements)
467  {
468  if (string.IsNullOrWhiteSpace(statement))
469  continue;
470 
471  var parsedCommand = _sqlParser.Parse(statement, _parameters);
472  totalAffected += await ExecuteParsedCommandAsync(client, parsedCommand, combinedToken.Token).ConfigureAwait(false);
473  }
474 
475  return (int)totalAffected;
476  }
477  catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
478  {
479  throw;
480  }
481  catch (OperationCanceledException)
482  {
483  throw new TimeoutException($"Command timeout ({_commandTimeout}s) exceeded");
484  }
485  catch (Exception ex)
486  {
487  throw new KineticaException($"Failed to execute command: {ex.Message}", ex);
488  }
489  }
490  }
491 
495  private static IList<string> SplitStatements(string sql)
496  {
497  var statements = new List<string>();
498  var currentStatement = new System.Text.StringBuilder();
499  bool inSingleQuote = false;
500  bool inDoubleQuote = false;
501  bool inLineComment = false;
502  bool inBlockComment = false;
503 
504  for (int i = 0; i < sql.Length; i++)
505  {
506  char c = sql[i];
507  char nextChar = i + 1 < sql.Length ? sql[i + 1] : '\0';
508 
509  // Handle line comment start
510  if (!inSingleQuote && !inDoubleQuote && !inBlockComment && c == '-' && nextChar == '-')
511  {
512  inLineComment = true;
513  currentStatement.Append(c);
514  continue;
515  }
516 
517  // Handle line comment end
518  if (inLineComment && (c == '\n' || c == '\r'))
519  {
520  inLineComment = false;
521  currentStatement.Append(c);
522  continue;
523  }
524 
525  // Handle block comment start
526  if (!inSingleQuote && !inDoubleQuote && !inLineComment && c == '/' && nextChar == '*')
527  {
528  inBlockComment = true;
529  currentStatement.Append(c);
530  continue;
531  }
532 
533  // Handle block comment end
534  if (inBlockComment && c == '*' && nextChar == '/')
535  {
536  inBlockComment = false;
537  currentStatement.Append(c);
538  currentStatement.Append(nextChar);
539  i++; // Skip the '/'
540  continue;
541  }
542 
543  // Skip processing in comments
544  if (inLineComment || inBlockComment)
545  {
546  currentStatement.Append(c);
547  continue;
548  }
549 
550  // Handle string literals
551  if (c == '\'' && !inDoubleQuote)
552  {
553  // Check for escaped quote
554  if (inSingleQuote && nextChar == '\'')
555  {
556  currentStatement.Append(c);
557  currentStatement.Append(nextChar);
558  i++;
559  continue;
560  }
561  inSingleQuote = !inSingleQuote;
562  }
563  else if (c == '"' && !inSingleQuote)
564  {
565  inDoubleQuote = !inDoubleQuote;
566  }
567 
568  // Check for statement separator
569  if (c == ';' && !inSingleQuote && !inDoubleQuote)
570  {
571  var stmt = currentStatement.ToString().Trim();
572  if (!string.IsNullOrEmpty(stmt))
573  {
574  statements.Add(stmt);
575  }
576  currentStatement.Clear();
577  continue;
578  }
579 
580  currentStatement.Append(c);
581  }
582 
583  // Add last statement if any
584  var lastStmt = currentStatement.ToString().Trim();
585  if (!string.IsNullOrEmpty(lastStmt))
586  {
587  statements.Add(lastStmt);
588  }
589 
590  return statements;
591  }
592 
593  public override object? ExecuteScalar()
594  {
595  // Use Task.Run to avoid capturing synchronization context which can cause deadlocks
596  return Task.Run(async () => await ExecuteScalarAsync(CancellationToken.None).ConfigureAwait(false))
597  .ConfigureAwait(false)
598  .GetAwaiter()
599  .GetResult();
600  }
601 
602  public override async Task<object?> ExecuteScalarAsync(CancellationToken cancellationToken)
603  {
604  using (var reader = await ExecuteReaderAsync(cancellationToken).ConfigureAwait(false))
605  {
606  if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
607  {
608  return reader.GetValue(0);
609  }
610  return null;
611  }
612  }
613 
614  protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
615  {
616  // Use Task.Run to avoid capturing synchronization context which can cause deadlocks
617  return Task.Run(async () => await ExecuteReaderAsync(behavior, CancellationToken.None).ConfigureAwait(false))
618  .ConfigureAwait(false)
619  .GetAwaiter()
620  .GetResult();
621  }
622 
623  protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
624  {
625  var connection = ValidateCommand();
626 
627  using (_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
628  {
629  var timeoutToken = new CancellationTokenSource(TimeSpan.FromSeconds(_commandTimeout));
630  using var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(
631  _cancellationTokenSource.Token, timeoutToken.Token);
632 
633  try
634  {
635  var client = connection.GetKineticaClient();
636  var parsedCommand = _sqlParser.Parse(_commandText, _parameters);
637 
638  // Use paging reader if FetchSize is set and this is a SELECT query
639  if (_fetchSize > 0 && parsedCommand.CommandType == ParsedCommandType.Select)
640  {
641  return new KineticaPagingDataReader(
642  client,
643  parsedCommand.FinalSql,
644  _fetchSize,
645  behavior,
646  combinedToken.Token);
647  }
648 
649  ExecuteSqlResponse response;
650  if (parsedCommand.CommandType == ParsedCommandType.Select)
651  {
652  response = await client.ExecuteSqlAsync(parsedCommand.FinalSql, 0, -9999, options: null, cancellationToken: combinedToken.Token).ConfigureAwait(false);
653  }
654  else
655  {
656  response = await ExecuteParsedQueryAsync(client, parsedCommand, combinedToken.Token).ConfigureAwait(false);
657  }
658 
659  return new KineticaDataReader(response, behavior);
660  }
661  catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
662  {
663  throw;
664  }
665  catch (OperationCanceledException)
666  {
667  throw new TimeoutException($"Command timeout ({_commandTimeout}s) exceeded");
668  }
669  catch (Exception ex)
670  {
671  throw new KineticaException($"Failed to execute reader: {ex.Message}", ex);
672  }
673  }
674  }
675 
676  private async Task<long> ExecuteParsedCommandAsync(Kinetica client, ParsedCommand parsedCommand, CancellationToken cancellationToken)
677  {
678  return parsedCommand.CommandType switch
679  {
680  ParsedCommandType.Insert => await ExecuteInsertAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
681  ParsedCommandType.InsertFromFile => await ExecuteInsertFromFileAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
682  ParsedCommandType.Update => await ExecuteUpdateAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
683  ParsedCommandType.Delete => await ExecuteDeleteAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
684  ParsedCommandType.CreateTable => await ExecuteCreateTableAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
685  ParsedCommandType.DropTable => await ExecuteDropTableAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false),
686  ParsedCommandType.SetSchema => ExecuteSetSchema(parsedCommand),
687  ParsedCommandType.SetUser => ExecuteSetUser(parsedCommand),
688  ParsedCommandType.ExecuteAsUser => ExecuteAsUser(parsedCommand),
689  ParsedCommandType.Revert => ExecuteRevert(),
690  _ => await ExecuteGenericSqlAsync(client, parsedCommand, cancellationToken).ConfigureAwait(false)
691  };
692  }
693 
694  private long ExecuteSetSchema(ParsedCommand command)
695  {
696  if (_connection == null)
697  throw new InvalidOperationException("Connection is not open");
698 
699  var schemaName = _sqlParser.ExtractSchemaName(command.OriginalSql);
700  if (!string.IsNullOrEmpty(schemaName))
701  {
702  _connection.SetSchema(schemaName);
703  }
704  return 0;
705  }
706 
707  private long ExecuteSetUser(ParsedCommand command)
708  {
709  if (_connection == null)
710  throw new InvalidOperationException("Connection is not open");
711 
712  if (command.UserImpersonation?.Username != null)
713  {
714  _connection.SetUser(command.UserImpersonation.Username);
715  }
716  return 0;
717  }
718 
719  private long ExecuteAsUser(ParsedCommand command)
720  {
721  if (_connection == null)
722  throw new InvalidOperationException("Connection is not open");
723 
724  if (command.UserImpersonation?.Username != null)
725  {
726  _connection.ExecuteAsUser(command.UserImpersonation.Username);
727  }
728  return 0;
729  }
730 
731  private long ExecuteRevert()
732  {
733  if (_connection == null)
734  throw new InvalidOperationException("Connection is not open");
735 
736  _connection.RevertUser();
737  return 0;
738  }
739 
740  private async Task<ExecuteSqlResponse> ExecuteParsedQueryAsync(Kinetica client, ParsedCommand parsedCommand, CancellationToken cancellationToken)
741  {
742  return await client.ExecuteSqlAsync(parsedCommand.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
743  }
744 
745  private async Task<long> ExecuteInsertAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
746  {
747  // Check if batch mode is enabled on the connection
748  if (_connection?.BatchInsertMode == true && _connection.BatchManager != null)
749  {
750  // Try to parse the INSERT statement for batch processing
751  if (InsertStatementParser.TryParse(command.FinalSql, out var parsedInsert) && parsedInsert != null)
752  {
753  // Use batch insert
754  await _connection.BatchManager.InsertAsync(
755  parsedInsert.TableName,
756  parsedInsert.ColumnNames,
757  parsedInsert.Values,
758  cancellationToken).ConfigureAwait(false);
759 
760  return 1; // Return 1 to indicate one record was queued
761  }
762  }
763 
764  // Fall back to SQL execution
765  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
766  return response.count_affected;
767  }
768 
769  private async Task<long> ExecuteInsertFromFileAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
770  {
771  if (command.InsertFromFile == null)
772  {
773  throw new KineticaException("INSERT FROM FILE command is missing file information");
774  }
775 
776  var fileInfo = command.InsertFromFile;
777 
778  // Apply hints to file options
779  if (command.Hints.BatchSize.HasValue)
780  fileInfo.Options.BatchSize = command.Hints.BatchSize.Value;
781  if (command.Hints.TruncateStrings)
782  fileInfo.Options.TruncateStrings = true;
783  if (command.Hints.UpdateOnExistingPk)
784  fileInfo.Options.UpdateOnExistingPk = true;
785  if (command.Hints.IgnoreExistingPk)
786  fileInfo.Options.IgnoreExistingPk = true;
787 
788  // Use CsvFileReader to read and insert
789  var reader = new CsvFileReader(client, fileInfo);
790  var recordCount = await reader.ReadAndInsertAsync(cancellationToken).ConfigureAwait(false);
791 
792  return recordCount;
793  }
794 
795  private async Task<long> ExecuteUpdateAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
796  {
797  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
798  return response.count_affected;
799  }
800 
801  private async Task<long> ExecuteDeleteAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
802  {
803  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
804  return response.count_affected;
805  }
806 
807  private async Task<int> ExecuteCreateTableAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
808  {
809  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
810  return 1; // Table created
811  }
812 
813  private async Task<int> ExecuteDropTableAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
814  {
815  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
816  return 1; // Table dropped
817  }
818 
819  private async Task<long> ExecuteGenericSqlAsync(Kinetica client, ParsedCommand command, CancellationToken cancellationToken)
820  {
821  var response = await client.ExecuteSqlAsync(command.FinalSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
822  return response.count_affected;
823  }
824 
825  protected override DbParameter CreateDbParameter()
826  {
827  return new KineticaParameter();
828  }
829 
830  public override void Prepare()
831  {
832  // Validate SQL and parameters
833  ValidateCommand();
834  try
835  {
836  _sqlParser.Parse(_commandText, _parameters);
837  }
838  catch (Exception ex)
839  {
840  throw new KineticaException($"Failed to prepare command: {ex.Message}", ex);
841  }
842  }
843 
844  private KineticaConnection ValidateCommand()
845  {
846  if (_connection == null || _connection.State != ConnectionState.Open)
847  throw new InvalidOperationException("Connection is not open");
848 
849  if (string.IsNullOrEmpty(_commandText))
850  throw new InvalidOperationException("Command text is not set");
851 
852  return _connection;
853  }
854 
855  protected override void Dispose(bool disposing)
856  {
857  if (disposing)
858  {
859  _cancellationTokenSource?.Dispose();
860  }
861  base.Dispose(disposing);
862  }
863  }
864 
865  // 3. Enhanced DataReader with Async Support
866  public class KineticaDataReader : DbDataReader
867  {
868  private readonly ExecuteSqlResponse _response;
869  private readonly CommandBehavior _behavior;
870  private readonly IList<string> _columnNames;
871  private readonly IList<string> _columnTypes;
872  private int _currentRow = -1;
873  private bool _closed = false;
874 
875  public KineticaDataReader(ExecuteSqlResponse response, CommandBehavior behavior)
876  {
877  _response = response;
878  _behavior = behavior;
879 
880  // Extract column information from the first record's schema if available
881  _columnNames = new List<string>();
882  _columnTypes = new List<string>();
883 
884  if (_response.data != null && _response.data.Count > 0)
885  {
886  var firstRecord = _response.data[0];
887  if (firstRecord?.Schema?.Fields != null)
888  {
889  foreach (var field in firstRecord.Schema.Fields)
890  {
891  _columnNames.Add(field.Name);
892  _columnTypes.Add(GetAvroFieldType(field.Schema));
893  }
894  }
895  }
896  }
897 
898  private static string GetAvroFieldType(Avro.Schema schema)
899  {
900  if (schema is Avro.UnionSchema unionSchema)
901  {
902  // For union types, find the non-null type
903  foreach (var s in unionSchema.Schemas)
904  {
905  if (s.Tag != Avro.Schema.Type.Null)
906  return GetAvroFieldType(s);
907  }
908  }
909 
910  return schema.Tag switch
911  {
912  Avro.Schema.Type.Int => "int",
913  Avro.Schema.Type.Long => "long",
914  Avro.Schema.Type.Float => "float",
915  Avro.Schema.Type.Double => "double",
916  Avro.Schema.Type.String => "string",
917  Avro.Schema.Type.Boolean => "boolean",
918  Avro.Schema.Type.Bytes => "bytes",
919  _ => "string"
920  };
921  }
922 
923  public override bool HasRows => _response.total_number_of_records > 0;
924  public override bool IsClosed => _closed;
925  public override int RecordsAffected => (int)(_response.count_affected);
926  public override int FieldCount => _columnNames.Count;
927  public override object this[int ordinal] => GetValue(ordinal);
928  public override object this[string name] => GetValue(GetOrdinal(name));
929  public override int Depth => 0;
930 
931  public override bool Read()
932  {
933  // Read is synchronous - data is already loaded in memory
934  if (_closed || _response.data == null)
935  return false;
936 
937  _currentRow++;
938  return _currentRow < _response.data.Count;
939  }
940 
941  public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
942  {
943  // Yield to make it properly async (Task.Yield doesn't support ConfigureAwait)
944  await Task.Yield();
945 
946  if (_closed || _response.data == null)
947  return false;
948 
949  _currentRow++;
950  return _currentRow < _response.data.Count;
951  }
952 
953  public override bool NextResult()
954  {
955  return false; // Kinetica doesn't support multiple result sets
956  }
957 
958  public override async Task<bool> NextResultAsync(CancellationToken cancellationToken)
959  {
960  // Yield to make it properly async (Task.Yield doesn't support ConfigureAwait)
961  await Task.Yield();
962  return false; // Kinetica doesn't support multiple result sets
963  }
964 
965  public override void Close()
966  {
967  _closed = true;
968  }
969 
970  public override bool GetBoolean(int ordinal) => Convert.ToBoolean(GetValue(ordinal));
971  public override byte GetByte(int ordinal) => Convert.ToByte(GetValue(ordinal));
972  public override char GetChar(int ordinal) => Convert.ToChar(GetValue(ordinal));
973  public override DateTime GetDateTime(int ordinal) => Convert.ToDateTime(GetValue(ordinal));
974  public override decimal GetDecimal(int ordinal) => Convert.ToDecimal(GetValue(ordinal));
975  public override double GetDouble(int ordinal) => Convert.ToDouble(GetValue(ordinal));
976  public override float GetFloat(int ordinal) => Convert.ToSingle(GetValue(ordinal));
977  public override Guid GetGuid(int ordinal) => Guid.Parse(GetValue(ordinal)?.ToString() ?? "");
978  public override short GetInt16(int ordinal) => Convert.ToInt16(GetValue(ordinal));
979  public override int GetInt32(int ordinal) => Convert.ToInt32(GetValue(ordinal));
980  public override long GetInt64(int ordinal) => Convert.ToInt64(GetValue(ordinal));
981  public override string GetString(int ordinal) => GetValue(ordinal)?.ToString() ?? "";
982 
983  public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
984  {
985  var value = GetValue(ordinal);
986  if (value == null || value == DBNull.Value)
987  return 0;
988 
989  byte[] bytes;
990  if (value is byte[] byteArray)
991  {
992  bytes = byteArray;
993  }
994  else if (value is string str)
995  {
996  bytes = System.Text.Encoding.UTF8.GetBytes(str);
997  }
998  else
999  {
1000  bytes = System.Text.Encoding.UTF8.GetBytes(value.ToString() ?? "");
1001  }
1002 
1003  // If buffer is null, return total length
1004  if (buffer == null)
1005  return bytes.Length;
1006 
1007  // Calculate how many bytes to copy
1008  long availableBytes = bytes.Length - dataOffset;
1009  if (availableBytes <= 0)
1010  return 0;
1011 
1012  int bytesToCopy = (int)Math.Min(availableBytes, length);
1013  Array.Copy(bytes, dataOffset, buffer, bufferOffset, bytesToCopy);
1014  return bytesToCopy;
1015  }
1016 
/// <summary>Copies characters of the column's string form into <paramref name="buffer"/>.</summary>
/// <param name="ordinal">Zero-based column index.</param>
/// <param name="dataOffset">Index within the string at which to start reading.</param>
/// <param name="buffer">Destination; when null the total character count is returned instead.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of characters to copy.</param>
/// <returns>Characters copied, the total length when <paramref name="buffer"/> is null, or 0 for null/DBNull or an offset at or past the end.</returns>
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
    object cell = GetValue(ordinal);
    if (cell is null || ReferenceEquals(cell, DBNull.Value))
    {
        return 0;
    }

    string text = cell.ToString() ?? "";

    // A null buffer is a length query.
    if (buffer is null)
    {
        return text.Length;
    }

    long remaining = text.Length - dataOffset;
    if (remaining <= 0)
    {
        return 0;
    }

    int count = (int)Math.Min(remaining, length);
    text.CopyTo((int)dataOffset, buffer, bufferOffset, count);
    return count;
}
1038 
/// <summary>Returns the recorded type name for the column, or <c>"string"</c> when <paramref name="ordinal"/> is outside the known columns.</summary>
public override string GetDataTypeName(int ordinal)
{
    bool known = ordinal >= 0 && ordinal < _columnTypes.Count;
    return known ? _columnTypes[ordinal] : "string";
}
1045 
/// <summary>
/// Maps the column's type name (as returned by <c>GetDataTypeName</c>) to a CLR type.
/// Unrecognised names map to <see cref="string"/>.
/// </summary>
/// <param name="ordinal">Zero-based column index.</param>
/// <returns>The CLR type used to describe the column.</returns>
public override Type GetFieldType(int ordinal)
{
    // Type names are identifiers, not linguistic text: lower-case them culture-invariantly so that
    // e.g. "INT" still matches "int" under a Turkish current culture (where 'I' lower-cases to 'ı').
    var dataType = GetDataTypeName(ordinal).ToLowerInvariant();
    return dataType switch
    {
        // Integer types
        "int" or "integer" or "int32" => typeof(int),
        "int8" or "tinyint" => typeof(sbyte),
        "int16" or "smallint" => typeof(short),
        "long" or "bigint" or "int64" => typeof(long),
        "ulong" or "uint64" => typeof(ulong),

        // Floating point types
        "float" or "real" => typeof(float),
        "double" or "float8" => typeof(double),
        "decimal" or "numeric" => typeof(decimal),

        // Boolean
        "bool" or "boolean" => typeof(bool),

        // String types (including charN variants)
        "string" or "varchar" or "text" => typeof(string),
        var s when s.StartsWith("char", StringComparison.Ordinal) => typeof(string),

        // Date/Time types
        "date" => typeof(DateTime),
        "time" => typeof(TimeSpan),
        "datetime" or "timestamp" => typeof(DateTime),

        // Special types
        "uuid" or "guid" => typeof(Guid),
        "ipv4" => typeof(string),
        "json" => typeof(string),
        "wkt" => typeof(string),

        // Binary
        "bytes" or "binary" or "varbinary" => typeof(byte[]),

        // Vectors are reported as float[]; array types are reported as object
        "vector" => typeof(float[]),
        var a when a.StartsWith("array", StringComparison.Ordinal) => typeof(object),

        // Default
        _ => typeof(string)
    };
}
1092 
/// <summary>Returns the column name at <paramref name="ordinal"/>, or the placeholder <c>Column{ordinal}</c> when the index is outside the known columns.</summary>
public override string GetName(int ordinal)
{
    bool known = ordinal >= 0 && ordinal < _columnNames.Count;
    return known ? _columnNames[ordinal] : $"Column{ordinal}";
}
1099 
/// <summary>Finds the index of the first column whose name equals <paramref name="name"/>, ignoring case.</summary>
/// <exception cref="ArgumentException">No column has that name.</exception>
public override int GetOrdinal(string name)
{
    int index = 0;
    while (index < _columnNames.Count)
    {
        if (string.Equals(_columnNames[index], name, StringComparison.OrdinalIgnoreCase))
        {
            return index;
        }

        index++;
    }

    throw new ArgumentException($"Column '{name}' not found");
}
1109 
/// <summary>Returns the value of the given column in the current row, substituting <see cref="DBNull.Value"/> for a null or missing field.</summary>
/// <exception cref="InvalidOperationException">The reader is not positioned on a row.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="ordinal"/> is not a valid column index.</exception>
public override object GetValue(int ordinal)
{
    if (_currentRow < 0 || _response.data is not { } rows || _currentRow >= rows.Count)
    {
        throw new InvalidOperationException("No current row");
    }

    if (ordinal < 0 || ordinal >= _columnNames.Count)
    {
        throw new ArgumentOutOfRangeException(nameof(ordinal));
    }

    var currentRecord = rows[_currentRow];
    var columnName = _columnNames[ordinal];

    // Fields are looked up by name on the record.
    return currentRecord.TryGetValue(columnName, out var cell)
        ? (cell ?? DBNull.Value)
        : DBNull.Value;
}
1128 
/// <summary>Fills <paramref name="values"/> with the current row's column values, up to the smaller of the array length and the field count.</summary>
/// <returns>The number of values written.</returns>
public override int GetValues(object[] values)
{
    int limit = Math.Min(values.Length, FieldCount);
    int column = 0;
    while (column < limit)
    {
        values[column] = GetValue(column);
        column++;
    }

    return limit;
}
1138 
/// <summary>Reports whether the value at <paramref name="ordinal"/> is null or <see cref="DBNull.Value"/>.</summary>
public override bool IsDBNull(int ordinal)
{
    object cell = GetValue(ordinal);
    if (cell is null)
    {
        return true;
    }

    return ReferenceEquals(cell, DBNull.Value);
}
1144 
/// <summary>Advances the reader with <c>Read()</c> and yields the reader itself once per row until no rows remain.</summary>
public override IEnumerator<IDataRecord> GetEnumerator()
{
    for (bool advanced = Read(); advanced; advanced = Read())
    {
        yield return this;
    }
}
1152  }
1153 
1158  public class KineticaPagingDataReader : DbDataReader
1159  {
// Values captured by the constructor.
private readonly Kinetica _client;
private readonly string _originalSql;            // SQL as supplied by the caller
private readonly int _fetchSize;                 // LIMIT used for each paged query
private readonly CommandBehavior _behavior;      // stored; not read by the members visible here
private readonly CancellationToken _cancellationToken;

// Paging state.
private ExecuteSqlResponse? _currentPage;        // most recently fetched page
private IList<string>? _columnNames;             // filled from the first non-empty page
private IList<string>? _columnTypes;
private int _currentRowInPage = -1;              // -1 means "before the first row of the page"
private long _totalRowsRead;                     // rows handed out by Read/ReadAsync
private long _offset;                            // OFFSET for the next paged query
private bool _closed;
private bool _hasMore = true;                    // false once no further page should be requested
private long _totalRecordCount = -1;             // -1 until the first response has been seen
1175 
/// <summary>
/// Creates a paging reader over <paramref name="sql"/> and immediately fetches the first page
/// so that column information is available before the first <c>Read()</c>.
/// </summary>
/// <param name="client">Client used to execute the paged queries.</param>
/// <param name="sql">The query to page through.</param>
/// <param name="fetchSize">Number of rows requested per page.</param>
/// <param name="behavior">Command behavior supplied by the caller (stored only).</param>
/// <param name="cancellationToken">Token used by the synchronous page fetches.</param>
/// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="sql"/> is null.</exception>
// NOTE(review): the declaration line was missing from this listing (only the parameter list remained);
// it is restored here from the class name. `public` accessibility is assumed — confirm against callers.
public KineticaPagingDataReader(
    Kinetica client,
    string sql,
    int fetchSize,
    CommandBehavior behavior,
    CancellationToken cancellationToken = default)
{
    _client = client ?? throw new ArgumentNullException(nameof(client));
    _originalSql = sql ?? throw new ArgumentNullException(nameof(sql));
    _fetchSize = fetchSize;
    _behavior = behavior;
    _cancellationToken = cancellationToken;

    // Fetch first page to get schema
    FetchNextPage();
}
1192 
/// <summary>
/// Blocking counterpart of <c>FetchNextPageAsync</c>, used by the constructor and by <c>Read()</c>.
/// The fetch is started through <c>Task.Run</c> and then waited on.
/// </summary>
private void FetchNextPage()
{
    Task pageFetch = Task.Run(async () =>
    {
        await FetchNextPageAsync(_cancellationToken).ConfigureAwait(false);
    });

    // GetAwaiter().GetResult() rethrows the original exception rather than an AggregateException.
    pageFetch.ConfigureAwait(false).GetAwaiter().GetResult();
}
1202 
/// <summary>
/// Fetches the next page of results into <c>_currentPage</c> and updates the paging state
/// (<c>_offset</c>, <c>_hasMore</c>, and, on first use, column info and total record count).
/// Does nothing when the reader is closed or no further page is expected.
/// </summary>
/// <param name="cancellationToken">Token passed through to the SQL execution call.</param>
private async Task FetchNextPageAsync(CancellationToken cancellationToken = default)
{
    if (!_hasMore || _closed)
        return;

    // Build paged query using LIMIT and OFFSET
    var pagedSql = BuildPagedQuery(_originalSql, _offset, _fetchSize);

    // BuildPagedQuery hands the SQL back untouched when it already carries its own LIMIT/OFFSET.
    // Such a query cannot be advanced: re-running it would return the same rows again, so it
    // must be treated as a single page regardless of the counts reported below.
    bool pagingApplied = !string.Equals(pagedSql, _originalSql, StringComparison.Ordinal);

    var response = await _client.ExecuteSqlAsync(pagedSql, 0, -9999, options: null, cancellationToken: cancellationToken).ConfigureAwait(false);
    _currentPage = response;
    _currentRowInPage = -1;

    var rows = response.data;

    // Initialize column info from the first page that contains data
    if (_columnNames == null && rows != null && rows.Count > 0)
    {
        var names = new List<string>();
        var types = new List<string>();

        var fields = rows[0]?.Schema?.Fields;
        if (fields != null)
        {
            foreach (var field in fields)
            {
                names.Add(field.Name);
                types.Add(GetAvroFieldType(field.Schema));
            }
        }

        _columnNames = names;
        _columnTypes = types;
    }

    // Track total record count from first response
    if (_totalRecordCount < 0)
    {
        _totalRecordCount = response.total_number_of_records;
    }

    // A further page is expected only when paging was applied, this page came back full,
    // and the running offset has not yet reached the reported total.
    var recordsInPage = rows?.Count ?? 0;
    _offset += recordsInPage;
    _hasMore = pagingApplied && recordsInPage == _fetchSize && _offset < _totalRecordCount;
}
1243 
// Matches a LIMIT or OFFSET keyword delimited by any whitespace (space, tab, newline) on both sides.
private static readonly Regex _pagingClausePattern = new Regex(
    @"\s(?:LIMIT|OFFSET)\s",
    RegexOptions.IgnoreCase | RegexOptions.CultureInvariant | RegexOptions.Compiled);

/// <summary>
/// Appends <c>LIMIT {limit} OFFSET {offset}</c> to <paramref name="sql"/>, unless the query
/// already contains a LIMIT or OFFSET keyword, in which case it is returned unchanged.
/// </summary>
/// <param name="sql">The query text.</param>
/// <param name="offset">Row offset for the page.</param>
/// <param name="limit">Maximum number of rows in the page.</param>
/// <returns>The paged query, or the same <paramref name="sql"/> instance when it already has paging.</returns>
private static string BuildPagedQuery(string sql, long offset, int limit)
{
    // Keyword detection tolerates newlines/tabs around LIMIT/OFFSET, not just single spaces;
    // otherwise a multi-line query would get a second LIMIT appended.
    if (_pagingClausePattern.IsMatch(sql))
    {
        // Don't modify queries that already have paging
        return sql;
    }

    // Strip trailing terminators and any trailing whitespace (including line breaks) so that
    // no ';' is left in front of the appended clause.
    var trimmed = sql.TrimEnd(';', ' ', '\t', '\r', '\n');
    return $"{trimmed} LIMIT {limit} OFFSET {offset}";
}
1257 
/// <summary>
/// Translates an Avro schema into the type-name string used by this reader.
/// For a union, the first non-null branch decides; unmapped tags yield <c>"string"</c>.
/// </summary>
private static string GetAvroFieldType(Avro.Schema schema)
{
    if (schema is Avro.UnionSchema union)
    {
        foreach (var branch in union.Schemas)
        {
            if (branch.Tag == Avro.Schema.Type.Null)
            {
                continue;
            }

            return GetAvroFieldType(branch);
        }
    }

    switch (schema.Tag)
    {
        case Avro.Schema.Type.Int:
            return "int";
        case Avro.Schema.Type.Long:
            return "long";
        case Avro.Schema.Type.Float:
            return "float";
        case Avro.Schema.Type.Double:
            return "double";
        case Avro.Schema.Type.String:
            return "string";
        case Avro.Schema.Type.Boolean:
            return "boolean";
        case Avro.Schema.Type.Bytes:
            return "bytes";
        default:
            return "string";
    }
}
1281 
/// <summary>True when the total record count reported by the first response is greater than zero.</summary>
public override bool HasRows
{
    get { return _totalRecordCount > 0; }
}

/// <summary>True once <c>Close()</c> has been called.</summary>
public override bool IsClosed
{
    get { return _closed; }
}

/// <summary>The total record count, cast to <see cref="int"/>.</summary>
public override int RecordsAffected
{
    get { return (int)_totalRecordCount; }
}

/// <summary>Number of known columns; 0 until column info has been initialised.</summary>
public override int FieldCount
{
    get { return _columnNames?.Count ?? 0; }
}

/// <summary>Value of the column at <paramref name="ordinal"/> in the current row.</summary>
public override object this[int ordinal]
{
    get { return GetValue(ordinal); }
}

/// <summary>Value of the column called <paramref name="name"/> in the current row.</summary>
public override object this[string name]
{
    get { return GetValue(GetOrdinal(name)); }
}

/// <summary>Always 0.</summary>
public override int Depth
{
    get { return 0; }
}

/// <summary>Number of rows returned so far by <c>Read</c>/<c>ReadAsync</c>.</summary>
public long TotalRowsRead
{
    get { return _totalRowsRead; }
}

/// <summary>Total record count taken from the first response; -1 before any response.</summary>
public long TotalRecordCount
{
    get { return _totalRecordCount; }
}
1299 
/// <summary>
/// Moves to the next row, fetching the next page synchronously when the current page is exhausted.
/// </summary>
/// <returns>True when positioned on a row; false when closed or no rows remain.</returns>
public override bool Read()
{
    if (_closed || _currentPage?.data == null)
    {
        return false;
    }

    _currentRowInPage++;

    // Still inside the current page: nothing to fetch.
    if (_currentRowInPage < _currentPage.data.Count)
    {
        _totalRowsRead++;
        return true;
    }

    if (!_hasMore)
    {
        return false;
    }

    FetchNextPage();

    if (_currentPage?.data == null || _currentPage.data.Count == 0)
    {
        return false;
    }

    _currentRowInPage = 0;
    _totalRowsRead++;
    return true;
}
1324 
/// <summary>
/// Asynchronously moves to the next row, awaiting the next page when the current page is exhausted.
/// </summary>
/// <param name="cancellationToken">Token passed to the page fetch.</param>
/// <returns>True when positioned on a row; false when closed or no rows remain.</returns>
public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
    if (_closed || _currentPage?.data == null)
    {
        return false;
    }

    _currentRowInPage++;

    // Still inside the current page: nothing to fetch.
    if (_currentRowInPage < _currentPage.data.Count)
    {
        _totalRowsRead++;
        return true;
    }

    if (!_hasMore)
    {
        return false;
    }

    await FetchNextPageAsync(cancellationToken).ConfigureAwait(false);

    if (_currentPage?.data == null || _currentPage.data.Count == 0)
    {
        return false;
    }

    _currentRowInPage = 0;
    _totalRowsRead++;
    return true;
}
1349 
/// <summary>Always returns false.</summary>
public override bool NextResult()
{
    return false;
}

/// <summary>Marks the reader closed and drops the reference to the current page.</summary>
public override void Close()
{
    _currentPage = null;
    _closed = true;
}
1357 
/// <summary>
/// Builds a schema table with one row per known column, holding
/// ColumnName, ColumnOrdinal, DataType (from <c>GetFieldType</c>) and ColumnSize (always -1).
/// The table has no rows when column info has not been initialised.
/// </summary>
public override DataTable GetSchemaTable()
{
    var table = new DataTable("SchemaTable");
    DataColumnCollection columns = table.Columns;
    columns.Add("ColumnName", typeof(string));
    columns.Add("ColumnOrdinal", typeof(int));
    columns.Add("DataType", typeof(Type));
    columns.Add("ColumnSize", typeof(int));

    if (_columnNames is null)
    {
        return table;
    }

    for (int ordinal = 0; ordinal < _columnNames.Count; ordinal++)
    {
        // Values are supplied in the same order the columns were declared above.
        table.Rows.Add(_columnNames[ordinal], ordinal, GetFieldType(ordinal), -1);
    }

    return table;
}
1381 
/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToBoolean</c>.</summary>
public override bool GetBoolean(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToBoolean(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToByte</c>.</summary>
public override byte GetByte(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToByte(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToChar</c>.</summary>
public override char GetChar(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToChar(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToDateTime</c>.</summary>
public override DateTime GetDateTime(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToDateTime(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToDecimal</c>.</summary>
public override decimal GetDecimal(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToDecimal(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToDouble</c>.</summary>
public override double GetDouble(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToDouble(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToSingle</c>.</summary>
public override float GetFloat(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToSingle(cell);
}

/// <summary>Parses the string form of the value at <paramref name="ordinal"/> as a <see cref="Guid"/>.</summary>
public override Guid GetGuid(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    return Guid.Parse(text ?? Guid.Empty.ToString());
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToInt16</c>.</summary>
public override short GetInt16(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToInt16(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToInt32</c>.</summary>
public override int GetInt32(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToInt32(cell);
}

/// <summary>Converts the value at <paramref name="ordinal"/> with <c>Convert.ToInt64</c>.</summary>
public override long GetInt64(int ordinal)
{
    object cell = GetValue(ordinal);
    return Convert.ToInt64(cell);
}

/// <summary>Returns the string form of the value at <paramref name="ordinal"/>, or an empty string when that is null.</summary>
public override string GetString(int ordinal)
{
    string? text = GetValue(ordinal)?.ToString();
    return text ?? string.Empty;
}
1394 
/// <summary>Copies bytes of a <c>byte[]</c> column value into <paramref name="buffer"/>.</summary>
/// <param name="ordinal">Zero-based column index.</param>
/// <param name="dataOffset">Index within the column data at which to start reading.</param>
/// <param name="buffer">Destination; when null the total byte length is returned instead.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of bytes to copy.</param>
/// <returns>
/// Bytes copied; the total length when <paramref name="buffer"/> is null; 0 when the value is not a
/// <c>byte[]</c> or when <paramref name="dataOffset"/> is at or beyond the end of the data.
/// </returns>
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
    var value = GetValue(ordinal);
    if (value is not byte[] bytes)
        return 0;

    if (buffer == null)
        return bytes.Length;

    // Reading at or past the end yields 0 bytes instead of handing Array.Copy a negative count.
    // The arithmetic stays in long so a large offset is not truncated.
    long availableBytes = bytes.Length - dataOffset;
    if (availableBytes <= 0)
        return 0;

    int bytesToCopy = (int)Math.Min(availableBytes, length);
    Array.Copy(bytes, dataOffset, buffer, bufferOffset, bytesToCopy);
    return bytesToCopy;
}
1408 
/// <summary>Copies characters of the column's string form (from <c>GetString</c>) into <paramref name="buffer"/>.</summary>
/// <param name="ordinal">Zero-based column index.</param>
/// <param name="dataOffset">Index within the string at which to start reading.</param>
/// <param name="buffer">Destination; when null the total character count is returned instead.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of characters to copy.</param>
/// <returns>
/// Characters copied; the total length when <paramref name="buffer"/> is null; 0 when
/// <paramref name="dataOffset"/> is at or beyond the end of the string.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="dataOffset"/> is negative.</exception>
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
    var str = GetString(ordinal);
    if (buffer == null)
        return str.Length;

    // Reject negative offsets explicitly so the int cast below can never wrap a large negative value.
    if (dataOffset < 0)
        throw new ArgumentOutOfRangeException(nameof(dataOffset));

    // Reading at or past the end yields 0 chars instead of passing a negative count to CopyTo.
    long availableChars = str.Length - dataOffset;
    if (availableChars <= 0)
        return 0;

    int charsToCopy = (int)Math.Min(availableChars, length);
    // dataOffset < str.Length here, so it fits in an int.
    str.CopyTo((int)dataOffset, buffer, bufferOffset, charsToCopy);
    return charsToCopy;
}
1420 
/// <summary>Returns the recorded type name for the column, or <c>"string"</c> when column types are not yet known or <paramref name="ordinal"/> is out of range.</summary>
public override string GetDataTypeName(int ordinal)
{
    if (_columnTypes is null || ordinal < 0 || ordinal >= _columnTypes.Count)
    {
        return "string";
    }

    return _columnTypes[ordinal];
}
1427 
/// <summary>
/// Maps the column's type name (as returned by <c>GetDataTypeName</c>) to a CLR type.
/// Unrecognised names map to <see cref="string"/>.
/// </summary>
/// <param name="ordinal">Zero-based column index.</param>
/// <returns>The CLR type used to describe the column.</returns>
public override Type GetFieldType(int ordinal)
{
    // Type names are identifiers, not linguistic text: lower-case them culture-invariantly so that
    // e.g. "INT" still matches "int" under a Turkish current culture (where 'I' lower-cases to 'ı').
    var dataType = GetDataTypeName(ordinal).ToLowerInvariant();
    return dataType switch
    {
        "int" or "integer" or "int32" => typeof(int),
        "int8" or "tinyint" => typeof(sbyte),
        "int16" or "smallint" => typeof(short),
        "long" or "bigint" or "int64" => typeof(long),
        "float" or "real" => typeof(float),
        "double" or "float8" => typeof(double),
        "decimal" or "numeric" => typeof(decimal),
        "bool" or "boolean" => typeof(bool),
        "string" or "varchar" or "text" => typeof(string),
        // charN variants
        var s when s.StartsWith("char", StringComparison.Ordinal) => typeof(string),
        "date" => typeof(DateTime),
        "time" => typeof(TimeSpan),
        "datetime" or "timestamp" => typeof(DateTime),
        "uuid" or "guid" => typeof(Guid),
        "bytes" or "binary" => typeof(byte[]),
        _ => typeof(string)
    };
}
1451 
/// <summary>Returns the column name at <paramref name="ordinal"/>, or the placeholder <c>Column{ordinal}</c> when names are not yet known or the index is out of range.</summary>
public override string GetName(int ordinal)
{
    if (_columnNames is null || ordinal < 0 || ordinal >= _columnNames.Count)
    {
        return $"Column{ordinal}";
    }

    return _columnNames[ordinal];
}
1458 
/// <summary>Finds the index of the first column whose name equals <paramref name="name"/>, ignoring case.</summary>
/// <exception cref="ArgumentException">No column has that name, or column names are not yet known.</exception>
public override int GetOrdinal(string name)
{
    int known = _columnNames?.Count ?? 0;
    int index = 0;
    while (index < known)
    {
        if (string.Equals(_columnNames![index], name, StringComparison.OrdinalIgnoreCase))
        {
            return index;
        }

        index++;
    }

    throw new ArgumentException($"Column '{name}' not found");
}
1471 
/// <summary>Returns the value of the given column in the current row of the current page, substituting <see cref="DBNull.Value"/> for a null record, null value, or missing field.</summary>
/// <exception cref="InvalidOperationException">The reader is not positioned on a row.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="ordinal"/> is not a valid column index.</exception>
public override object GetValue(int ordinal)
{
    if (_currentRowInPage < 0 || _currentPage?.data is not { } rows || _currentRowInPage >= rows.Count)
    {
        throw new InvalidOperationException("No current row");
    }

    if (ordinal < 0 || ordinal >= FieldCount)
    {
        throw new ArgumentOutOfRangeException(nameof(ordinal));
    }

    var currentRecord = rows[_currentRowInPage];
    if (currentRecord == null)
    {
        return DBNull.Value;
    }

    // Fields are looked up by name on the record.
    return currentRecord.TryGetValue(GetName(ordinal), out var cell)
        ? (cell ?? DBNull.Value)
        : DBNull.Value;
}
1492 
/// <summary>Fills <paramref name="values"/> with the current row's column values, up to the smaller of the array length and the field count.</summary>
/// <returns>The number of values written.</returns>
public override int GetValues(object[] values)
{
    int limit = Math.Min(values.Length, FieldCount);
    int column = 0;
    while (column < limit)
    {
        values[column] = GetValue(column);
        column++;
    }

    return limit;
}
1502 
/// <summary>Reports whether the value at <paramref name="ordinal"/> is null or <see cref="DBNull.Value"/>.</summary>
public override bool IsDBNull(int ordinal)
{
    object cell = GetValue(ordinal);
    if (cell is null)
    {
        return true;
    }

    return ReferenceEquals(cell, DBNull.Value);
}
1508 
/// <summary>Advances the reader with <c>Read()</c> and yields the reader itself once per row until no rows remain.</summary>
public override IEnumerator<IDataRecord> GetEnumerator()
{
    for (bool advanced = Read(); advanced; advanced = Read())
    {
        yield return this;
    }
}
1516  }
1517 
1518  // 4. Connection String Builder - JDBC-Compatible Properties
1524  {
1525  private readonly Dictionary<string, object> _properties;
1526 
/// <summary>
/// Names of the connection-string keys understood by this builder (intended to match the JDBC driver).
/// Note that some keys contain spaces (for example <c>"Connection Timeout"</c>).
/// </summary>
public static class PropertyKeys
{
    // ---- Connection ----
    public const string Server = "Server";
    public const string Url = "URL";
    public const string PrimaryUrl = "PrimaryURL";
    public const string Username = "Username";
    public const string Password = "Password";
    public const string OAuthToken = "OAuthToken";
    public const string Database = "Database";
    public const string Schema = "Schema";
    public const string ImpersonateUser = "ImpersonateUser";

    // ---- Timeouts ----
    public const string Timeout = "Timeout";
    public const string ConnectionTimeout = "Connection Timeout";
    public const string InitialConnectionTimeout = "InitialConnectionTimeoutSeconds";
    public const string ServerConnectionTimeout = "ServerConnectionTimeoutSeconds";

    // ---- SSL/TLS ----
    public const string BypassSslCertCheck = "BypassSslCertCheck";
    public const string SslCaCertPath = "SslCACertPath";
    public const string SslCertPassword = "SslCertPassword";
    public const string SslAllowHostMismatch = "SslAllowHostMismatch";

    // ---- Network ----
    public const string DisableAutoDiscovery = "DisableAutoDiscovery";
    public const string DisableFailover = "DisableFailover";
    public const string FailoverOrder = "FailoverOrder";
    public const string FailbackPollInterval = "FailbackPollInterval";
    public const string DisableSnappy = "DisableSnappy";

    // ---- Query optimization ----
    public const string CostBasedOptimization = "CostBasedOptimization";
    public const string DistributedJoins = "DistributedJoins";
    public const string ParallelExecution = "ParallelExecution";
    public const string PlanCache = "PlanCache";
    public const string ResultsCaching = "ResultsCaching";
    public const string RuleBasedOptimizations = "RuleBasedOptimizations";
    public const string SsqOptimizations = "SsqOptimizations";
    public const string UseApproxCountDistinct = "UseApproxCountDistinct";
    public const string ValidateChange = "ValidateChange";

    // ---- Query control ----
    public const string ReadOnly = "ReadOnly";
    public const string Ttl = "TTL";
    public const string PagingTableTtl = "PagingTableTTL";
    public const string Limit = "Limit";
    public const string RowsPerFetch = "RowsPerFetch";
    public const string FetchSize = "FetchSize";

    // ---- Insertion ----
    public const string RowsPerInsertion = "RowsPerInsertion";
    public const string DisableMultiheadInsert = "DisableMultiheadInsert";
    public const string IgnoreExistingPk = "IgnoreExistingPk";
    public const string TruncateStrings = "TruncateStrings";
    public const string UpdateOnExistingPk = "UpdateOnExistingPk";
    public const string ErrorMode = "ErrorMode";
    public const string Replication = "Replication";
    public const string NoSync = "NoSync";

    // ---- File I/O ----
    public const string FileReadDelimiter = "FileReadDelimiter";
    public const string FileReadHasHeader = "FileReadHasHeader";
    public const string FileReadNullString = "FileReadNullString";
    public const string FileReadEscapeChar = "FileReadEscapeChar";
    public const string FileReadQuoteChar = "FileReadQuoteChar";
    public const string FileReadComment = "FileReadComment";
    public const string FileReadInitialClear = "FileReadInitialClear";
    public const string FileReadLimit = "FileReadLimit";
    public const string FileReadSkip = "FileReadSkip";

    // ---- Connection pooling ----
    public const string Pooling = "Pooling";
    public const string MaxPoolSize = "Max Pool Size";
    public const string MinPoolSize = "Min Pool Size";

    // ---- Batch insert mode ----
    public const string BatchInsertMode = "Batch Insert Mode";
    public const string BatchSize = "Batch Size";
    public const string BatchUpdateOnExistingPk = "Batch Update On Existing Pk";

    // ---- Miscellaneous ----
    public const string TimeZoneOverride = "TimeZoneOverride";
    public const string TokenNameClaim = "TokenNameClaim";
    public const string UseKeyLookup = "UseKeyLookup";
    public const string FakeTransactions = "FakeTransactions";
    public const string LogLevel = "LogLevel";
}
1617 
/// <summary>Creates a builder from an empty connection string.</summary>
public KineticaConnectionStringBuilder()
    : this(string.Empty)
{
}

/// <summary>Creates a builder with a case-insensitive property store and parses <paramref name="connectionString"/> into it.</summary>
public KineticaConnectionStringBuilder(string connectionString)
{
    var store = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
    _properties = store;
    ParseConnectionString(connectionString);
}
1625 
1626  #region Connection Properties
1627 
/// <summary>Value of the <c>"Server"</c> key; <c>"http://127.0.0.1:9191"</c> when unset.</summary>
public string Server
{
    get { return GetProperty<string>(PropertyKeys.Server, "http://127.0.0.1:9191"); }
    set { SetProperty(PropertyKeys.Server, value); }
}

/// <summary>Value of the <c>"PrimaryURL"</c> key; empty string when unset.</summary>
public string PrimaryUrl
{
    get { return GetProperty<string>(PropertyKeys.PrimaryUrl, ""); }
    set { SetProperty(PropertyKeys.PrimaryUrl, value); }
}

/// <summary>Value of the <c>"Username"</c> key; empty string when unset.</summary>
public string Username
{
    get { return GetProperty<string>(PropertyKeys.Username, ""); }
    set { SetProperty(PropertyKeys.Username, value); }
}

/// <summary>Value of the <c>"Password"</c> key; empty string when unset.</summary>
public string Password
{
    get { return GetProperty<string>(PropertyKeys.Password, ""); }
    set { SetProperty(PropertyKeys.Password, value); }
}

/// <summary>Value of the <c>"OAuthToken"</c> key; empty string when unset.</summary>
public string OAuthToken
{
    get { return GetProperty<string>(PropertyKeys.OAuthToken, ""); }
    set { SetProperty(PropertyKeys.OAuthToken, value); }
}

/// <summary>Value of the <c>"Database"</c> key; null when unset.</summary>
public string? Database
{
    get { return GetProperty<string?>(PropertyKeys.Database, null); }
    set { SetProperty(PropertyKeys.Database, value); }
}

/// <summary>Value of the <c>"Schema"</c> key; null when unset.</summary>
public string? Schema
{
    get { return GetProperty<string?>(PropertyKeys.Schema, null); }
    set { SetProperty(PropertyKeys.Schema, value); }
}

/// <summary>Value of the <c>"ImpersonateUser"</c> key; null when unset.</summary>
public string? ImpersonateUser
{
    get { return GetProperty<string?>(PropertyKeys.ImpersonateUser, null); }
    set { SetProperty(PropertyKeys.ImpersonateUser, value); }
}
1703 
1704  #endregion
1705 
1706  #region Timeout Properties
1707 
/// <summary>Value of the <c>"Timeout"</c> key; -1 when unset.</summary>
public int Timeout
{
    get { return GetProperty<int>(PropertyKeys.Timeout, -1); }
    set { SetProperty(PropertyKeys.Timeout, value); }
}

/// <summary>Value of the <c>"Connection Timeout"</c> key; 30 when unset.</summary>
public int ConnectionTimeout
{
    get { return GetProperty<int>(PropertyKeys.ConnectionTimeout, 30); }
    set { SetProperty(PropertyKeys.ConnectionTimeout, value); }
}

/// <summary>Value of the <c>"InitialConnectionTimeoutSeconds"</c> key; -1 when unset.</summary>
public int InitialConnectionTimeout
{
    get { return GetProperty<int>(PropertyKeys.InitialConnectionTimeout, -1); }
    set { SetProperty(PropertyKeys.InitialConnectionTimeout, value); }
}

/// <summary>Value of the <c>"ServerConnectionTimeoutSeconds"</c> key; -1 when unset.</summary>
public int ServerConnectionTimeout
{
    get { return GetProperty<int>(PropertyKeys.ServerConnectionTimeout, -1); }
    set { SetProperty(PropertyKeys.ServerConnectionTimeout, value); }
}
1743 
1744  #endregion
1745 
1746  #region SSL/TLS Properties
1747 
/// <summary>Value of the <c>"BypassSslCertCheck"</c> key; false when unset.</summary>
public bool BypassSslCertCheck
{
    get { return GetProperty<bool>(PropertyKeys.BypassSslCertCheck, false); }
    set { SetProperty(PropertyKeys.BypassSslCertCheck, value); }
}

/// <summary>Value of the <c>"SslCACertPath"</c> key; null when unset.</summary>
public string? SslCaCertPath
{
    get { return GetProperty<string?>(PropertyKeys.SslCaCertPath, null); }
    set { SetProperty(PropertyKeys.SslCaCertPath, value); }
}

/// <summary>Value of the <c>"SslCertPassword"</c> key; null when unset.</summary>
public string? SslCertPassword
{
    get { return GetProperty<string?>(PropertyKeys.SslCertPassword, null); }
    set { SetProperty(PropertyKeys.SslCertPassword, value); }
}

/// <summary>Value of the <c>"SslAllowHostMismatch"</c> key; false when unset.</summary>
public bool SslAllowHostMismatch
{
    get { return GetProperty<bool>(PropertyKeys.SslAllowHostMismatch, false); }
    set { SetProperty(PropertyKeys.SslAllowHostMismatch, value); }
}
1783 
1784  #endregion
1785 
1786  #region Network Properties
1787 
/// <summary>Value of the <c>"DisableAutoDiscovery"</c> key; false when unset.</summary>
public bool DisableAutoDiscovery
{
    get { return GetProperty<bool>(PropertyKeys.DisableAutoDiscovery, false); }
    set { SetProperty(PropertyKeys.DisableAutoDiscovery, value); }
}

/// <summary>Value of the <c>"DisableFailover"</c> key; false when unset.</summary>
public bool DisableFailover
{
    get { return GetProperty<bool>(PropertyKeys.DisableFailover, false); }
    set { SetProperty(PropertyKeys.DisableFailover, value); }
}

/// <summary>Value of the <c>"FailoverOrder"</c> key; null when unset.</summary>
public string? FailoverOrder
{
    get { return GetProperty<string?>(PropertyKeys.FailoverOrder, null); }
    set { SetProperty(PropertyKeys.FailoverOrder, value); }
}

/// <summary>Value of the <c>"FailbackPollInterval"</c> key; -1 when unset.</summary>
public int FailbackPollInterval
{
    get { return GetProperty<int>(PropertyKeys.FailbackPollInterval, -1); }
    set { SetProperty(PropertyKeys.FailbackPollInterval, value); }
}

/// <summary>Value of the <c>"DisableSnappy"</c> key; false when unset.</summary>
public bool DisableSnappy
{
    get { return GetProperty<bool>(PropertyKeys.DisableSnappy, false); }
    set { SetProperty(PropertyKeys.DisableSnappy, value); }
}
1832 
1833  #endregion
1834 
1835  #region Query Optimization Properties
1836 
/// <summary>Value of the <c>"CostBasedOptimization"</c> key; true when unset.</summary>
public bool CostBasedOptimization
{
    get { return GetProperty<bool>(PropertyKeys.CostBasedOptimization, true); }
    set { SetProperty(PropertyKeys.CostBasedOptimization, value); }
}

/// <summary>Value of the <c>"DistributedJoins"</c> key; true when unset.</summary>
public bool DistributedJoins
{
    get { return GetProperty<bool>(PropertyKeys.DistributedJoins, true); }
    set { SetProperty(PropertyKeys.DistributedJoins, value); }
}

/// <summary>Value of the <c>"ParallelExecution"</c> key; true when unset.</summary>
public bool ParallelExecution
{
    get { return GetProperty<bool>(PropertyKeys.ParallelExecution, true); }
    set { SetProperty(PropertyKeys.ParallelExecution, value); }
}

/// <summary>Value of the <c>"PlanCache"</c> key; true when unset.</summary>
public bool PlanCache
{
    get { return GetProperty<bool>(PropertyKeys.PlanCache, true); }
    set { SetProperty(PropertyKeys.PlanCache, value); }
}

/// <summary>Value of the <c>"ResultsCaching"</c> key; true when unset.</summary>
public bool ResultsCaching
{
    get { return GetProperty<bool>(PropertyKeys.ResultsCaching, true); }
    set { SetProperty(PropertyKeys.ResultsCaching, value); }
}

/// <summary>Value of the <c>"RuleBasedOptimizations"</c> key; true when unset.</summary>
public bool RuleBasedOptimizations
{
    get { return GetProperty<bool>(PropertyKeys.RuleBasedOptimizations, true); }
    set { SetProperty(PropertyKeys.RuleBasedOptimizations, value); }
}

/// <summary>Value of the <c>"SsqOptimizations"</c> key; true when unset.</summary>
public bool SsqOptimizations
{
    get { return GetProperty<bool>(PropertyKeys.SsqOptimizations, true); }
    set { SetProperty(PropertyKeys.SsqOptimizations, value); }
}

/// <summary>Value of the <c>"UseApproxCountDistinct"</c> key; false when unset.</summary>
public bool UseApproxCountDistinct
{
    get { return GetProperty<bool>(PropertyKeys.UseApproxCountDistinct, false); }
    set { SetProperty(PropertyKeys.UseApproxCountDistinct, value); }
}
1908 
1909  #endregion
1910 
1911  #region Query Control Properties
1912 
/// <summary>Value of the <c>"ReadOnly"</c> key; false when unset.</summary>
public bool ReadOnly
{
    get { return GetProperty<bool>(PropertyKeys.ReadOnly, false); }
    set { SetProperty(PropertyKeys.ReadOnly, value); }
}

/// <summary>Value of the <c>"TTL"</c> key; 20 when unset.</summary>
public int Ttl
{
    get { return GetProperty<int>(PropertyKeys.Ttl, 20); }
    set { SetProperty(PropertyKeys.Ttl, value); }
}

/// <summary>Value of the <c>"PagingTableTTL"</c> key; 20 when unset.</summary>
public int PagingTableTtl
{
    get { return GetProperty<int>(PropertyKeys.PagingTableTtl, 20); }
    set { SetProperty(PropertyKeys.PagingTableTtl, value); }
}

/// <summary>Value of the <c>"Limit"</c> key; -1 when unset.</summary>
public int Limit
{
    get { return GetProperty<int>(PropertyKeys.Limit, -1); }
    set { SetProperty(PropertyKeys.Limit, value); }
}

/// <summary>Value of the <c>"RowsPerFetch"</c> key; 10000 when unset.</summary>
public int RowsPerFetch
{
    get { return GetProperty<int>(PropertyKeys.RowsPerFetch, 10000); }
    set { SetProperty(PropertyKeys.RowsPerFetch, value); }
}

/// <summary>Value of the <c>"FetchSize"</c> key; 0 when unset.</summary>
public int FetchSize
{
    get { return GetProperty<int>(PropertyKeys.FetchSize, 0); }
    set { SetProperty(PropertyKeys.FetchSize, value); }
}

/// <summary>Value of the <c>"UseKeyLookup"</c> key; false when unset.</summary>
public bool UseKeyLookup
{
    get { return GetProperty<bool>(PropertyKeys.UseKeyLookup, false); }
    set { SetProperty(PropertyKeys.UseKeyLookup, value); }
}
1977 
1978  #endregion
1979 
1980  #region Insertion Properties
1981 
/// <summary>Value of the <c>"RowsPerInsertion"</c> key; 10000 when unset.</summary>
public int RowsPerInsertion
{
    get { return GetProperty<int>(PropertyKeys.RowsPerInsertion, 10000); }
    set { SetProperty(PropertyKeys.RowsPerInsertion, value); }
}

/// <summary>Value of the <c>"DisableMultiheadInsert"</c> key; false when unset.</summary>
public bool DisableMultiheadInsert
{
    get { return GetProperty<bool>(PropertyKeys.DisableMultiheadInsert, false); }
    set { SetProperty(PropertyKeys.DisableMultiheadInsert, value); }
}

/// <summary>Value of the <c>"IgnoreExistingPk"</c> key; false when unset.</summary>
public bool IgnoreExistingPk
{
    get { return GetProperty<bool>(PropertyKeys.IgnoreExistingPk, false); }
    set { SetProperty(PropertyKeys.IgnoreExistingPk, value); }
}

/// <summary>Value of the <c>"TruncateStrings"</c> key; false when unset.</summary>
public bool TruncateStrings
{
    get { return GetProperty<bool>(PropertyKeys.TruncateStrings, false); }
    set { SetProperty(PropertyKeys.TruncateStrings, value); }
}

/// <summary>Value of the <c>"UpdateOnExistingPk"</c> key; false when unset.</summary>
public bool UpdateOnExistingPk
{
    get { return GetProperty<bool>(PropertyKeys.UpdateOnExistingPk, false); }
    set { SetProperty(PropertyKeys.UpdateOnExistingPk, value); }
}

/// <summary>Value of the <c>"ErrorMode"</c> key; null when unset.</summary>
public string? ErrorMode
{
    get { return GetProperty<string?>(PropertyKeys.ErrorMode, null); }
    set { SetProperty(PropertyKeys.ErrorMode, value); }
}

/// <summary>Value of the <c>"Replication"</c> key; null when unset.</summary>
public string? Replication
{
    get { return GetProperty<string?>(PropertyKeys.Replication, null); }
    set { SetProperty(PropertyKeys.Replication, value); }
}

/// <summary>Value of the <c>"NoSync"</c> key; false when unset.</summary>
public bool NoSync
{
    get { return GetProperty<bool>(PropertyKeys.NoSync, false); }
    set { SetProperty(PropertyKeys.NoSync, value); }
}
2053 
2054  #endregion
2055 
2056  #region File I/O Properties
2057 
/// <summary>Value of the <c>"FileReadDelimiter"</c> key; a comma when unset.</summary>
public string FileReadDelimiter
{
    get { return GetProperty<string>(PropertyKeys.FileReadDelimiter, ","); }
    set { SetProperty(PropertyKeys.FileReadDelimiter, value); }
}

/// <summary>Value of the <c>"FileReadHasHeader"</c> key; false when unset.</summary>
public bool FileReadHasHeader
{
    get { return GetProperty<bool>(PropertyKeys.FileReadHasHeader, false); }
    set { SetProperty(PropertyKeys.FileReadHasHeader, value); }
}

/// <summary>Value of the <c>"FileReadNullString"</c> key; the two characters backslash + N when unset.</summary>
public string FileReadNullString
{
    get { return GetProperty<string>(PropertyKeys.FileReadNullString, "\\N"); }
    set { SetProperty(PropertyKeys.FileReadNullString, value); }
}

/// <summary>Value of the <c>"FileReadEscapeChar"</c> key; the two characters backslash + 0 when unset.</summary>
public string FileReadEscapeChar
{
    get { return GetProperty<string>(PropertyKeys.FileReadEscapeChar, "\\0"); }
    set { SetProperty(PropertyKeys.FileReadEscapeChar, value); }
}

/// <summary>Value of the <c>"FileReadQuoteChar"</c> key; a double quote when unset.</summary>
public string FileReadQuoteChar
{
    get { return GetProperty<string>(PropertyKeys.FileReadQuoteChar, "\""); }
    set { SetProperty(PropertyKeys.FileReadQuoteChar, value); }
}

/// <summary>Value of the <c>"FileReadComment"</c> key; <c>#</c> when unset.</summary>
public string FileReadComment
{
    get { return GetProperty<string>(PropertyKeys.FileReadComment, "#"); }
    set { SetProperty(PropertyKeys.FileReadComment, value); }
}

/// <summary>Value of the <c>"FileReadInitialClear"</c> key; false when unset.</summary>
public bool FileReadInitialClear
{
    get { return GetProperty<bool>(PropertyKeys.FileReadInitialClear, false); }
    set { SetProperty(PropertyKeys.FileReadInitialClear, value); }
}

/// <summary>Value of the <c>"FileReadLimit"</c> key; <see cref="int.MaxValue"/> when unset.</summary>
public int FileReadLimit
{
    get { return GetProperty<int>(PropertyKeys.FileReadLimit, int.MaxValue); }
    set { SetProperty(PropertyKeys.FileReadLimit, value); }
}

/// <summary>Value of the <c>"FileReadSkip"</c> key; 0 when unset.</summary>
public int FileReadSkip
{
    get { return GetProperty<int>(PropertyKeys.FileReadSkip, 0); }
    set { SetProperty(PropertyKeys.FileReadSkip, value); }
}
2138 
2139  #endregion
2140 
2141  #region Connection Pooling Properties
2142 
/// <summary>Value of the <c>"Pooling"</c> key; true when unset.</summary>
public bool Pooling
{
    get { return GetProperty<bool>(PropertyKeys.Pooling, true); }
    set { SetProperty(PropertyKeys.Pooling, value); }
}

/// <summary>Value of the <c>"Max Pool Size"</c> key; 100 when unset.</summary>
public int MaxPoolSize
{
    get { return GetProperty<int>(PropertyKeys.MaxPoolSize, 100); }
    set { SetProperty(PropertyKeys.MaxPoolSize, value); }
}

/// <summary>Value of the <c>"Min Pool Size"</c> key; 0 when unset.</summary>
public int MinPoolSize
{
    get { return GetProperty<int>(PropertyKeys.MinPoolSize, 0); }
    set { SetProperty(PropertyKeys.MinPoolSize, value); }
}
2169 
2170  #endregion
2171 
2172  #region Batch Insert Properties
2173 
/// <summary>Value of the <c>"Batch Insert Mode"</c> key; false when unset.</summary>
public bool BatchInsertMode
{
    get { return GetProperty<bool>(PropertyKeys.BatchInsertMode, false); }
    set { SetProperty(PropertyKeys.BatchInsertMode, value); }
}

/// <summary>Value of the <c>"Batch Size"</c> key; 10000 when unset.</summary>
public int BatchSize
{
    get { return GetProperty<int>(PropertyKeys.BatchSize, 10000); }
    set { SetProperty(PropertyKeys.BatchSize, value); }
}

/// <summary>Value of the <c>"Batch Update On Existing Pk"</c> key; false when unset.</summary>
public bool BatchUpdateOnExistingPk
{
    get { return GetProperty<bool>(PropertyKeys.BatchUpdateOnExistingPk, false); }
    set { SetProperty(PropertyKeys.BatchUpdateOnExistingPk, value); }
}
2201 
2202  #endregion
2203 
2204  #region Misc Properties
2205 
2209  public string? TimeZoneOverride
2210  {
2211  get => GetProperty<string?>(PropertyKeys.TimeZoneOverride, null);
2212  set => SetProperty(PropertyKeys.TimeZoneOverride, value);
2213  }
2214 
/// <summary>
/// Gets or sets the value stored under <c>PropertyKeys.TokenNameClaim</c>.
/// Reads fall back to <c>"sub"</c> when the key is absent or null.
/// </summary>
public string TokenNameClaim
{
    get { return GetProperty<string>(PropertyKeys.TokenNameClaim, "sub"); }
    set { SetProperty(PropertyKeys.TokenNameClaim, value); }
}
2223 
/// <summary>
/// Gets or sets the value stored under <c>PropertyKeys.FakeTransactions</c>;
/// defaults to <c>false</c>.
/// </summary>
public bool FakeTransactions
{
    get { return GetProperty<bool>(PropertyKeys.FakeTransactions, false); }
    set { SetProperty(PropertyKeys.FakeTransactions, value); }
}
2232 
/// <summary>
/// Gets or sets the value stored under <c>PropertyKeys.LogLevel</c>.
/// Reads return <c>null</c> when the key is absent; assigning <c>null</c> removes the key.
/// </summary>
public string? LogLevel
{
    get { return GetProperty<string?>(PropertyKeys.LogLevel, null); }
    set { SetProperty(PropertyKeys.LogLevel, value); }
}
2241 
2242  #endregion
2243 
2244  #region Property Access Methods
2245 
/// <summary>
/// Reads a property from the backing dictionary and converts it to <typeparamref name="T"/>.
/// </summary>
/// <param name="key">Canonical property key to look up.</param>
/// <param name="defaultValue">Value returned when the key is absent or maps to null.</param>
/// <returns>The stored value converted to <typeparamref name="T"/>, or <paramref name="defaultValue"/>.</returns>
/// <exception cref="FormatException">The stored text cannot be converted to <typeparamref name="T"/>.</exception>
/// <exception cref="InvalidCastException">The stored value does not support the conversion.</exception>
private T GetProperty<T>(string key, T defaultValue)
{
    if (!_properties.TryGetValue(key, out var value) || value == null)
        return defaultValue;

    // Values assigned through the typed setters are already of the requested type.
    if (value is T typed)
        return typed;

    if (value is string text)
    {
        if (typeof(T) == typeof(bool))
        {
            // bool.TryParse already accepts "true"/"false" in any casing (and trims
            // whitespace), so only the numeric spellings need extra handling.
            if (bool.TryParse(text, out var flag))
                return (T)(object)flag;
            if (text == "1")
                return (T)(object)true;
            if (text == "0")
                return (T)(object)false;
        }
        else if (typeof(T) == typeof(int)
                 && int.TryParse(
                     text,
                     System.Globalization.NumberStyles.Integer,
                     System.Globalization.CultureInfo.InvariantCulture,
                     out var number))
        {
            return (T)(object)number;
        }
    }

    // Connection-string values are machine-readable, so convert with the invariant
    // culture instead of whatever culture the calling thread happens to use.
    return (T)Convert.ChangeType(value, typeof(T), System.Globalization.CultureInfo.InvariantCulture);
}
2276 
/// <summary>
/// Stores <paramref name="value"/> under <paramref name="key"/>; a null value removes the key.
/// </summary>
/// <param name="key">Property key to write.</param>
/// <param name="value">New value, or null to delete the entry.</param>
private void SetProperty(string key, object? value)
{
    if (value is null)
    {
        _properties.Remove(key);
        return;
    }

    _properties[key] = value;
}
2284 
/// <summary>
/// Read-only view over the backing property dictionary (the same instance, not a copy).
/// </summary>
public IReadOnlyDictionary<string, object> Properties
{
    get { return _properties; }
}
2289 
/// <summary>
/// Gets the raw value stored under <paramref name="key"/> (null when absent), or sets it.
/// Assigning null removes the key.
/// </summary>
/// <param name="key">Property key; it is used as given, without alias normalization.</param>
public object? this[string key]
{
    get
    {
        if (_properties.TryGetValue(key, out var stored))
            return stored;
        return null;
    }
    set { SetProperty(key, value); }
}
2298 
2299  #endregion
2300 
2301  #region Parsing
2302 
/// <summary>
/// Splits a <c>key=value;key=value</c> connection string and stores each pair in the
/// backing dictionary under its normalized key. Later duplicates overwrite earlier ones.
/// </summary>
/// <param name="connectionString">Raw connection string; null or empty is a no-op.</param>
private void ParseConnectionString(string connectionString)
{
    if (string.IsNullOrEmpty(connectionString))
        return;

    foreach (var segment in connectionString.Split(';'))
    {
        // Only the first '=' separates key from value, so values may contain '='.
        var separatorIndex = segment.IndexOf('=');
        if (separatorIndex < 0)
            continue;

        var key = segment[..separatorIndex].Trim();

        // A segment such as "=value" has no usable keyword; storing it under an empty
        // key would only pollute the property bag and the ToString() output.
        if (key.Length == 0)
            continue;

        var value = segment[(separatorIndex + 1)..].Trim();

        // Map compatibility aliases (e.g. UID, PWD) to the canonical key.
        _properties[NormalizePropertyKey(key)] = value;
    }
}
2323 
/// <summary>
/// Maps well-known connection-string aliases (compared case-insensitively) to the canonical
/// property key. Keys that are not aliases are returned unchanged, in their original casing.
/// </summary>
/// <param name="key">Key as written in the connection string.</param>
/// <returns>The canonical key for an alias, otherwise <paramref name="key"/> itself.</returns>
private static string NormalizePropertyKey(string key)
{
    switch (key.ToUpperInvariant())
    {
        case "URL":
        case "HOST":
        case "DATA SOURCE":
            return PropertyKeys.Server;

        case "UID":
        case "USER":
        case "USER ID":
            return PropertyKeys.Username;

        case "PWD":
            return PropertyKeys.Password;

        case "INITIAL CATALOG":
            return PropertyKeys.Database;

        case "OAUTHTOKEN":
        case "OAUTH TOKEN":
        case "OAUTH_TOKEN":
            return PropertyKeys.OAuthToken;

        default:
            return key;
    }
}
2337 
/// <summary>
/// Serializes every stored property as <c>key=value</c>, joined with ';'.
/// Values are written as-is; no quoting or escaping is applied.
/// </summary>
public override string ToString()
{
    var segments = new List<string>();
    foreach (var entry in _properties)
    {
        segments.Add($"{entry.Key}={entry.Value}");
    }

    return string.Join(";", segments);
}
2342 
2343  #endregion
2344 
2345  #region Query Options Builder
2346 
// Builds the option dictionary that accompanies a query, derived from this builder's
// properties. Only settings that differ from the value tested below are emitted, so an
// untouched builder yields few entries.
// Returns a new dictionary on every call.
2350  public IDictionary<string, string> BuildQueryOptions()
2351  {
2352  var options = new Dictionary<string, string>();
2353 
2354  // Query optimization options
2355  if (!CostBasedOptimization)
2356  options["cost_based_optimization"] = "false";
2357  if (!DistributedJoins)
2358  options["distributed_joins"] = "false";
2359  if (!ParallelExecution)
2360  options["parallel_execution"] = "false";
2361  if (!PlanCache)
2362  options["plan_cache"] = "false";
2363  if (!ResultsCaching)
2364  options["results_caching"] = "false";
// NOTE(review): the line that should precede the next assignment (listing line 2365) is
// not present here, so "rule_based_optimizations" reads as unconditional. Presumably it is
// guarded like its neighbours - confirm against the original file before changing this.
2366  options["rule_based_optimizations"] = "false";
2367  if (!SsqOptimizations)
2368  options["ssq_optimization"] = "false";
// NOTE(review): listing line 2369 is likewise missing; "use_approx_count_distinct" is
// presumably set only when the matching property is enabled - confirm.
2370  options["use_approx_count_distinct"] = "true";
2371 
2372  // TTL and paging
// 20 is the value treated as "not customized" for both TTL settings.
2373  if (Ttl != 20)
2374  options["ttl"] = Ttl.ToString();
2375  if (PagingTableTtl != 20)
2376  options["paging_table_ttl"] = PagingTableTtl.ToString();
// A non-positive Limit emits no "limit" option.
2377  if (Limit > 0)
2378  options["limit"] = Limit.ToString();
2379 
2380  return options;
2381  }
2382 
/// <summary>
/// Builds the option dictionary that accompanies an insert. Boolean settings are emitted
/// as <c>"true"</c> only when enabled; string settings only when non-empty.
/// </summary>
/// <returns>A new dictionary on every call.</returns>
public IDictionary<string, string> BuildInsertOptions()
{
    var insertOptions = new Dictionary<string, string>();

    // Writes "true" for a flag that is switched on; disabled flags are simply omitted.
    void AddWhenEnabled(bool enabled, string optionName)
    {
        if (enabled)
        {
            insertOptions[optionName] = "true";
        }
    }

    AddWhenEnabled(UpdateOnExistingPk, "update_on_existing_pk");
    AddWhenEnabled(IgnoreExistingPk, "ignore_existing_pk");
    AddWhenEnabled(TruncateStrings, "truncate_strings");
    AddWhenEnabled(NoSync, "no_sync");

    var errorMode = ErrorMode;
    if (!string.IsNullOrEmpty(errorMode))
    {
        insertOptions["error_mode"] = errorMode;
    }

    var replication = Replication;
    if (!string.IsNullOrEmpty(replication))
    {
        insertOptions["replication_mode"] = replication;
    }

    return insertOptions;
}
2405 
2406  #endregion
2407  }
2408 
2409  // 5. Connection Pool Implementation
2411  {
2412  private readonly ConcurrentDictionary<string, ConnectionPoolEntry> _pools = new();
2413  private readonly Timer _cleanupTimer;
2414 
2416  {
2417  _cleanupTimer = new Timer(CleanupExpiredConnections!, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
2418  }
2419 
/// <summary>
/// Obtains a client from the pool that belongs to <paramref name="connectionString"/>,
/// creating that pool on first use.
/// </summary>
/// <param name="connectionString">Connection string; also used as the pool key.</param>
/// <param name="cancellationToken">Token forwarded to the pool entry.</param>
/// <returns>A client handed out by the pool entry.</returns>
public async Task<Kinetica> GetConnectionAsync(string connectionString, CancellationToken cancellationToken)
{
    ConnectionPoolEntry entry = _pools.GetOrAdd(connectionString, key => new ConnectionPoolEntry(key));
    Kinetica client = await entry.GetConnectionAsync(cancellationToken).ConfigureAwait(false);
    return client;
}
2425 
/// <summary>
/// Hands a client back to the pool that belongs to <paramref name="connectionString"/>.
/// Does nothing when no pool exists for that connection string.
/// </summary>
/// <param name="connectionString">Connection string used as the pool key.</param>
/// <param name="connection">Client being returned.</param>
public void ReturnConnection(string connectionString, Kinetica connection)
{
    if (!_pools.TryGetValue(connectionString, out var entry))
    {
        return;
    }

    entry.ReturnConnection(connection);
}
2433 
/// <summary>
/// Timer callback: asks every pool entry to drop its expired idle connections.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
private void CleanupExpiredConnections(object state)
{
    var entries = _pools.Values;
    foreach (ConnectionPoolEntry entry in entries)
    {
        entry.CleanupExpiredConnections();
    }
}
2441 
/// <summary>
/// Pool of idle <c>Kinetica</c> clients for a single connection string. A semaphore sized
/// by <c>MaxPoolSize</c> bounds how many clients may be checked out at once.
/// </summary>
private class ConnectionPoolEntry
{
    private readonly KineticaConnectionStringBuilder _connectionStringBuilder;
    private readonly ConcurrentQueue<PooledConnection> _connections = new();
    private readonly SemaphoreSlim _semaphore;

    // Incremented when a client is created and decremented when one is discarded.
    // Nothing in this class reads it.
    private int _currentCount = 0;

    /// <summary>Creates the entry and sizes its semaphore from <c>MaxPoolSize</c>.</summary>
    /// <param name="connectionString">Connection string this entry serves.</param>
    public ConnectionPoolEntry(string connectionString)
    {
        _connectionStringBuilder = new KineticaConnectionStringBuilder(connectionString);
        _semaphore = new SemaphoreSlim(_connectionStringBuilder.MaxPoolSize, _connectionStringBuilder.MaxPoolSize);
    }

    /// <summary>
    /// Waits for a free slot, then returns a validated idle client or creates a new one.
    /// The slot is released again if this method throws.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait, the validation and the creation.</param>
    /// <returns>A client that answered a status call just before being returned.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
    public async Task<Kinetica> GetConnectionAsync(CancellationToken cancellationToken)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

        try
        {
            // Try to reuse an idle connection.
            while (_connections.TryDequeue(out var pooledConnection))
            {
                if (pooledConnection.IsValid)
                {
                    var alive = await ValidateConnectionAsync(pooledConnection.Connection, cancellationToken).ConfigureAwait(false);
                    if (alive)
                    {
                        return pooledConnection.Connection;
                    }

                    // A validation failure caused by cancellation says nothing about the
                    // connection's health. Without this check one cancelled request would
                    // fail validation for every idle connection and drain the whole queue.
                    if (cancellationToken.IsCancellationRequested)
                    {
                        _connections.Enqueue(pooledConnection);
                        cancellationToken.ThrowIfCancellationRequested();
                    }
                }

                // Note: Kinetica doesn't implement IDisposable, just release the reference.
                Interlocked.Decrement(ref _currentCount);
            }

            // Nothing reusable: create a new client, with credentials when configured.
            Kinetica.Options? options = null;

            if (!string.IsNullOrEmpty(_connectionStringBuilder.Username) || !string.IsNullOrEmpty(_connectionStringBuilder.Password))
            {
                options = new Kinetica.Options
                {
                    Username = _connectionStringBuilder.Username ?? string.Empty,
                    Password = _connectionStringBuilder.Password ?? string.Empty
                };
            }

            if (!string.IsNullOrEmpty(_connectionStringBuilder.OAuthToken))
            {
                options ??= new Kinetica.Options();
                options.OauthToken = _connectionStringBuilder.OAuthToken;
            }

            var client = new Kinetica(_connectionStringBuilder.Server, options);

            // Test the new connection before handing it out.
            await client.ShowSystemStatusAsync(new Dictionary<string, string>(), cancellationToken).ConfigureAwait(false);

            Interlocked.Increment(ref _currentCount);
            return client;
        }
        catch
        {
            // The caller never received a client, so give the slot back.
            _semaphore.Release();
            throw;
        }
    }

    /// <summary>
    /// Checks that a client still answers by issuing a status call.
    /// </summary>
    /// <returns><c>true</c> when the call succeeds; <c>false</c> on any exception.</returns>
    private async Task<bool> ValidateConnectionAsync(Kinetica connection, CancellationToken cancellationToken)
    {
        try
        {
            await connection.ShowSystemStatusAsync(new Dictionary<string, string>(), cancellationToken).ConfigureAwait(false);
            return true;
        }
        catch
        {
            return false;
        }
    }

    /// <summary>
    /// Puts a client back on the idle queue and releases its slot. The idle timestamp
    /// starts now, because a fresh <c>PooledConnection</c> wrapper is created.
    /// </summary>
    /// <param name="connection">Client being returned.</param>
    public void ReturnConnection(Kinetica connection)
    {
        try
        {
            _connections.Enqueue(new PooledConnection(connection));
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Removes idle connections whose wrapper is older than the validity window and
    /// re-queues the remaining ones in their previous order.
    /// </summary>
    public void CleanupExpiredConnections()
    {
        var activeConnections = new List<PooledConnection>();

        while (_connections.TryDequeue(out var pooledConnection))
        {
            if (pooledConnection.IsValid)
            {
                activeConnections.Add(pooledConnection);
            }
            else
            {
                // Note: Kinetica doesn't implement IDisposable, just release the reference.
                Interlocked.Decrement(ref _currentCount);
            }
        }

        foreach (var connection in activeConnections)
        {
            _connections.Enqueue(connection);
        }
    }

    /// <summary>An idle client together with the time it was placed in the queue.</summary>
    private class PooledConnection
    {
        public Kinetica Connection { get; }
        public DateTime CreatedAt { get; }

        public PooledConnection(Kinetica connection)
        {
            Connection = connection;
            CreatedAt = DateTime.UtcNow;
        }

        // Valid for 30 minutes after the wrapper was created.
        public bool IsValid => DateTime.UtcNow - CreatedAt < TimeSpan.FromMinutes(30);
    }
}
2568  }
2569 
2570  // 6. SQL Parser for Better Command Handling
2578  public class SQLParser
2579  {
2580  // Statement type patterns
// Statement-type patterns. Each is anchored at the start of the text and allows only
// leading whitespace before the keyword, so a leading comment prevents a match.
2581  private static readonly Regex _selectRegex = new Regex(@"^\s*SELECT\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2582  private static readonly Regex _insertRegex = new Regex(@"^\s*INSERT\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2583  private static readonly Regex _updateRegex = new Regex(@"^\s*UPDATE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2584  private static readonly Regex _deleteRegex = new Regex(@"^\s*DELETE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2585  private static readonly Regex _createTableRegex = new Regex(@"^\s*CREATE\s+TABLE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
2586  private static readonly Regex _dropTableRegex = new Regex(@"^\s*DROP\s+TABLE\s", RegexOptions.IgnoreCase | RegexOptions.Compiled);
// Named placeholder: '@' followed by word characters. The pattern has no notion of string
// literals, so an '@' inside quoted text matches as well.
2587  private static readonly Regex _parameterRegex = new Regex(@"@(\w+)", RegexOptions.Compiled);
2588  // JDBC-style positional parameter placeholder
// Matches every '?', including those inside quoted text.
2589  private static readonly Regex _positionalParamRegex = new Regex(@"\?", RegexOptions.Compiled);
2590 
2591  // User impersonation patterns (JDBC-compatible)
// Group 1 captures the user name; surrounding single or double quotes are optional and
// the name is limited to word characters.
2592  private static readonly Regex _setUserRegex = new Regex(
2593  @"^\s*SET\s+USER\s+['""]?(\w+)['""]?\s*$",
2594  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2595  private static readonly Regex _executeAsUserRegex = new Regex(
2596  @"^\s*EXECUTE\s+AS\s+USER\s+['""]?(\w+)['""]?\s*$",
2597  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2598  private static readonly Regex _revertRegex = new Regex(
2599  @"^\s*REVERT\s*$",
2600  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2601 
2602  // SET SCHEMA / SET SQLID pattern
// Group 1 captures the schema name, which may contain dots.
2603  private static readonly Regex _setSchemaRegex = new Regex(
2604  @"^\s*SET\s+(?:SCHEMA|SQLID)\s+['""]?([.\w]+)['""]?\s*$",
2605  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2606 
2607  // Query hint patterns (KI_HINT_*)
2608  // Matches both /*+ KI_HINT_* */ and /* KI_HINT_* */ formats
// One comment may hold several hints, each optionally followed by a parenthesized argument.
2609  private static readonly Regex _hintPattern = new Regex(
2610  @"/\*\+?\s*(?:KI_HINT_\w+(?:\([^)]*\))?\s*)+\*/",
2611  RegexOptions.IgnoreCase | RegexOptions.Compiled);
// The individual hint patterns below are unanchored: they match the hint name anywhere
// in the text they are applied to, not only inside a hint comment.
2612  private static readonly Regex _batchSizeHint = new Regex(
2613  @"KI_HINT_BATCH_SIZE\s*\(\s*(\d+)\s*\)",
2614  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2615  private static readonly Regex _truncateStringsHint = new Regex(
2616  @"KI_HINT_TRUNCATE_STRINGS",
2617  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2618  private static readonly Regex _updateOnPkHint = new Regex(
2619  @"KI_HINT_UPDATE_ON_EXISTING_PK",
2620  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2621  private static readonly Regex _disableMultiheadHint = new Regex(
2622  @"KI_HINT_DISABLE_MULTIHEAD",
2623  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2624  private static readonly Regex _keyLookupHint = new Regex(
2625  @"KI_HINT_KEY_LOOKUP",
2626  RegexOptions.IgnoreCase | RegexOptions.Compiled);
// Matches both KI_HINT_REPL_SYNC and KI_HINT_REPL_SYNC_PARALLEL.
2627  private static readonly Regex _replSyncHint = new Regex(
2628  @"KI_HINT_REPL_SYNC(?:_PARALLEL)?",
2629  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2630  private static readonly Regex _ignoreExistingPkHint = new Regex(
2631  @"KI_HINT_IGNORE_EXISTING_PK",
2632  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2633  private static readonly Regex _serverSideInsertHint = new Regex(
2634  @"KI_HINT_SERVER_SIDE_INSERT",
2635  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2636  // PK conflict predicate hints
// Group 1 captures the column name given in parentheses.
2637  private static readonly Regex _pkConflictPredicateLowerHint = new Regex(
2638  @"KI_HINT_PK_CONFLICT_PREDICATE_LOWER\s*\(\s*(\w+)\s*\)",
2639  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2640  private static readonly Regex _pkConflictPredicateHigherHint = new Regex(
2641  @"KI_HINT_PK_CONFLICT_PREDICATE_HIGHER\s*\(\s*(\w+)\s*\)",
2642  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2643 
2644  // INSERT INTO...SELECT FROM FILE pattern
2645  // Matches: INSERT INTO [schema.]table [(columns)] SELECT [*|(columns)] FROM FILE."path" [WITH OPTIONS (...)]
2646  // Also matches kifs:// paths: INSERT INTO [schema.]table [(columns)] SELECT [*|(columns)] FROM 'kifs://path' [WITH OPTIONS (...)]
// Named groups: table, insertColumns, selectAll, selectColumns, filePath, kifsPath, options.
// The options group stops at the first ')' because it is defined as [^)]*.
2647  private static readonly Regex _insertFromFileRegex = new Regex(
2648  @"^\s*INSERT\s+INTO\s+(?<table>[\w\.]+)\s*(?:\(\s*(?<insertColumns>[\w\s,""]+)\s*\))?\s*SELECT\s+(?:(?<selectAll>\*)|(?<selectColumns>[\w\s,""]+))\s+FROM\s+(?:FILE\s*\.\s*""(?<filePath>[^""]+)""|'(?<kifsPath>kifs://[^']+)')(?:\s+WITH\s+OPTIONS\s*\(\s*(?<options>[^)]*)\s*\))?\s*$",
2649  RegexOptions.IgnoreCase | RegexOptions.Compiled);
2650 
/// <summary>
/// Analyzes a SQL command: extracts KI_HINT_* hints, removes the hint comments, classifies
/// the statement, substitutes parameter values, and collects impersonation and
/// INSERT-FROM-FILE details.
/// </summary>
/// <param name="sql">Command text as supplied by the caller.</param>
/// <param name="parameters">Parameters referenced by <c>@name</c> or <c>?</c> placeholders.</param>
/// <returns>The parsed command, with the untouched text in <c>OriginalSql</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sql"/> is null.</exception>
public ParsedCommand Parse(string sql, DbParameterCollection parameters)
{
    ArgumentNullException.ThrowIfNull(sql);

    // Hints are read from the raw text, then removed before any further analysis.
    var hints = ExtractHints(sql);
    var strippedSql = StripHints(sql);

    // Classify the hint-free text: the statement patterns are anchored and do not
    // tolerate an embedded hint comment (e.g. "INSERT INTO t /* KI_HINT_... */ SELECT ...
    // FROM FILE" would otherwise never be recognized as an INSERT-FROM-FILE).
    var commandType = DetermineCommandType(strippedSql);
    var finalSql = SubstituteParameters(strippedSql, parameters);

    // Handle user impersonation commands.
    var userImpersonation = ExtractUserImpersonation(strippedSql);

    // Parse INSERT FROM FILE details if applicable.
    InsertFromFileInfo? insertFromFileInfo = null;
    if (commandType == ParsedCommandType.InsertFromFile)
    {
        insertFromFileInfo = ExtractInsertFromFileInfo(strippedSql);
    }

    return new ParsedCommand
    {
        OriginalSql = sql,
        FinalSql = finalSql,
        CommandType = commandType,
        Parameters = parameters,
        Hints = hints,
        UserImpersonation = userImpersonation,
        InsertFromFile = insertFromFileInfo
    };
}
2681 
/// <summary>
/// Removes every KI_HINT comment (<c>/*+ ... */</c> or <c>/* ... */</c>) from the SQL and
/// trims the result.
/// </summary>
/// <param name="sql">SQL text that may contain hint comments.</param>
/// <returns>The SQL without hint comments and without leading or trailing whitespace.</returns>
private string StripHints(string sql)
{
    var withoutHints = _hintPattern.Replace(sql, "");
    return withoutHints.Trim();
}
2690 
/// <summary>
/// Classifies a statement by testing it against the statement patterns in a fixed order;
/// the first match wins.
/// </summary>
/// <param name="sql">SQL text to classify.</param>
/// <returns>The matching command type, or <c>Other</c> when nothing matches.</returns>
private ParsedCommandType DetermineCommandType(string sql)
{
    return sql switch
    {
        // Session and user management commands are checked first.
        _ when _setUserRegex.IsMatch(sql) => ParsedCommandType.SetUser,
        _ when _executeAsUserRegex.IsMatch(sql) => ParsedCommandType.ExecuteAsUser,
        _ when _revertRegex.IsMatch(sql) => ParsedCommandType.Revert,
        _ when _setSchemaRegex.IsMatch(sql) => ParsedCommandType.SetSchema,

        // Standard SQL commands. INSERT FROM FILE must be tested before plain INSERT,
        // because the plain pattern would match it too.
        _ when _selectRegex.IsMatch(sql) => ParsedCommandType.Select,
        _ when _insertFromFileRegex.IsMatch(sql) => ParsedCommandType.InsertFromFile,
        _ when _insertRegex.IsMatch(sql) => ParsedCommandType.Insert,
        _ when _updateRegex.IsMatch(sql) => ParsedCommandType.Update,
        _ when _deleteRegex.IsMatch(sql) => ParsedCommandType.Delete,
        _ when _createTableRegex.IsMatch(sql) => ParsedCommandType.CreateTable,
        _ when _dropTableRegex.IsMatch(sql) => ParsedCommandType.DropTable,

        _ => ParsedCommandType.Other
    };
}
2710 
/// <summary>
/// Scans the SQL for KI_HINT_* markers and records which ones are present.
/// </summary>
/// <param name="sql">Raw SQL text, hint comments included.</param>
/// <returns>
/// A new <c>QueryHints</c>; value hints (batch size, conflict-predicate columns) stay
/// unset when their marker is absent.
/// </returns>
private QueryHints ExtractHints(string sql)
{
    // Flag hints: presence of the marker is all that matters.
    var result = new QueryHints
    {
        TruncateStrings = _truncateStringsHint.IsMatch(sql),
        UpdateOnExistingPk = _updateOnPkHint.IsMatch(sql),
        IgnoreExistingPk = _ignoreExistingPkHint.IsMatch(sql),
        DisableMultihead = _disableMultiheadHint.IsMatch(sql),
        UseKeyLookup = _keyLookupHint.IsMatch(sql),
        ReplicationSync = _replSyncHint.IsMatch(sql),
        ServerSideInsert = _serverSideInsertHint.IsMatch(sql)
    };

    // Batch size hint: KI_HINT_BATCH_SIZE(n).
    var batchSizeMatch = _batchSizeHint.Match(sql);
    if (batchSizeMatch.Success && int.TryParse(batchSizeMatch.Groups[1].Value, out var parsedBatchSize))
    {
        result.BatchSize = parsedBatchSize;
    }

    // PK conflict predicate hints carry a column name.
    if (_pkConflictPredicateLowerHint.Match(sql) is { Success: true } lowerColumn)
    {
        result.PkConflictPredicateLowerColumn = lowerColumn.Groups[1].Value;
    }

    if (_pkConflictPredicateHigherHint.Match(sql) is { Success: true } higherColumn)
    {
        result.PkConflictPredicateHigherColumn = higherColumn.Groups[1].Value;
    }

    return result;
}
2749 
/// <summary>
/// Extracts the schema name from a <c>SET SCHEMA</c> / <c>SET SQLID</c> statement.
/// </summary>
/// <param name="sql">Statement text.</param>
/// <returns>The schema name without surrounding quotes, or null when the statement does not match.</returns>
public string? ExtractSchemaName(string sql)
{
    var schemaMatch = _setSchemaRegex.Match(sql);
    if (!schemaMatch.Success)
    {
        return null;
    }

    return schemaMatch.Groups[1].Value;
}
2758 
/// <summary>
/// Recognizes <c>SET USER</c>, <c>EXECUTE AS USER</c> and <c>REVERT</c> statements.
/// </summary>
/// <param name="sql">Statement text.</param>
/// <returns>
/// The impersonation request (with a null user name for <c>REVERT</c>), or null when the
/// statement is none of the three.
/// </returns>
private UserImpersonationInfo? ExtractUserImpersonation(string sql)
{
    if (_setUserRegex.Match(sql) is { Success: true } setUser)
    {
        return new UserImpersonationInfo
        {
            Type = ImpersonationType.SetUser,
            Username = setUser.Groups[1].Value
        };
    }

    if (_executeAsUserRegex.Match(sql) is { Success: true } executeAs)
    {
        return new UserImpersonationInfo
        {
            Type = ImpersonationType.ExecuteAs,
            Username = executeAs.Groups[1].Value
        };
    }

    if (!_revertRegex.IsMatch(sql))
    {
        return null;
    }

    return new UserImpersonationInfo
    {
        Type = ImpersonationType.Revert,
        Username = null
    };
}
2795 
// Detects an explicit DELIMITER entry inside a WITH OPTIONS (...) list.
private static readonly Regex _explicitDelimiterOptionRegex = new Regex(
    @"\bDELIMITER\s*=",
    RegexOptions.IgnoreCase | RegexOptions.Compiled);

/// <summary>
/// Breaks an <c>INSERT INTO ... SELECT ... FROM FILE."path"</c> (or <c>'kifs://...'</c>)
/// statement into table, column lists, path and file options.
/// </summary>
/// <param name="sql">Statement text.</param>
/// <returns>The extracted details, or null when the statement does not match the pattern.</returns>
/// <remarks>
/// When the format is left at <c>Auto</c> it is inferred from the file extension. The
/// delimiter is inferred from a <c>.psv</c>/<c>.tsv</c> extension only when WITH OPTIONS
/// does not name a DELIMITER, so an explicit choice is never overwritten.
/// </remarks>
private InsertFromFileInfo? ExtractInsertFromFileInfo(string sql)
{
    var match = _insertFromFileRegex.Match(sql);
    if (!match.Success)
        return null;

    var info = new InsertFromFileInfo
    {
        TableName = match.Groups["table"].Value.Trim(),
        SelectAll = match.Groups["selectAll"].Success && match.Groups["selectAll"].Value == "*",
        FilePath = match.Groups["filePath"].Success ? match.Groups["filePath"].Value : null,
        KifsPath = match.Groups["kifsPath"].Success ? match.Groups["kifsPath"].Value : null
    };

    // Parse INSERT columns.
    var insertColumns = match.Groups["insertColumns"];
    if (insertColumns.Success && !string.IsNullOrEmpty(insertColumns.Value))
    {
        info.InsertColumns = ParseColumnList(insertColumns.Value);
    }

    // Parse SELECT columns (ignored for SELECT *).
    var selectColumns = match.Groups["selectColumns"];
    if (!info.SelectAll && selectColumns.Success && !string.IsNullOrEmpty(selectColumns.Value))
    {
        info.SelectColumns = ParseColumnList(selectColumns.Value);
    }

    // Parse WITH OPTIONS and remember whether the caller chose a delimiter.
    var delimiterSpecified = false;
    var optionsGroup = match.Groups["options"];
    if (optionsGroup.Success && !string.IsNullOrEmpty(optionsGroup.Value))
    {
        ParseFileOptions(optionsGroup.Value, info.Options);
        delimiterSpecified = _explicitDelimiterOptionRegex.IsMatch(optionsGroup.Value);
    }

    // File extensions are identifiers, not linguistic text: compare ordinally.
    var path = info.ActualPath;
    bool HasExtension(string extension) => path.EndsWith(extension, StringComparison.OrdinalIgnoreCase);

    // Auto-detect the format from the file extension.
    if (info.Options.Format == FileFormat.Auto)
    {
        if (HasExtension(".parquet") || HasExtension(".pqt"))
            info.Options.Format = FileFormat.Parquet;
        else if (HasExtension(".json") || HasExtension(".jsonl"))
            info.Options.Format = FileFormat.Json;
        else if (HasExtension(".avro"))
            info.Options.Format = FileFormat.Avro;
        else if (HasExtension(".shp"))
            info.Options.Format = FileFormat.Shapefile;
        else
            info.Options.Format = FileFormat.DelimitedText;
    }

    // Auto-detect the delimiter for delimited text, unless one was given explicitly.
    if (info.Options.Format == FileFormat.DelimitedText && !delimiterSpecified)
    {
        if (HasExtension(".psv"))
            info.Options.Delimiter = '|';
        else if (HasExtension(".tsv"))
            info.Options.Delimiter = '\t';
    }

    return info;
}
2858 
/// <summary>
/// Splits a comma-separated column list into names. Double quotes are dropped and a comma
/// between a pair of quotes does not split. Names are trimmed; empty entries are skipped.
/// </summary>
/// <param name="columns">Text between the parentheses of a column list.</param>
/// <returns>The column names in their original order.</returns>
private static List<string> ParseColumnList(string columns)
{
    var names = new List<string>();
    var buffer = new System.Text.StringBuilder();
    var insideQuotes = false;

    // Emits the buffered name (if any) and starts a new one.
    void Flush()
    {
        var name = buffer.ToString().Trim().Trim('"');
        if (name.Length > 0)
        {
            names.Add(name);
        }
        buffer.Clear();
    }

    for (var i = 0; i < columns.Length; i++)
    {
        var ch = columns[i];
        switch (ch)
        {
            case '"':
                insideQuotes = !insideQuotes;
                break;
            case ',' when !insideQuotes:
                Flush();
                break;
            default:
                buffer.Append(ch);
                break;
        }
    }

    // Add the last column.
    Flush();
    return names;
}
2894 
// Matches one "name = value" pair. The value may be single-quoted (group 2),
// double-quoted (group 3), a run of digits (group 4) or a bare word (group 5).
// Built once instead of on every call.
private static readonly Regex _fileOptionRegex = new Regex(
    @"(\w+)\s*=\s*(?:'([^']*)'|""([^""]*)""|(\d+)|(\w+))",
    RegexOptions.IgnoreCase | RegexOptions.Compiled);

/// <summary>Interprets TRUE, YES (any casing) and 1 as true; everything else as false.</summary>
private static bool ParseOptionFlag(string value)
{
    return value.Equals("TRUE", StringComparison.OrdinalIgnoreCase)
        || value.Equals("YES", StringComparison.OrdinalIgnoreCase)
        || value == "1";
}

/// <summary>Parses an integer option value independently of the current culture.</summary>
private static bool TryParseOptionInt(string value, out int result)
{
    return int.TryParse(
        value,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out result);
}

/// <summary>
/// Applies the entries of a <c>WITH OPTIONS (...)</c> list to <paramref name="options"/>.
/// Option names are case-insensitive and underscores are ignored
/// (<c>batch_size</c> and <c>BATCHSIZE</c> are the same option). Unknown names and
/// unparsable values are skipped and leave the current setting untouched.
/// </summary>
/// <param name="optionsStr">Text between the parentheses of WITH OPTIONS.</param>
/// <param name="options">Options object to update in place.</param>
private static void ParseFileOptions(string optionsStr, FileInsertOptions options)
{
    foreach (Match m in _fileOptionRegex.Matches(optionsStr))
    {
        var name = m.Groups[1].Value.ToUpperInvariant().Replace("_", "");
        var value = m.Groups[2].Success ? m.Groups[2].Value :
                    m.Groups[3].Success ? m.Groups[3].Value :
                    m.Groups[4].Success ? m.Groups[4].Value :
                    m.Groups[5].Value;

        switch (name)
        {
            case "BATCHSIZE":
                if (TryParseOptionInt(value, out var batchSize))
                    options.BatchSize = batchSize;
                break;
            case "DELIMITER":
                if (!string.IsNullOrEmpty(value))
                    options.Delimiter = value[0];
                break;
            case "QUOTE":
                if (!string.IsNullOrEmpty(value))
                    options.QuoteChar = value[0];
                break;
            case "ESCAPE":
                if (!string.IsNullOrEmpty(value))
                    options.EscapeChar = value[0];
                break;
            case "NULL":
            case "NULLSTRING":
                options.NullString = value;
                break;
            case "COMMENT":
                options.CommentPrefix = value;
                break;
            case "SKIP":
                if (TryParseOptionInt(value, out var skip))
                    options.Skip = skip;
                break;
            case "LIMIT":
                if (TryParseOptionInt(value, out var limit))
                    options.Limit = limit;
                break;
            case "HEADER":
            case "HASHEADER":
                options.HasHeader = ParseOptionFlag(value);
                break;
            case "INITIALCLEAR":
                options.InitialClear = ParseOptionFlag(value);
                break;
            case "ONERROR":
            case "ERRORMODE":
                // Enum.TryParse also accepts arbitrary numbers; keep only declared members.
                if (Enum.TryParse<FileErrorMode>(value, true, out var errorMode)
                    && Enum.IsDefined(typeof(FileErrorMode), errorMode))
                    options.ErrorMode = errorMode;
                break;
            case "IGNOREEXISTINGPK":
                options.IgnoreExistingPk = ParseOptionFlag(value);
                break;
            case "UPDATEONEXISTINGPK":
                options.UpdateOnExistingPk = ParseOptionFlag(value);
                break;
            case "TRUNCATESTRINGS":
                options.TruncateStrings = ParseOptionFlag(value);
                break;
            case "DRYRUN":
                options.DryRun = ParseOptionFlag(value);
                break;
            case "FORMAT":
            case "FILETYPE":
                if (Enum.TryParse<FileFormat>(value, true, out var format)
                    && Enum.IsDefined(typeof(FileFormat), format))
                    options.Format = format;
                else if (value.Equals("DELIMITED_TEXT", StringComparison.OrdinalIgnoreCase) ||
                         value.Equals("CSV", StringComparison.OrdinalIgnoreCase) ||
                         value.Equals("TSV", StringComparison.OrdinalIgnoreCase) ||
                         value.Equals("PSV", StringComparison.OrdinalIgnoreCase))
                    options.Format = FileFormat.DelimitedText;
                break;
        }
    }
}
2996 
// Tokenizer for placeholder substitution. Quoted text is matched first so that it is
// consumed as a unit and never scanned for placeholders:
//   'single-quoted literal' ('' is an escaped quote)
//   "double-quoted identifier" ("" is an escaped quote)
//   @name  - named placeholder
//   ?      - positional placeholder
private static readonly Regex _placeholderTokenRegex = new Regex(
    @"'(?:[^']|'')*'|""(?:[^""]|"""")*""|@(?<name>\w+)|(?<positional>\?)",
    RegexOptions.Compiled);

/// <summary>
/// Replaces <c>@name</c> and <c>?</c> placeholders with SQL-formatted parameter values in
/// a single pass over the statement.
/// </summary>
/// <param name="sql">SQL text, already stripped of hint comments.</param>
/// <param name="parameters">
/// Source of the values. It is only enumerated when a placeholder is actually found.
/// </param>
/// <returns>The SQL with placeholders replaced.</returns>
/// <remarks>
/// Text inside quotes is copied through untouched, and substituted values are never
/// re-scanned, so a '?' or '@' inside a literal or inside a parameter value cannot be
/// mistaken for a placeholder. A named placeholder with no matching parameter becomes
/// <c>NULL</c>; a positional placeholder with no remaining parameter is left as <c>?</c>.
/// </remarks>
private string SubstituteParameters(string sql, DbParameterCollection parameters)
{
    List<DbParameter>? positional = null;
    var positionalIndex = 0;

    return _placeholderTokenRegex.Replace(sql, token =>
    {
        var nameGroup = token.Groups["name"];
        if (nameGroup.Success)
        {
            var paramName = nameGroup.Value;
            var named = parameters.Cast<DbParameter>().FirstOrDefault(p =>
                string.Equals(p.ParameterName, "@" + paramName, StringComparison.OrdinalIgnoreCase) ||
                string.Equals(p.ParameterName, paramName, StringComparison.OrdinalIgnoreCase));

            return FormatParameterValue(named);
        }

        if (token.Groups["positional"].Success)
        {
            // Resolve the positional list lazily, on the first '?' encountered.
            positional ??= ResolvePositionalParameters(parameters);

            // If no more parameters, leave the ? as is (will cause SQL error).
            return positionalIndex < positional.Count
                ? FormatParameterValue(positional[positionalIndex++])
                : "?";
        }

        // Quoted literal or identifier: keep verbatim.
        return token.Value;
    });
}

/// <summary>
/// Selects the parameters that feed <c>?</c> placeholders: unnamed or '?'-prefixed ones in
/// collection order, or - when there are none - numbered ones (<c>@0</c>, <c>@1</c>, ...)
/// ordered by their number.
/// </summary>
private static List<DbParameter> ResolvePositionalParameters(DbParameterCollection parameters)
{
    var unnamed = parameters.Cast<DbParameter>()
        .Where(p => string.IsNullOrEmpty(p.ParameterName) || p.ParameterName.StartsWith("?", StringComparison.Ordinal))
        .ToList();
    if (unnamed.Count > 0)
        return unnamed;

    var numbered = new List<(int Position, DbParameter Parameter)>();
    foreach (var parameter in parameters.Cast<DbParameter>())
    {
        if (int.TryParse(
                parameter.ParameterName?.TrimStart('@'),
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var position))
        {
            numbered.Add((position, parameter));
        }
    }

    // OrderBy is a stable sort, so equal numbers keep their collection order.
    return numbered.OrderBy(entry => entry.Position).Select(entry => entry.Parameter).ToList();
}
3039 
/// <summary>
/// Renders a parameter value as a SQL literal, chosen by the parameter's <c>DbType</c>.
/// </summary>
/// <param name="param">Parameter to format; may be null.</param>
/// <returns>
/// <c>NULL</c> for a missing parameter, a null value or <see cref="DBNull"/>; otherwise
/// the literal text. <c>DbType</c> values without a dedicated rule are formatted from the
/// runtime type of the value.
/// </returns>
private string FormatParameterValue(DbParameter? param)
{
    if (param?.Value == null || param.Value == DBNull.Value)
        return "NULL";

    // SQL literals must not depend on the thread's culture: a numeric value supplied as
    // text ("1.5") has to be read with '.' as the decimal separator everywhere.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    // Handle based on DbType for proper SQL formatting.
    return param.DbType switch
    {
        // String types - escape single quotes
        DbType.String or DbType.AnsiString or DbType.StringFixedLength or DbType.AnsiStringFixedLength
            => $"'{EscapeString(param.Value.ToString())}'",

        // Date/Time types - format appropriately for Kinetica
        DbType.DateTime or DbType.DateTime2 => FormatDateTime(param.Value),
        DbType.Date => FormatDate(param.Value),
        DbType.Time => FormatTime(param.Value),
        DbType.DateTimeOffset => FormatDateTimeOffset(param.Value),

        // Boolean
        DbType.Boolean => FormatBoolean(param.Value),

        // GUID/UUID
        DbType.Guid => $"'{FormatGuid(param.Value)}'",

        // Binary data
        DbType.Binary => FormatBinary(param.Value),

        // Numeric types - invariant culture for both parsing and printing
        DbType.Decimal or DbType.Currency or DbType.VarNumeric
            => Convert.ToDecimal(param.Value, invariant).ToString(invariant),
        DbType.Double => Convert.ToDouble(param.Value, invariant).ToString(invariant),
        DbType.Single => Convert.ToSingle(param.Value, invariant).ToString(invariant),

        // Default: infer the formatting from the value's runtime type
        _ => FormatValueByType(param.Value)
    };
}
3077 
/// <summary>
/// Renders a value as a SQL literal based on its runtime type. Used when the parameter's
/// <c>DbType</c> has no dedicated rule.
/// </summary>
/// <param name="value">Non-null value to format.</param>
/// <returns>The literal text; types without a rule use their <c>ToString()</c> output.</returns>
private static string FormatValueByType(object value)
{
    // Custom date/time format strings are culture-sensitive (':' is replaced by the
    // culture's time separator and the calendar may differ), so every branch formats with
    // the invariant culture.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    return value switch
    {
        string s => $"'{EscapeString(s)}'",
        DateTime dt => "'" + dt.ToString("yyyy-MM-dd HH:mm:ss.fff", invariant) + "'",
        DateTimeOffset dto => "'" + dto.ToString("yyyy-MM-dd HH:mm:ss.fff", invariant) + "'",
        DateOnly d => "'" + d.ToString("yyyy-MM-dd", invariant) + "'",
        TimeOnly t => "'" + t.ToString("HH:mm:ss.fff", invariant) + "'",
        TimeSpan ts => "'" + ts.ToString(@"hh\:mm\:ss\.fff", invariant) + "'",
        bool b => b ? "TRUE" : "FALSE",
        Guid g => $"'{g}'",
        byte[] bytes => "0x" + BitConverter.ToString(bytes).Replace("-", ""),
        decimal dec => dec.ToString(invariant),
        double dbl => dbl.ToString(invariant),
        float flt => flt.ToString(invariant),
        // Remaining formattable types (integers and the like) also print invariantly.
        IFormattable formattable => formattable.ToString(null, invariant),
        _ => value.ToString() ?? "NULL"
    };
}
3097 
/// <summary>
/// Escapes text for use inside a single-quoted SQL literal by doubling every single quote.
/// </summary>
/// <param name="value">Text to escape; null is treated as empty.</param>
/// <returns>The escaped text, without surrounding quotes.</returns>
private static string EscapeString(string? value)
{
    return value is null ? "" : value.Replace("'", "''");
}
3103 
/// <summary>
/// Renders a date-time value as a quoted <c>yyyy-MM-dd HH:mm:ss.fff</c> literal.
/// </summary>
/// <param name="value">A <see cref="DateTime"/>, <see cref="DateTimeOffset"/> or any other value.</param>
/// <returns>
/// The quoted literal. Other value types are converted to text, quote-escaped and quoted.
/// </returns>
private static string FormatDateTime(object value)
{
    // Format with the invariant culture: under the current culture ':' becomes that
    // culture's time separator and the calendar may not be Gregorian.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    if (value is DateTime dt)
        return "'" + dt.ToString("yyyy-MM-dd HH:mm:ss.fff", invariant) + "'";
    if (value is DateTimeOffset dto)
        return "'" + dto.ToString("yyyy-MM-dd HH:mm:ss.fff", invariant) + "'";

    // Arbitrary text ends up inside quotes, so embedded quotes must be doubled.
    var text = Convert.ToString(value, invariant) ?? string.Empty;
    return "'" + text.Replace("'", "''") + "'";
}
3112 
/// <summary>
/// Renders a date value as a quoted <c>yyyy-MM-dd</c> literal.
/// </summary>
/// <param name="value">
/// A <see cref="DateTime"/>, <see cref="DateTimeOffset"/>, <see cref="DateOnly"/> or any other value.
/// </param>
/// <returns>
/// The quoted literal. Other value types are converted to text, quote-escaped and quoted.
/// </returns>
private static string FormatDate(object value)
{
    // Invariant culture keeps the Gregorian calendar regardless of the current culture.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    if (value is DateTime dt)
        return "'" + dt.ToString("yyyy-MM-dd", invariant) + "'";
    if (value is DateTimeOffset dto)
        return "'" + dto.ToString("yyyy-MM-dd", invariant) + "'";
    if (value is DateOnly d)
        return "'" + d.ToString("yyyy-MM-dd", invariant) + "'";

    // Arbitrary text ends up inside quotes, so embedded quotes must be doubled.
    var text = Convert.ToString(value, invariant) ?? string.Empty;
    return "'" + text.Replace("'", "''") + "'";
}
3123 
/// <summary>
/// Renders a time value as a quoted <c>HH:mm:ss.fff</c> literal.
/// </summary>
/// <param name="value">
/// A <see cref="DateTime"/>, <see cref="TimeSpan"/>, <see cref="TimeOnly"/> or any other value.
/// </param>
/// <returns>
/// The quoted literal. Other value types are converted to text, quote-escaped and quoted.
/// </returns>
private static string FormatTime(object value)
{
    // Under the current culture ':' would be replaced by that culture's time separator.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    if (value is DateTime dt)
        return "'" + dt.ToString("HH:mm:ss.fff", invariant) + "'";
    if (value is TimeSpan ts)
        return "'" + ts.ToString(@"hh\:mm\:ss\.fff", invariant) + "'";
    if (value is TimeOnly t)
        return "'" + t.ToString("HH:mm:ss.fff", invariant) + "'";

    // Arbitrary text ends up inside quotes, so embedded quotes must be doubled.
    var text = Convert.ToString(value, invariant) ?? string.Empty;
    return "'" + text.Replace("'", "''") + "'";
}
3134 
/// <summary>
/// Renders a <see cref="DateTimeOffset"/> as a quoted <c>yyyy-MM-dd HH:mm:ss.fffzzz</c>
/// literal (offset included). Any other value is delegated to <c>FormatDateTime</c>.
/// </summary>
/// <param name="value">Value to format.</param>
/// <returns>The quoted literal.</returns>
private static string FormatDateTimeOffset(object value)
{
    if (value is DateTimeOffset dto)
    {
        // Invariant culture: fixed ':' separators and Gregorian calendar.
        return "'" + dto.ToString("yyyy-MM-dd HH:mm:ss.fffzzz", System.Globalization.CultureInfo.InvariantCulture) + "'";
    }

    return FormatDateTime(value);
}
3141 
/// <summary>
/// Renders a value as the SQL keyword <c>TRUE</c> or <c>FALSE</c>.
/// </summary>
/// <param name="value">
/// A bool; an int (non-zero is true); a string ("true" in any casing or "1" is true,
/// anything else false); or any value accepted by <see cref="Convert.ToBoolean(object)"/>.
/// </param>
/// <returns><c>"TRUE"</c> or <c>"FALSE"</c>.</returns>
private static string FormatBoolean(object value)
{
    var truthy = value switch
    {
        bool flag => flag,
        // Handle numeric boolean representations.
        int number => number != 0,
        string text => text.Equals("true", StringComparison.OrdinalIgnoreCase) || text == "1",
        _ => Convert.ToBoolean(value)
    };

    return truthy ? "TRUE" : "FALSE";
}
3153 
/// <summary>
/// Renders a GUID value as text for embedding in a single-quoted SQL literal (the caller
/// adds the quotes).
/// </summary>
/// <param name="value">A <see cref="Guid"/>, or any other value holding the GUID text.</param>
/// <returns>
/// The GUID in its default hyphenated form, or the value's text with single quotes
/// doubled when it is not a <see cref="Guid"/>.
/// </returns>
private static string FormatGuid(object value)
{
    if (value is Guid g)
        return g.ToString();

    // The result is placed between quotes by the caller, so arbitrary text must not be
    // able to terminate the literal.
    return (value.ToString() ?? "").Replace("'", "''");
}
3160 
/// <summary>
/// Renders binary data as an uppercase hex literal with a <c>0x</c> prefix.
/// </summary>
/// <param name="value">Expected to be a <c>byte[]</c>.</param>
/// <returns>The hex literal, or <c>NULL</c> when the value is not a byte array.</returns>
private static string FormatBinary(object value)
{
    if (value is not byte[] payload)
    {
        return "NULL";
    }

    // BitConverter yields "DE-AD-BE-EF"; drop the dashes to get one hex run.
    var hex = BitConverter.ToString(payload).Replace("-", "");
    return "0x" + hex;
}
3170  }
3171 
/// <summary>
/// Result of <c>SQLParser.Parse</c>: the original and rewritten SQL plus everything the
/// parser extracted from the statement.
/// </summary>
public class ParsedCommand
{
    /// <summary>SQL text exactly as passed to the parser.</summary>
    public string OriginalSql { get; set; } = string.Empty;

    /// <summary>SQL after hint comments were removed and parameters substituted.</summary>
    public string FinalSql { get; set; } = string.Empty;

    /// <summary>Statement classification.</summary>
    public ParsedCommandType CommandType { get; set; }

    /// <summary>Parameter collection that was passed to the parser.</summary>
    public DbParameterCollection? Parameters { get; set; }

    /// <summary>KI_HINT_* hints found in the statement.</summary>
    public QueryHints Hints { get; set; } = new();

    /// <summary>
    /// Impersonation request (SET USER / EXECUTE AS USER / REVERT), or null for any other
    /// statement. <c>SQLParser.Parse</c> assigns this member, so it has to be declared here.
    /// </summary>
    public UserImpersonationInfo? UserImpersonation { get; set; }

    /// <summary>Details of an INSERT ... FROM FILE statement, or null for any other statement.</summary>
    public InsertFromFileInfo? InsertFromFile { get; set; }
}
3185 
/// <summary>
/// Categories a parsed command can be classified into.
/// </summary>
public enum ParsedCommandType
{
    // Data statements
    Select,
    Insert,
    Update,
    Delete,

    // Table definition statements
    CreateTable,
    DropTable,

    // Session / impersonation statements
    SetUser,
    ExecuteAsUser,
    Revert,
    SetSchema,

    // Anything not covered by the members above
    Other
}
3204 
/// <summary>
/// Set of optional hints for a command, with helpers that translate the
/// relevant flags into Kinetica option dictionaries.
/// </summary>
public class QueryHints
{
    /// <summary>Requested batch size, or null when not specified. Not emitted by the option helpers below.</summary>
    public int? BatchSize { get; set; }

    /// <summary>When set, <see cref="ToInsertOptions"/> emits <c>truncate_strings=true</c>.</summary>
    public bool TruncateStrings { get; set; }

    /// <summary>When set, <see cref="ToInsertOptions"/> emits <c>update_on_existing_pk=true</c>.</summary>
    public bool UpdateOnExistingPk { get; set; }

    /// <summary>When set, <see cref="ToInsertOptions"/> emits <c>ignore_existing_pk=true</c>.</summary>
    public bool IgnoreExistingPk { get; set; }

    /// <summary>Multi-head disable flag. Not emitted by the option helpers below.</summary>
    public bool DisableMultihead { get; set; }

    /// <summary>When set, <see cref="ToQueryOptions"/> emits <c>key_lookup=true</c>.</summary>
    public bool UseKeyLookup { get; set; }

    /// <summary>When set, <see cref="ToInsertOptions"/> emits <c>replication_mode=sync</c>.</summary>
    public bool ReplicationSync { get; set; }

    /// <summary>Server-side insert flag. Not emitted by the option helpers below.</summary>
    public bool ServerSideInsert { get; set; }

    /// <summary>Column name emitted as <c>pk_conflict_predicate_lower</c> when non-empty.</summary>
    public string? PkConflictPredicateLowerColumn { get; set; }

    /// <summary>Column name emitted as <c>pk_conflict_predicate_higher</c> when non-empty.</summary>
    public string? PkConflictPredicateHigherColumn { get; set; }

    /// <summary>
    /// Builds the insert option dictionary from the hints that are switched on.
    /// </summary>
    /// <returns>A new dictionary; empty when no insert-related hint is set.</returns>
    public IDictionary<string, string> ToInsertOptions()
    {
        var result = new Dictionary<string, string>();

        // Adds key=value only when the corresponding hint is enabled.
        void Emit(bool enabled, string key, string value)
        {
            if (enabled)
            {
                result[key] = value;
            }
        }

        Emit(TruncateStrings, "truncate_strings", "true");
        Emit(UpdateOnExistingPk, "update_on_existing_pk", "true");
        Emit(IgnoreExistingPk, "ignore_existing_pk", "true");
        Emit(ReplicationSync, "replication_mode", "sync");

        if (PkConflictPredicateLowerColumn is { Length: > 0 } lowerColumn)
        {
            result["pk_conflict_predicate_lower"] = lowerColumn;
        }

        if (PkConflictPredicateHigherColumn is { Length: > 0 } higherColumn)
        {
            result["pk_conflict_predicate_higher"] = higherColumn;
        }

        return result;
    }

    /// <summary>
    /// Builds the query option dictionary from the hints that are switched on.
    /// </summary>
    /// <returns>A new dictionary containing <c>key_lookup</c> when requested; otherwise empty.</returns>
    public IDictionary<string, string> ToQueryOptions()
    {
        var result = new Dictionary<string, string>();

        if (!UseKeyLookup)
        {
            return result;
        }

        result["key_lookup"] = "true";
        return result;
    }
}
3299 
3304  {
/// <summary>Kind of impersonation operation described by this object.</summary>
public ImpersonationType Type { get; set; }

/// <summary>
/// User name associated with the operation; may be null.
/// NOTE(review): presumably unset for <c>Revert</c> - confirm against the parser.
/// </summary>
public string? Username { get; set; }
3307  }
3308 
/// <summary>
/// Kinds of impersonation operation.
/// </summary>
public enum ImpersonationType
{
    /// <summary>Set-user operation.</summary>
    SetUser = 0,

    /// <summary>Execute-as operation.</summary>
    ExecuteAs = 1,

    /// <summary>Revert operation.</summary>
    Revert = 2
}
3321 
/// <summary>
/// Describes an insert whose data comes from a file: the target table, the
/// column lists, the file location and the load options.
/// </summary>
public class InsertFromFileInfo
{
    /// <summary>Target table name. Defaults to an empty string.</summary>
    public string TableName { get; set; } = "";

    /// <summary>Columns named on the insert side. Defaults to an empty list.</summary>
    public List<string> InsertColumns { get; set; } = new List<string>();

    /// <summary>Columns named on the select side. Defaults to an empty list.</summary>
    public List<string> SelectColumns { get; set; } = new List<string>();

    /// <summary>Flag indicating that all columns are selected.</summary>
    public bool SelectAll { get; set; }

    /// <summary>File path; used by <see cref="ActualPath"/> when no KiFS path is set.</summary>
    public string? FilePath { get; set; }

    /// <summary>KiFS path; takes precedence over <see cref="FilePath"/> when non-empty.</summary>
    public string? KifsPath { get; set; }

    /// <summary>True when <see cref="KifsPath"/> is neither null nor empty.</summary>
    public bool IsKifsPath
    {
        get { return !string.IsNullOrEmpty(KifsPath); }
    }

    /// <summary>
    /// The path to use: <see cref="KifsPath"/> when present, otherwise
    /// <see cref="FilePath"/>, otherwise an empty string.
    /// </summary>
    public string ActualPath
    {
        get
        {
            if (IsKifsPath)
            {
                return KifsPath!;
            }

            return FilePath ?? string.Empty;
        }
    }

    /// <summary>Load options for the file. Defaults to a fresh <see cref="FileInsertOptions"/>.</summary>
    public FileInsertOptions Options { get; set; } = new FileInsertOptions();
}
3354 
/// <summary>
/// Options controlling how a file is loaded by a file-based insert.
/// </summary>
/// <remarks>
/// Only the default values are defined here; how each option is applied is up
/// to the code that consumes this object.
/// </remarks>
public class FileInsertOptions
{
    /// <summary>File format. Defaults to <see cref="FileFormat.Auto"/>.</summary>
    public FileFormat Format { get; set; } = FileFormat.Auto;

    /// <summary>Batch size. Defaults to 10000.</summary>
    public int BatchSize { get; set; } = 10000;

    /// <summary>Field delimiter. Defaults to a comma.</summary>
    public char Delimiter { get; set; } = ',';

    /// <summary>Quote character. Defaults to a double quote.</summary>
    public char QuoteChar { get; set; } = '"';

    /// <summary>Escape character. Defaults to the NUL character.</summary>
    public char EscapeChar { get; set; } = '\0';

    /// <summary>Text that represents a null value. Defaults to a backslash followed by N.</summary>
    public string NullString { get; set; } = "\\N";

    /// <summary>Comment prefix, or null when none is configured.</summary>
    public string? CommentPrefix { get; set; }

    /// <summary>Skip count. Defaults to 0.</summary>
    public int Skip { get; set; }

    /// <summary>Limit. Defaults to 0.</summary>
    public int Limit { get; set; }

    /// <summary>Whether the file has a header. Defaults to true.</summary>
    public bool HasHeader { get; set; } = true;

    /// <summary>Initial-clear flag. Defaults to false.</summary>
    public bool InitialClear { get; set; }

    /// <summary>Error handling mode. Defaults to <see cref="FileErrorMode.Abort"/>.</summary>
    public FileErrorMode ErrorMode { get; set; } = FileErrorMode.Abort;

    /// <summary>Ignore-existing-primary-key flag. Defaults to false.</summary>
    public bool IgnoreExistingPk { get; set; }

    /// <summary>Update-on-existing-primary-key flag. Defaults to false.</summary>
    public bool UpdateOnExistingPk { get; set; }

    /// <summary>Truncate-strings flag. Defaults to false.</summary>
    public bool TruncateStrings { get; set; }

    /// <summary>Dry-run flag. Defaults to false.</summary>
    public bool DryRun { get; set; }
}
3409 
/// <summary>
/// File formats selectable for a file-based insert.
/// </summary>
public enum FileFormat
{
    /// <summary>Automatic format selection (the default of <c>FileInsertOptions.Format</c>).</summary>
    Auto = 0,

    /// <summary>Delimited text.</summary>
    DelimitedText = 1,

    /// <summary>Parquet.</summary>
    Parquet = 2,

    /// <summary>JSON.</summary>
    Json = 3,

    /// <summary>Avro.</summary>
    Avro = 4,

    /// <summary>Shapefile.</summary>
    Shapefile = 5
}
3428 
/// <summary>
/// Error handling modes selectable for a file-based insert.
/// </summary>
public enum FileErrorMode
{
    /// <summary>Abort mode (the default of <c>FileInsertOptions.ErrorMode</c>).</summary>
    Abort = 0,

    /// <summary>Skip mode.</summary>
    Skip = 1,

    /// <summary>Permissive mode.</summary>
    Permissive = 2
}
3441 
3442  // 7. Schema Provider Implementation
3444  {
3445  private readonly KineticaConnection _connection;
3446 
3448  {
3449  _connection = connection;
3450  }
3451 
/// <summary>
/// Returns schema metadata for the named collection.
/// </summary>
/// <param name="collectionName">
/// Collection name, matched case-insensitively: MetaDataCollections, Tables,
/// Columns, Views, Indexes, Procedures, Users, Roles or DataTypes.
/// </param>
/// <param name="restrictionValues">
/// Optional restriction values forwarded to the collection-specific reader;
/// ignored for MetaDataCollections and DataTypes.
/// </param>
/// <returns>A <see cref="DataTable"/> describing the requested collection.</returns>
/// <exception cref="ArgumentNullException"><paramref name="collectionName"/> is null.</exception>
/// <exception cref="ArgumentException">The collection name is not supported.</exception>
public DataTable GetSchema(string collectionName, string?[]? restrictionValues)
{
    ArgumentNullException.ThrowIfNull(collectionName);

    // Invariant casing: a culture-sensitive ToUpper() maps 'i' to a dotted
    // capital under Turkish cultures, so "indexes" would not match "INDEXES".
    return collectionName.ToUpperInvariant() switch
    {
        "METADATACOLLECTIONS" => GetMetaDataCollections(),
        "TABLES" => GetTables(restrictionValues),
        "COLUMNS" => GetColumns(restrictionValues),
        "VIEWS" => GetViews(restrictionValues),
        "INDEXES" => GetIndexes(restrictionValues),
        "PROCEDURES" => GetProcedures(restrictionValues),
        "USERS" => GetUsers(restrictionValues),
        "ROLES" => GetRoles(restrictionValues),
        "DATATYPES" => GetDataTypes(),
        _ => throw new ArgumentException($"Unsupported schema collection: {collectionName}")
    };
}
3468 
/// <summary>
/// Builds the table that lists every supported schema collection together
/// with its restriction count and identifier-part count.
/// </summary>
/// <returns>A <see cref="DataTable"/> named "MetaDataCollections" with one row per collection.</returns>
private DataTable GetMetaDataCollections()
{
    var catalog = new DataTable("MetaDataCollections");
    catalog.Columns.Add("CollectionName", typeof(string));
    catalog.Columns.Add("NumberOfRestrictions", typeof(int));
    catalog.Columns.Add("NumberOfIdentifierParts", typeof(int));

    // (collection name, number of restrictions, number of identifier parts)
    var entries = new (string Name, int Restrictions, int IdentifierParts)[]
    {
        ("MetaDataCollections", 0, 0),
        ("Tables", 4, 3),
        ("Columns", 4, 4),
        ("Views", 3, 3),
        ("Indexes", 4, 3),
        ("Procedures", 4, 3),
        ("Users", 1, 1),
        ("Roles", 1, 1),
        ("DataTypes", 0, 0),
    };

    foreach (var entry in entries)
    {
        catalog.Rows.Add(entry.Name, entry.Restrictions, entry.IdentifierParts);
    }

    return catalog;
}
3488 
/// <summary>
/// Lists the entries returned by <c>showTable("")</c> with <c>show_children=true</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; only index 2 (table name, compared case-insensitively)
/// is honoured.
/// </param>
/// <returns>
/// A table with TABLE_CATALOG (always null), TABLE_SCHEMA, TABLE_NAME and
/// TABLE_TYPE (always "BASE TABLE") columns.
/// </returns>
/// <exception cref="KineticaException">Any failure while querying the server.</exception>
private DataTable GetTables(string?[]? restrictionValues)
{
    var table = new DataTable("Tables");
    table.Columns.Add("TABLE_CATALOG", typeof(string));
    table.Columns.Add("TABLE_SCHEMA", typeof(string));
    table.Columns.Add("TABLE_NAME", typeof(string));
    table.Columns.Add("TABLE_TYPE", typeof(string));

    string? nameFilter = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;

    try
    {
        var client = _connection.GetKineticaClient();
        // showTable requires a table_name parameter - use empty string or "*" to get all tables
        var response = client.showTable("", new Dictionary<string, string>
        {
            { "show_children", "true" }
        });

        var tableNames = response.table_names ?? new List<string>();
        for (int i = 0; i < tableNames.Count; i++)
        {
            var tableName = tableNames[i];

            // Apply restrictions if provided
            if (!string.IsNullOrEmpty(nameFilter) &&
                !tableName.Equals(nameFilter, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            // Report the schema the server returns (as GetViews and GetIndexes do);
            // "public" remains the fallback when no schema_name is available.
            string schemaName = "public";
            if (response.additional_info != null && i < response.additional_info.Count &&
                response.additional_info[i].TryGetValue("schema_name", out var schema) &&
                !string.IsNullOrEmpty(schema))
            {
                schemaName = schema;
            }

            table.Rows.Add(null, schemaName, tableName, "BASE TABLE");
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException($"Failed to retrieve table schema: {ex.Message}", ex);
    }

    return table;
}
3524 
/// <summary>
/// Lists the columns of every table returned by <c>showTable("")</c>, derived
/// from the first Avro type schema reported for each table.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; index 2 filters by table name and index 3 by column
/// name, both compared case-insensitively. Other indexes are ignored.
/// </param>
/// <returns>
/// A table with one row per column. ORDINAL_POSITION is the 1-based position
/// of the field in the Avro schema, also when a column filter is applied.
/// </returns>
/// <exception cref="KineticaException">
/// Failure while listing tables. Failures while describing a single table are
/// written to the debug output and that table is skipped.
/// </exception>
private DataTable GetColumns(string?[]? restrictionValues)
{
    var table = new DataTable("Columns");
    table.Columns.Add("TABLE_CATALOG", typeof(string));
    table.Columns.Add("TABLE_SCHEMA", typeof(string));
    table.Columns.Add("TABLE_NAME", typeof(string));
    table.Columns.Add("COLUMN_NAME", typeof(string));
    table.Columns.Add("ORDINAL_POSITION", typeof(int));
    table.Columns.Add("COLUMN_DEFAULT", typeof(string));
    table.Columns.Add("IS_NULLABLE", typeof(string));
    table.Columns.Add("DATA_TYPE", typeof(string));
    table.Columns.Add("CHARACTER_MAXIMUM_LENGTH", typeof(int));
    table.Columns.Add("NUMERIC_PRECISION", typeof(int));
    table.Columns.Add("NUMERIC_SCALE", typeof(int));

    string? tableFilter = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;
    string? columnFilter = restrictionValues is { Length: > 3 } ? restrictionValues[3] : null;

    try
    {
        var client = _connection.GetKineticaClient();
        var tablesResponse = client.showTable("", new Dictionary<string, string>
        {
            { "show_children", "true" }
        });

        var tableNames = tablesResponse.table_names ?? new List<string>();
        for (int i = 0; i < tableNames.Count; i++)
        {
            var tableName = tableNames[i];

            // Apply table name restriction
            if (!string.IsNullOrEmpty(tableFilter) &&
                !tableName.Equals(tableFilter, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            // Report the schema the server returns (as GetViews and GetIndexes do);
            // "public" remains the fallback when no schema_name is available.
            string schemaName = "public";
            if (tablesResponse.additional_info != null && i < tablesResponse.additional_info.Count &&
                tablesResponse.additional_info[i].TryGetValue("schema_name", out var schema) &&
                !string.IsNullOrEmpty(schema))
            {
                schemaName = schema;
            }

            try
            {
                var tableInfo = client.showTable(tableName);
                var typeSchemas = tableInfo.type_schemas;
                if (typeSchemas == null || typeSchemas.Count == 0)
                {
                    continue;
                }

                // type_schemas is a list of Avro schema JSON strings; only the first is used.
                if (Avro.Schema.Parse(typeSchemas[0]) is not Avro.RecordSchema avroSchema)
                {
                    continue;
                }

                int ordinal = 0;
                foreach (var field in avroSchema.Fields)
                {
                    // Counted before filtering so positions stay schema-relative.
                    ordinal++;

                    var columnName = field.Name;

                    // Apply column name restriction
                    if (!string.IsNullOrEmpty(columnFilter) &&
                        !columnName.Equals(columnFilter, StringComparison.OrdinalIgnoreCase))
                    {
                        continue;
                    }

                    var dataType = GetKineticaTypeFromAvro(field.Schema);
                    var isNullable = IsNullableField(field.Schema) ? "YES" : "NO";
                    var maxLength = GetMaxLength(dataType);
                    var (precision, scale) = GetPrecisionAndScale(dataType);

                    table.Rows.Add(
                        null,                                // TABLE_CATALOG
                        schemaName,                          // TABLE_SCHEMA
                        tableName,                           // TABLE_NAME
                        columnName,                          // COLUMN_NAME
                        ordinal,                             // ORDINAL_POSITION
                        null,                                // COLUMN_DEFAULT
                        isNullable,                          // IS_NULLABLE
                        MapKineticaTypeToSqlType(dataType),  // DATA_TYPE
                        maxLength,                           // CHARACTER_MAXIMUM_LENGTH
                        precision,                           // NUMERIC_PRECISION
                        scale                                // NUMERIC_SCALE
                    );
                }
            }
            catch (Exception ex)
            {
                // Skip tables that can't be described
                System.Diagnostics.Debug.WriteLine($"Failed to get schema for table {tableName}: {ex.Message}");
            }
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException($"Failed to retrieve column schema: {ex.Message}", ex);
    }

    return table;
}
3630 
/// <summary>
/// Lists the views among the entries returned by <c>showTable("")</c> with
/// <c>show_children=true</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; only index 2 (view name, compared case-insensitively)
/// is honoured.
/// </param>
/// <returns>
/// A table with one row per entry whose descriptions contain LOGICAL_VIEW,
/// MATERIALIZED_VIEW, VIEW or MATERIALIZED_VIEW_MEMBER.
/// </returns>
/// <exception cref="KineticaException">Any failure while querying the server.</exception>
private DataTable GetViews(string?[]? restrictionValues)
{
    var views = new DataTable("Views");
    foreach (var column in new[]
             {
                 "TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME",
                 "VIEW_DEFINITION", "VIEW_TYPE", "IS_UPDATABLE"
             })
    {
        views.Columns.Add(column, typeof(string));
    }

    // Restriction slot 2 carries the view name; null or empty means "no filter".
    string? nameFilter = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;

    try
    {
        var client = _connection.GetKineticaClient();
        // show_children=true is needed to retrieve the child tables/views
        var listing = client.showTable("", new Dictionary<string, string>
        {
            { "show_children", "true" }
        });

        for (int index = 0; index < listing.table_names.Count; index++)
        {
            var viewName = listing.table_names[index];
            var descriptions = listing.table_descriptions[index];

            // Only entries described as some kind of view are reported
            bool isView = descriptions.Any(d =>
                d is "LOGICAL_VIEW" or "MATERIALIZED_VIEW" or "VIEW" or "MATERIALIZED_VIEW_MEMBER");
            if (!isView)
            {
                continue;
            }

            if (!string.IsNullOrEmpty(nameFilter) &&
                !viewName.Equals(nameFilter, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            // Schema name and definition come from additional_info when present
            string schemaName = "public";
            string viewDefinition = "";
            var extras = listing.additional_info;
            if (extras != null && index < extras.Count)
            {
                var info = extras[index];
                if (info.TryGetValue("schema_name", out var schema))
                {
                    schemaName = schema;
                }

                if (info.TryGetValue("request_avro_json", out var definition))
                {
                    viewDefinition = definition;
                }
            }

            string viewType = "LOGICAL";
            if (descriptions.Contains("MATERIALIZED_VIEW"))
            {
                viewType = "MATERIALIZED";
            }

            views.Rows.Add(
                null,            // TABLE_CATALOG
                schemaName,      // TABLE_SCHEMA
                viewName,        // TABLE_NAME
                viewDefinition,  // VIEW_DEFINITION
                viewType,        // VIEW_TYPE
                "NO"             // IS_UPDATABLE (always reported as not updatable)
            );
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException($"Failed to retrieve views: {ex.Message}", ex);
    }

    return views;
}
3703 
/// <summary>
/// Lists the attribute indexes of every table returned by <c>showTable("")</c>,
/// parsed from the <c>attribute_indexes</c> entry of <c>additional_info</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; index 2 filters by table name and index 3 by the
/// generated index name, both compared case-insensitively.
/// </param>
/// <returns>
/// A table with one row per indexed column. Entries described as SCHEMA,
/// LOGICAL_VIEW or MATERIALIZED_VIEW are skipped. IS_UNIQUE is always false.
/// </returns>
/// <exception cref="KineticaException">Any failure while querying the server.</exception>
private DataTable GetIndexes(string?[]? restrictionValues)
{
    var table = new DataTable("Indexes");
    table.Columns.Add("TABLE_CATALOG", typeof(string));
    table.Columns.Add("TABLE_SCHEMA", typeof(string));
    table.Columns.Add("TABLE_NAME", typeof(string));
    table.Columns.Add("INDEX_NAME", typeof(string));
    table.Columns.Add("INDEX_TYPE", typeof(string));
    table.Columns.Add("COLUMN_NAME", typeof(string));
    table.Columns.Add("ORDINAL_POSITION", typeof(int));
    table.Columns.Add("IS_UNIQUE", typeof(bool));

    string? tableFilter = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;
    string? indexFilter = restrictionValues is { Length: > 3 } ? restrictionValues[3] : null;

    try
    {
        var client = _connection.GetKineticaClient();
        var tablesResponse = client.showTable("", new Dictionary<string, string>
        {
            { "show_children", "true" }
        });

        for (int i = 0; i < tablesResponse.table_names.Count; i++)
        {
            var tableName = tablesResponse.table_names[i];
            var descriptions = tablesResponse.table_descriptions[i];

            // Skip schemas and views
            if (descriptions.Contains("SCHEMA") ||
                descriptions.Contains("LOGICAL_VIEW") ||
                descriptions.Contains("MATERIALIZED_VIEW"))
            {
                continue;
            }

            // Apply table name restriction
            if (!string.IsNullOrEmpty(tableFilter) &&
                !tableName.Equals(tableFilter, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            // Get index information from additional_info
            string schemaName = "public";
            string attributeIndexes = "";
            if (tablesResponse.additional_info != null && i < tablesResponse.additional_info.Count)
            {
                var info = tablesResponse.additional_info[i];
                if (info.TryGetValue("schema_name", out var schema))
                {
                    schemaName = schema;
                }

                if (info.TryGetValue("attribute_indexes", out var indexes))
                {
                    attributeIndexes = indexes;
                }
            }

            if (string.IsNullOrEmpty(attributeIndexes))
            {
                continue;
            }

            // attribute_indexes is a semicolon-separated list whose entries are
            // either "column_name" or "index_type@column_list@column_options".
            foreach (var indexEntry in attributeIndexes.Split(';', StringSplitOptions.RemoveEmptyEntries))
            {
                string indexName;
                string indexType;
                string columnList;

                if (indexEntry.Contains('@'))
                {
                    // Complex index format: index_type@column_list@options
                    var parts = indexEntry.Split('@');
                    indexType = parts[0];
                    columnList = parts.Length > 1 ? parts[1] : indexEntry;
                    indexName = $"{indexType}_{columnList.Replace(",", "_")}";
                }
                else
                {
                    // Simple column index
                    indexType = "COLUMN";
                    columnList = indexEntry.Trim();
                    indexName = $"IX_{tableName}_{columnList}";
                }

                // Apply index name restriction
                if (!string.IsNullOrEmpty(indexFilter) &&
                    !indexName.Equals(indexFilter, StringComparison.OrdinalIgnoreCase))
                {
                    continue;
                }

                // One row per column of a (possibly multi-column) index
                int colOrdinal = 1;
                foreach (var col in columnList.Split(',', StringSplitOptions.RemoveEmptyEntries))
                {
                    table.Rows.Add(
                        null,          // TABLE_CATALOG
                        schemaName,    // TABLE_SCHEMA
                        tableName,     // TABLE_NAME
                        indexName,     // INDEX_NAME
                        indexType,     // INDEX_TYPE
                        col.Trim(),    // COLUMN_NAME
                        colOrdinal++,  // ORDINAL_POSITION
                        false          // IS_UNIQUE (assume non-unique by default)
                    );
                }
            }
        }
    }
    catch (Exception ex)
    {
        throw new KineticaException($"Failed to retrieve indexes: {ex.Message}", ex);
    }

    return table;
}
3818 
/// <summary>
/// Lists the procedures reported by <c>showProc</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; index 2, when non-empty, is passed to <c>showProc</c>
/// as the procedure name. Otherwise an empty name is passed.
/// </param>
/// <returns>
/// A table with one row per procedure. When the lookup fails the error is
/// written to the debug output and the rows collected so far are returned.
/// </returns>
private DataTable GetProcedures(string?[]? restrictionValues)
{
    var procedures = new DataTable("Procedures");
    foreach (var column in new[]
             {
                 "PROCEDURE_CATALOG", "PROCEDURE_SCHEMA", "PROCEDURE_NAME",
                 "PROCEDURE_TYPE", "EXECUTION_MODE", "COMMAND"
             })
    {
        procedures.Columns.Add(column, typeof(string));
    }

    try
    {
        var client = _connection.GetKineticaClient();

        // Restriction slot 2 carries the procedure name; missing or empty means "no filter".
        var requested = restrictionValues is { Length: > 2 } ? restrictionValues[2] : null;
        string procNameFilter = string.IsNullOrEmpty(requested) ? "" : requested;

        var response = client.showProc(procNameFilter);

        for (int index = 0; index < response.proc_names.Count; index++)
        {
            // execution_modes / commands may be shorter than proc_names
            string executionMode = "unknown";
            if (index < response.execution_modes.Count)
            {
                executionMode = response.execution_modes[index];
            }

            string command = "";
            if (index < response.commands.Count)
            {
                command = response.commands[index];
            }

            procedures.Rows.Add(
                null,                        // PROCEDURE_CATALOG
                "public",                    // PROCEDURE_SCHEMA
                response.proc_names[index],  // PROCEDURE_NAME
                "PROCEDURE",                 // PROCEDURE_TYPE
                executionMode,               // EXECUTION_MODE
                command                      // COMMAND
            );
        }
    }
    catch (Exception ex)
    {
        // Best effort: a failing showProc (e.g., no permissions) yields an empty table
        System.Diagnostics.Debug.WriteLine($"Failed to retrieve procedures: {ex.Message}");
    }

    return procedures;
}
3867 
/// <summary>
/// Lists the users reported by <c>showSecurity</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; index 0, when non-empty, is passed to
/// <c>showSecurity</c> as the only name to look up.
/// </param>
/// <returns>
/// A table with one row per entry whose type is not "role". When the lookup
/// fails the error is written to the debug output and the rows collected so
/// far are returned.
/// </returns>
private DataTable GetUsers(string?[]? restrictionValues)
{
    var users = new DataTable("Users");
    users.Columns.Add("USER_NAME", typeof(string));
    users.Columns.Add("USER_TYPE", typeof(string));
    users.Columns.Add("RESOURCE_GROUP", typeof(string));

    try
    {
        var client = _connection.GetKineticaClient();

        // Restriction slot 0 carries the user name; missing or empty means "all"
        var names = new List<string>();
        var requested = restrictionValues is { Length: > 0 } ? restrictionValues[0] : null;
        if (!string.IsNullOrEmpty(requested))
        {
            names.Add(requested);
        }

        var response = client.showSecurity(names);

        foreach (var entry in response.types)
        {
            // Roles are reported by GetRoles, not here
            if (entry.Value == "role")
            {
                continue;
            }

            // Resource group is optional
            var resourceGroup = response.resource_groups.TryGetValue(entry.Key, out var group)
                ? group
                : "";

            users.Rows.Add(
                entry.Key,     // USER_NAME
                entry.Value,   // USER_TYPE (internal_user or external_user)
                resourceGroup  // RESOURCE_GROUP
            );
        }
    }
    catch (Exception ex)
    {
        // Best effort: a failing showSecurity (e.g., no permissions) yields an empty table
        System.Diagnostics.Debug.WriteLine($"Failed to retrieve users: {ex.Message}");
    }

    return users;
}
3918 
/// <summary>
/// Lists the roles reported by <c>showSecurity</c>.
/// </summary>
/// <param name="restrictionValues">
/// Optional restrictions; index 0, when non-empty, is passed to
/// <c>showSecurity</c> as the only name to look up.
/// </param>
/// <returns>
/// A table with one row per entry whose type is "role"; MEMBER_ROLES is the
/// comma-separated list from the response, or empty. When the lookup fails the
/// error is written to the debug output and the rows collected so far are returned.
/// </returns>
private DataTable GetRoles(string?[]? restrictionValues)
{
    var roleTable = new DataTable("Roles");
    roleTable.Columns.Add("ROLE_NAME", typeof(string));
    roleTable.Columns.Add("MEMBER_ROLES", typeof(string));

    try
    {
        var client = _connection.GetKineticaClient();

        // Restriction slot 0 carries the role name; missing or empty means "all"
        var names = new List<string>();
        var requested = restrictionValues is { Length: > 0 } ? restrictionValues[0] : null;
        if (!string.IsNullOrEmpty(requested))
        {
            names.Add(requested);
        }

        var response = client.showSecurity(names);

        foreach (var entry in response.types)
        {
            // Everything that is not a role is reported by GetUsers
            if (entry.Value != "role")
            {
                continue;
            }

            // Member roles are optional
            string memberRoles = "";
            if (response.roles.TryGetValue(entry.Key, out var members) && members != null)
            {
                memberRoles = string.Join(", ", members);
            }

            roleTable.Rows.Add(
                entry.Key,   // ROLE_NAME
                memberRoles  // MEMBER_ROLES
            );
        }
    }
    catch (Exception ex)
    {
        // Best effort: a failing showSecurity (e.g., no permissions) yields an empty table
        System.Diagnostics.Debug.WriteLine($"Failed to retrieve roles: {ex.Message}");
    }

    return roleTable;
}
3967 
/// <summary>
/// Builds the static catalogue of data types exposed through the DataTypes
/// schema collection.
/// </summary>
/// <returns>
/// A <see cref="DataTable"/> named "DataTypes" with one row per type. Every row
/// has IS_NULLABLE = true, IS_AUTO_INCREMENT = false and MINIMUM_SCALE = 0.
/// </returns>
private DataTable GetDataTypes()
{
    var types = new DataTable("DataTypes");

    var layout = new (string Name, Type ClrType)[]
    {
        ("TYPE_NAME", typeof(string)),
        ("PROVIDER_TYPE", typeof(int)),
        ("COLUMN_SIZE", typeof(int)),
        ("LITERAL_PREFIX", typeof(string)),
        ("LITERAL_SUFFIX", typeof(string)),
        ("IS_NULLABLE", typeof(bool)),
        ("IS_CASE_SENSITIVE", typeof(bool)),
        ("IS_SEARCHABLE", typeof(bool)),
        ("IS_UNSIGNED", typeof(bool)),
        ("IS_FIXED_PRECISION_SCALE", typeof(bool)),
        ("IS_AUTO_INCREMENT", typeof(bool)),
        ("MINIMUM_SCALE", typeof(short)),
        ("MAXIMUM_SCALE", typeof(short)),
    };
    foreach (var column in layout)
    {
        types.Columns.Add(column.Name, column.ClrType);
    }

    // Appends one row; the defaults describe the most common case
    // (no literal delimiters, case-insensitive, searchable, signed, scale 0).
    void AddType(
        string name,
        DbType providerType,
        int size,
        string? prefix = null,
        string? suffix = null,
        bool caseSensitive = false,
        bool searchable = true,
        bool unsigned = false,
        bool fixedPrecisionScale = false,
        short maxScale = 0)
    {
        types.Rows.Add(
            name, (int)providerType, size, prefix, suffix,
            true, caseSensitive, searchable, unsigned, fixedPrecisionScale,
            false, (short)0, maxScale);
    }

    // Kinetica supported data types
    // See: https://docs.kinetica.com/7.2/concepts/types/

    // Integer types
    AddType("int", DbType.Int32, 10, fixedPrecisionScale: true);
    AddType("int8", DbType.SByte, 3, fixedPrecisionScale: true);
    AddType("int16", DbType.Int16, 5, fixedPrecisionScale: true);
    AddType("long", DbType.Int64, 19, fixedPrecisionScale: true);
    AddType("ulong", DbType.UInt64, 20, unsigned: true, fixedPrecisionScale: true);

    // Floating point types
    AddType("float", DbType.Single, 7, maxScale: 7);
    AddType("double", DbType.Double, 15, maxScale: 15);
    AddType("decimal", DbType.Decimal, 38, fixedPrecisionScale: true, maxScale: 38);

    // Boolean
    AddType("boolean", DbType.Boolean, 1, fixedPrecisionScale: true);

    // String types
    AddType("string", DbType.String, 8000, "'", "'", caseSensitive: true);

    // Note: Kinetica supports char1 through char256; each is a fixed-length string of that size
    var charTypes = new (string Name, int Width)[]
    {
        ("char1", 1), ("char2", 2), ("char4", 4), ("char8", 8), ("char16", 16),
        ("char32", 32), ("char64", 64), ("char128", 128), ("char256", 256),
    };
    foreach (var charType in charTypes)
    {
        AddType(charType.Name, DbType.StringFixedLength, charType.Width, "'", "'", caseSensitive: true);
    }

    // Binary
    AddType("bytes", DbType.Binary, 8000, "0x", searchable: false);

    // Date/Time types
    AddType("date", DbType.Date, 10, "'", "'");
    AddType("time", DbType.Time, 12, "'", "'");
    AddType("datetime", DbType.DateTime, 23, "'", "'", maxScale: 3);
    AddType("timestamp", DbType.Int64, 19); // milliseconds since epoch

    // Special types
    AddType("uuid", DbType.Guid, 36, "'", "'");
    AddType("ipv4", DbType.String, 15, "'", "'");
    AddType("json", DbType.String, -1, "'", "'", caseSensitive: true);

    // Geospatial
    AddType("wkt", DbType.String, -1, "'", "'");

    // Vector type (for vector search/embeddings)
    AddType("vector", DbType.Object, -1, searchable: false);

    // Array types (represented as Object since .NET doesn't have a direct DbType for arrays)
    AddType("array", DbType.Object, -1, searchable: false);

    return types;
}
4041 
/// <summary>
/// Maps an Avro schema node to a Kinetica base type name.
/// </summary>
/// <param name="schema">The Avro schema of a field.</param>
/// <returns>
/// For a union, the mapping of its first non-null member; otherwise "int",
/// "long", "float", "double", "string", "boolean" or "bytes" according to the
/// schema tag. Anything unrecognised (including a union with only null
/// members) maps to "string".
/// </returns>
private static string GetKineticaTypeFromAvro(Avro.Schema schema)
{
    if (schema is Avro.UnionSchema union)
    {
        // Nullable columns are unions with null; resolve through the first concrete member
        var concrete = union.Schemas.FirstOrDefault(member => member.Tag != Avro.Schema.Type.Null);
        if (concrete != null)
        {
            return GetKineticaTypeFromAvro(concrete);
        }
    }

    switch (schema.Tag)
    {
        case Avro.Schema.Type.Int:
            return "int";
        case Avro.Schema.Type.Long:
            return "long";
        case Avro.Schema.Type.Float:
            return "float";
        case Avro.Schema.Type.Double:
            return "double";
        case Avro.Schema.Type.String:
            return "string";
        case Avro.Schema.Type.Boolean:
            return "boolean";
        case Avro.Schema.Type.Bytes:
        case Avro.Schema.Type.Fixed:
            return "bytes";
        default:
            return "string";
    }
}
4067 
/// <summary>
/// Tells whether an Avro field schema admits null.
/// </summary>
/// <param name="schema">The Avro schema of a field.</param>
/// <returns>
/// True only when <paramref name="schema"/> is a union with at least one
/// member of type null; false for every non-union schema.
/// </returns>
private static bool IsNullableField(Avro.Schema schema)
    => schema is Avro.UnionSchema union
       && union.Schemas.Any(member => member.Tag == Avro.Schema.Type.Null);
4076 
/// <summary>
/// Maps a Kinetica type name to the SQL type name reported in schema tables.
/// </summary>
/// <param name="kineticaType">Kinetica type name; matched case-insensitively.</param>
/// <returns>
/// The SQL type name. <c>charN</c> names map to "char", names starting with
/// "array" or "vector" map to "array" / "vector", and unknown names map to "varchar".
/// </returns>
private string MapKineticaTypeToSqlType(string kineticaType)
{
    // Invariant lower-casing and ordinal prefix checks: culture-sensitive
    // casing (e.g. Turkish 'I' to dotless 'i') would make "INT" miss the "int" arm.
    var typeLower = kineticaType.ToLowerInvariant();

    // Handle charN types (char1, char2, char4, ..., char256)
    if (typeLower.StartsWith("char", StringComparison.Ordinal) && typeLower.Length > 4)
        return "char";

    // Handle array types
    if (typeLower.StartsWith("array", StringComparison.Ordinal))
        return "array";

    // Handle vector types
    if (typeLower.StartsWith("vector", StringComparison.Ordinal))
        return "vector";

    return typeLower switch
    {
        // Integer types
        "int" or "integer" or "int32" => "int",
        "int8" or "tinyint" => "tinyint",
        "int16" or "smallint" => "smallint",
        "long" or "bigint" or "int64" => "bigint",
        "ulong" or "uint64" => "bigint unsigned",

        // Floating point
        "float" or "real" => "real",
        "double" or "float8" => "float",
        "decimal" or "numeric" => "decimal",

        // Boolean
        "bool" or "boolean" => "boolean",

        // String types
        "string" or "varchar" or "text" => "varchar",
        "char" => "char",

        // Date/Time
        "date" => "date",
        "time" => "time",
        "datetime" => "datetime",
        "timestamp" => "timestamp",

        // Special types
        "uuid" => "uuid",
        "ipv4" => "varchar",
        "json" => "json",
        "wkt" => "geometry",

        // Binary
        "bytes" or "binary" => "varbinary",

        _ => "varchar"
    };
}
4132 
/// <summary>
/// Returns the character length reported for a type name.
/// </summary>
/// <param name="dataType">Type name; matched case-insensitively.</param>
/// <returns>
/// 8000 for "string"/"varchar", 1 for "char", and null for every other type.
/// </returns>
private int? GetMaxLength(string dataType)
{
    // Invariant casing keeps the match independent of the thread culture.
    return dataType.ToLowerInvariant() switch
    {
        "string" or "varchar" => 8000, // Default max length
        "char" => 1,
        _ => null
    };
}
4142 
/// <summary>
/// Returns the numeric precision and scale reported for a type name.
/// </summary>
/// <param name="dataType">Type name; matched case-insensitively.</param>
/// <returns>
/// (10, 0) for int/integer, (19, 0) for long/bigint, (24, null) for float,
/// (53, null) for double, (18, 2) for decimal, and (null, null) otherwise.
/// </returns>
private (int? precision, int? scale) GetPrecisionAndScale(string dataType)
{
    // Invariant casing keeps the match independent of the thread culture.
    return dataType.ToLowerInvariant() switch
    {
        "int" or "integer" => (10, 0),
        "long" or "bigint" => (19, 0),
        "float" => (24, null),
        "double" => (53, null),
        "decimal" => (18, 2), // Default precision and scale
        _ => (null, null)
    };
}
4155  }
4156 
4157  // 8. Custom Exception Classes with Error Mapping
/// <summary>
/// Base exception of the provider, carrying a numeric error code and a
/// five-character SQLSTATE value.
/// </summary>
public class KineticaException : SystemException
{
    /// <summary>Provider error code; -1 when nothing more specific is known.</summary>
    public int ErrorCode { get; }

    /// <summary>SQLSTATE value; "HY000" (general error) when nothing more specific is known.</summary>
    public string SqlState { get; }

    /// <summary>Creates a general error (code -1, SQLSTATE HY000).</summary>
    /// <param name="message">Error message.</param>
    public KineticaException(string message) : base(message)
    {
        ErrorCode = -1;
        SqlState = "HY000"; // General error
    }

    /// <summary>
    /// Wraps another exception; the error code and SQLSTATE are derived from
    /// the type of <paramref name="innerException"/>.
    /// </summary>
    /// <param name="message">Error message.</param>
    /// <param name="innerException">The exception being wrapped.</param>
    public KineticaException(string message, Exception innerException) : base(message, innerException)
    {
        ErrorCode = MapExceptionToErrorCode(innerException);
        SqlState = MapExceptionToSqlState(innerException);
    }

    /// <summary>Creates an error with an explicit code and SQLSTATE.</summary>
    /// <param name="message">Error message.</param>
    /// <param name="errorCode">Provider error code.</param>
    /// <param name="sqlState">SQLSTATE value.</param>
    public KineticaException(string message, int errorCode, string sqlState) : base(message)
    {
        ErrorCode = errorCode;
        SqlState = sqlState;
    }

    // Maps the wrapped exception to an error code. A wrapped provider exception
    // keeps its own code so re-wrapping does not degrade it to -1.
    private static int MapExceptionToErrorCode(Exception exception)
    {
        return exception switch
        {
            KineticaException wrapped => wrapped.ErrorCode,
            TimeoutException => -2,
            UnauthorizedAccessException => -3,
            ArgumentException => -4,
            InvalidOperationException => -5,
            _ => -1
        };
    }

    // Maps the wrapped exception to a SQLSTATE. A wrapped provider exception
    // keeps its own SQLSTATE so re-wrapping does not degrade it to HY000.
    private static string MapExceptionToSqlState(Exception exception)
    {
        return exception switch
        {
            KineticaException wrapped => wrapped.SqlState,
            TimeoutException => "HYT00", // Timeout expired
            UnauthorizedAccessException => "28000", // Invalid authorization specification
            ArgumentException => "22000", // Data exception
            InvalidOperationException => "24000", // Invalid cursor state
            _ => "HY000" // General error
        };
    }
}
4205 
4207  {
/// <summary>
/// Creates a connection error with error code -100 and SQLSTATE "08000".
/// </summary>
/// <param name="message">Error message.</param>
public KineticaConnectionException(string message)
    : base(message, -100, "08000")
{
}

/// <summary>
/// Wraps another exception. The error code and SQLSTATE are chosen by the
/// base constructor from <paramref name="innerException"/>, not the
/// -100 / "08000" pair used by the message-only constructor.
/// </summary>
/// <param name="message">Error message.</param>
/// <param name="innerException">The exception being wrapped.</param>
public KineticaConnectionException(string message, Exception innerException)
    : base(message, innerException)
{
}
4210  }
4211 
4213  {
/// <summary>
/// Creates a SQL error with error code -200 and SQLSTATE "42000".
/// </summary>
/// <param name="message">Error message.</param>
public KineticaSqlException(string message)
    : base(message, -200, "42000")
{
}

/// <summary>
/// Wraps another exception. The error code and SQLSTATE are chosen by the
/// base constructor from <paramref name="innerException"/>, not the
/// -200 / "42000" pair used by the message-only constructor.
/// </summary>
/// <param name="message">Error message.</param>
/// <param name="innerException">The exception being wrapped.</param>
public KineticaSqlException(string message, Exception innerException)
    : base(message, innerException)
{
}
4216  }
4217 
4218  // 9. Parameter Classes (keeping existing implementation)
/// <summary>
/// Parameter type of the provider: a plain property bag over
/// <see cref="DbParameter"/> whose type defaults to <see cref="DbType.String"/>.
/// </summary>
public class KineticaParameter : DbParameter
{
    // Backing fields so that a null assignment (allowed by [AllowNull]) is
    // normalised to an empty string and the non-nullable getters never return null.
    private string _parameterName = string.Empty;
    private string _sourceColumn = string.Empty;

    /// <summary>Parameter type. Defaults to <see cref="DbType.String"/>.</summary>
    public override DbType DbType { get; set; } = DbType.String;

    /// <summary>Parameter direction. Defaults to <see cref="ParameterDirection.Input"/>.</summary>
    public override ParameterDirection Direction { get; set; } = ParameterDirection.Input;

    /// <summary>Whether the parameter accepts null values.</summary>
    public override bool IsNullable { get; set; }

    /// <summary>Parameter name; assigning null stores an empty string.</summary>
    [System.Diagnostics.CodeAnalysis.AllowNull]
    public override string ParameterName
    {
        get => _parameterName;
        set => _parameterName = value ?? string.Empty;
    }

    /// <summary>Maximum size of the data, as set by the caller.</summary>
    public override int Size { get; set; }

    /// <summary>Source column name; assigning null stores an empty string.</summary>
    [System.Diagnostics.CodeAnalysis.AllowNull]
    public override string SourceColumn
    {
        get => _sourceColumn;
        set => _sourceColumn = value ?? string.Empty;
    }

    /// <summary>Whether the source column is nullable.</summary>
    public override bool SourceColumnNullMapping { get; set; }

    /// <summary>Parameter value; may be null.</summary>
    public override object? Value { get; set; }

    /// <summary>Resets <see cref="DbType"/> to <see cref="DbType.String"/>.</summary>
    public override void ResetDbType()
    {
        DbType = DbType.String;
    }
}
4237 
/// <summary>
/// List-backed parameter collection for Kinetica commands. Parameters are kept in
/// insertion order; name lookups compare <see cref="DbParameter.ParameterName"/> with
/// ordinal, case-sensitive equality.
/// </summary>
public class KineticaParameterCollection : DbParameterCollection
{
    private readonly List<DbParameter> _parameters = new List<DbParameter>();

    /// <summary>Number of parameters in the collection.</summary>
    public override int Count => _parameters.Count;

    /// <summary>Object callers can lock on to synchronize access (the backing list).</summary>
    public override object SyncRoot => _parameters;

    /// <summary>Appends a parameter and returns its index.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
    /// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
    public override int Add(object value)
    {
        _parameters.Add(RequireParameter(value));
        return _parameters.Count - 1;
    }

    /// <summary>Appends every element of <paramref name="values"/> in order.</summary>
    /// <exception cref="ArgumentNullException">The array or one of its elements is null.</exception>
    /// <exception cref="InvalidCastException">An element is not a <see cref="DbParameter"/>.</exception>
    public override void AddRange(Array values)
    {
        if (values is null)
        {
            throw new ArgumentNullException(nameof(values));
        }

        foreach (object? item in values)
        {
            _parameters.Add(RequireParameter(item));
        }
    }

    /// <summary>Removes all parameters.</summary>
    public override void Clear() => _parameters.Clear();

    /// <summary>Returns true when <paramref name="value"/> is a parameter held by this collection.</summary>
    public override bool Contains(object value) =>
        value is DbParameter parameter && _parameters.Contains(parameter);

    /// <summary>Returns true when a parameter with the given name exists.</summary>
    public override bool Contains(string value) => IndexOf(value) >= 0;

    /// <summary>Copies the parameters into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
    public override void CopyTo(Array array, int index) =>
        // Go through the non-generic ICollection so that object[] (and any other
        // compatible array type) is accepted, not only DbParameter[].
        ((System.Collections.ICollection)_parameters).CopyTo(array, index);

    /// <summary>Enumerates the parameters in insertion order.</summary>
    public override System.Collections.IEnumerator GetEnumerator() => _parameters.GetEnumerator();

    /// <summary>Index of <paramref name="value"/>, or -1 when it is not a member of this collection.</summary>
    public override int IndexOf(object value) =>
        value is DbParameter parameter ? _parameters.IndexOf(parameter) : -1;

    /// <summary>Index of the first parameter with the given name, or -1 when none matches.</summary>
    public override int IndexOf(string parameterName) =>
        _parameters.FindIndex(p => p.ParameterName == parameterName);

    /// <summary>Inserts a parameter at <paramref name="index"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
    /// <exception cref="InvalidCastException"><paramref name="value"/> is not a <see cref="DbParameter"/>.</exception>
    public override void Insert(int index, object value) =>
        _parameters.Insert(index, RequireParameter(value));

    /// <summary>Removes <paramref name="value"/> if present; anything else is ignored.</summary>
    public override void Remove(object value)
    {
        if (value is DbParameter parameter)
        {
            _parameters.Remove(parameter);
        }
    }

    /// <summary>Removes the parameter at <paramref name="index"/>.</summary>
    public override void RemoveAt(int index) => _parameters.RemoveAt(index);

    /// <summary>Removes the parameter with the given name; does nothing when no such parameter exists.</summary>
    public override void RemoveAt(string parameterName)
    {
        var index = IndexOf(parameterName);
        if (index >= 0)
        {
            RemoveAt(index);
        }
    }

    /// <summary>Returns the parameter at <paramref name="index"/>.</summary>
    protected override DbParameter GetParameter(int index) => _parameters[index];

    /// <summary>Returns the parameter with the given name.</summary>
    /// <exception cref="ArgumentException">No parameter has that name.</exception>
    protected override DbParameter GetParameter(string parameterName)
    {
        var index = IndexOf(parameterName);
        return index >= 0 ? _parameters[index] : throw new ArgumentException($"Parameter '{parameterName}' not found");
    }

    /// <summary>Replaces the parameter at <paramref name="index"/>.</summary>
    protected override void SetParameter(int index, DbParameter value) =>
        _parameters[index] = RequireParameter(value);

    /// <summary>
    /// Replaces the parameter with the given name, or appends <paramref name="value"/>
    /// when no parameter has that name.
    /// </summary>
    protected override void SetParameter(string parameterName, DbParameter value)
    {
        var parameter = RequireParameter(value);
        var index = IndexOf(parameterName);
        if (index >= 0)
        {
            _parameters[index] = parameter;
        }
        else
        {
            _parameters.Add(parameter);
        }
    }

    // Validates an incoming element. Null entries are rejected because every name-based
    // lookup dereferences the stored parameters; foreign types fail the cast as before.
    private static DbParameter RequireParameter(object? value)
    {
        if (value is null)
        {
            throw new ArgumentNullException(nameof(value));
        }

        return (DbParameter)value;
    }
}
4293 
/// <summary>
/// ADO.NET transaction object for API compatibility. Committing flushes the connection's
/// pending batch inserts; rolling back only marks the transaction as finished, because
/// (per the notes in this class) Kinetica does not support rollback.
/// </summary>
public class KineticaTransaction : DbTransaction
{
    private readonly KineticaConnection _connection;

    // Set once Commit, Rollback or Dispose has finished the transaction.
    private bool _completed;

    /// <summary>Creates a transaction bound to <paramref name="connection"/>.</summary>
    /// <param name="connection">Connection the transaction belongs to.</param>
    /// <param name="isolationLevel">Isolation level reported by <see cref="IsolationLevel"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
    public KineticaTransaction(KineticaConnection connection, IsolationLevel isolationLevel)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
        IsolationLevel = isolationLevel;
    }

    /// <summary>Isolation level supplied when the transaction was created.</summary>
    public override IsolationLevel IsolationLevel { get; }

    /// <summary>Connection this transaction was created for.</summary>
    protected override DbConnection DbConnection => _connection;

    /// <summary>
    /// Commits the transaction by running <see cref="CommitAsync"/> to completion.
    /// </summary>
    /// <exception cref="InvalidOperationException">The transaction was already completed.</exception>
    /// <exception cref="KineticaException">Flushing pending batch inserts failed.</exception>
    public override void Commit()
    {
        // Run on the thread pool so the async continuations never need the caller's
        // synchronization context; GetResult rethrows the original exception unwrapped.
        Task.Run(() => CommitAsync(CancellationToken.None))
            .GetAwaiter()
            .GetResult();
    }

    /// <summary>
    /// Commits the transaction: flushes any pending batch inserts on the connection and
    /// marks the transaction as completed.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the flush.</param>
    /// <exception cref="InvalidOperationException">The transaction was already completed.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while flushing.
    /// </exception>
    /// <exception cref="KineticaException">Flushing pending batch inserts failed.</exception>
    public override async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        if (_completed)
        {
            throw new InvalidOperationException("Transaction already completed");
        }

        try
        {
            // Flush any pending batch inserts
            if (_connection.BatchManager != null)
            {
                await _connection.FlushBatchAsync(cancellationToken).ConfigureAwait(false);
            }

            _completed = true;
        }
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            // A cancellation requested by the caller propagates as-is (filter above);
            // every other failure is reported as a provider error.
            throw new KineticaException($"Failed to flush batch inserts during commit: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Marks the transaction as completed without undoing anything.
    /// </summary>
    /// <exception cref="InvalidOperationException">The transaction was already completed.</exception>
    public override void Rollback()
    {
        if (_completed)
        {
            throw new InvalidOperationException("Transaction already completed");
        }

        // Kinetica does not support rollback. All executed commands are already committed.
        // This method exists only for ADO.NET API compatibility.
        _completed = true;
    }

    /// <summary>Marks an unfinished transaction as completed and releases base resources.</summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_completed)
        {
            // Mark as completed on dispose; no actual rollback occurs
            _completed = true;
        }

        base.Dispose(disposing);
    }
}
4427 
4428  // 11. Factory Class
/// <summary>
/// Provider factory that hands out the Kinetica implementations of the ADO.NET
/// command, connection and parameter types.
/// </summary>
public class KineticaProviderFactory : DbProviderFactory
{

    /// <summary>Creates a new, unconfigured Kinetica command.</summary>
    public override DbCommand CreateCommand()
    {
        return new KineticaCommand();
    }

    /// <summary>Creates a new Kinetica connection with no connection string set.</summary>
    public override DbConnection CreateConnection()
    {
        return new KineticaConnection();
    }

    /// <summary>Creates a new Kinetica parameter.</summary>
    public override DbParameter CreateParameter()
    {
        return new KineticaParameter();
    }

    /// <summary>Creates a generic connection string builder.</summary>
    public override DbConnectionStringBuilder CreateConnectionStringBuilder()
    {
        // Could create custom KineticaConnectionStringBuilder wrapper
        return new DbConnectionStringBuilder();
    }

    /// <summary>Always false: data source enumeration is not available.</summary>
    public override bool CanCreateDataSourceEnumerator
    {
        get { return false; }
    }

    /// <summary>Not supported by this provider.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override DbDataSourceEnumerator CreateDataSourceEnumerator()
    {
        throw new NotSupportedException("Data source enumeration not supported");
    }
}
4444 
4445  // 12. Configuration and Registration Helper
/// <summary>
/// Helper that registers <see cref="KineticaProviderFactory"/> with
/// <see cref="DbProviderFactories"/> under the invariant name "KineticaAdo".
/// </summary>
public static class KineticaProviderRegistration
{
    private static bool _registered = false;
    private static readonly object _lock = new object();

    /// <summary>
    /// Registers the provider factory. Once a registration has succeeded, further calls
    /// return without doing anything.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The registration call failed; the original error is attached as the inner exception.
    /// </exception>
    public static void RegisterProvider()
    {
        // Cheap check first so already-registered callers never contend on the lock.
        if (!_registered)
        {
            lock (_lock)
            {
                // Re-check: another thread may have registered while this one waited.
                if (!_registered)
                {
                    try
                    {
                        // This would typically be done through configuration files,
                        // but can also be done programmatically
                        DbProviderFactories.RegisterFactory("KineticaAdo", typeof(KineticaProviderFactory));

                        // Only flag success after the registration call has returned.
                        _registered = true;
                    }
                    catch (Exception registrationError)
                    {
                        throw new InvalidOperationException($"Failed to register Kinetica provider: {registrationError.Message}", registrationError);
                    }
                }
            }
        }
    }
}
4478 }
4479 
4480 // Usage Examples:
4481 /*
4482 // 1. Basic Usage with Connection Pooling
4483 using (var connection = new KineticaConnection("Server=192.168.1.100:9191;Username=admin;Password=password;Pooling=true;Max Pool Size=50"))
4484 {
4485  await connection.OpenAsync();
4486 
4487  using (var command = new KineticaCommand("SELECT * FROM my_table WHERE id = @id", connection))
4488  {
4489  command.Parameters.Add(new KineticaParameter { ParameterName = "@id", Value = 123 });
4490 
4491  using (var reader = await command.ExecuteReaderAsync())
4492  {
4493  while (await reader.ReadAsync())
4494  {
4495  Console.WriteLine($"Value: {reader[0]}");
4496  }
4497  }
4498  }
4499 }
4500 
4501 // 1b. High-Performance Batch Insert Mode
4502 // Enable batch insert mode for high-throughput INSERT operations.
4503 // Records are buffered and sent in batches for 10-100x better performance.
4504 using (var connection = new KineticaConnection("Server=192.168.1.100:9191;Username=admin;Password=password;Batch Insert Mode=true;Batch Size=10000"))
4505 {
4506  await connection.OpenAsync();
4507 
4508  // All INSERTs are automatically batched
4509  for (int i = 0; i < 100000; i++)
4510  {
4511  using (var command = new KineticaCommand($"INSERT INTO my_table (id, name, value) VALUES ({i}, 'name{i}', {i * 1.5})", connection))
4512  {
4513  await command.ExecuteNonQueryAsync(); // Record is buffered, not sent immediately
4514  }
4515  }
4516 
4517  // Explicitly flush remaining records (also happens automatically on Close/Dispose)
4518  long flushed = await connection.FlushBatchAsync();
4519  Console.WriteLine($"Flushed {flushed} records");
4520 }
4521 
4522 // 1c. Batch Insert with Programmatic Configuration
4523 using (var connection = new KineticaConnection("Server=192.168.1.100:9191;Username=admin;Password=password"))
4524 {
4525  // Enable batch mode programmatically
4526  connection.BatchInsertMode = true;
4527  connection.BatchSize = 5000;
4528  connection.BatchUpdateOnExistingPk = true; // UPSERT behavior
4529 
4530  await connection.OpenAsync();
4531 
4532  // Insert records...
4533  // Records are automatically sent when BatchSize is reached
4534 
4535  // Check pending count
4536  Console.WriteLine($"Pending records: {connection.PendingBatchCount}");
4537 }
4538 
4539 // 2. Schema Discovery
4540 using (var connection = new KineticaConnection(connectionString))
4541 {
4542  connection.Open();
4543 
4544  // Get all tables
4545  var tables = connection.GetSchema("Tables");
4546  foreach (DataRow row in tables.Rows)
4547  {
4548  Console.WriteLine($"Table: {row["TABLE_NAME"]}");
4549  }
4550 
4551  // Get columns for a specific table
4552  var columns = connection.GetSchema("Columns", new[] { null, null, "my_table", null });
4553  foreach (DataRow row in columns.Rows)
4554  {
4555  Console.WriteLine($"Column: {row["COLUMN_NAME"]} ({row["DATA_TYPE"]})");
4556  }
4557 }
4558 
4559 // 3. Provider Registration and Factory Usage
4560 KineticaProviderRegistration.RegisterProvider();
4561 
4562 var factory = DbProviderFactories.GetFactory("KineticaAdo");
4563 using (var connection = factory.CreateConnection())
4564 {
4565  connection.ConnectionString = "Server=localhost:9191";
4566  connection.Open();
4567 
4568  using (var command = factory.CreateCommand())
4569  {
4570  command.Connection = connection;
4571  command.CommandText = "SELECT COUNT(*) FROM my_table";
4572 
4573  var count = command.ExecuteScalar();
4574  Console.WriteLine($"Row count: {count}");
4575  }
4576 }
4577 */
Apache Parquet format.
override async Task CommitAsync(CancellationToken cancellationToken=default)
Asynchronously commits the transaction.
int Limit
Gets or sets the result row limit.
override async Task< bool > NextResultAsync(CancellationToken cancellationToken)
Definition: KineticaAdo.cs:958
override void Dispose(bool disposing)
Definition: KineticaAdo.cs:368
int Timeout
Gets or sets the query timeout in minutes.
Manages bulk insert operations for the ADO.NET driver.
ImpersonationType
Types of user impersonation commands.
override void RemoveAt(int index)
KineticaSqlException(string message)
Class for record schemas
Definition: RecordSchema.cs:31
override? DbConnection DbConnection
Definition: KineticaAdo.cs:426
bool UpdateOnExistingPk
Gets or sets whether to update on existing primary key.
bool UpdateOnExistingPk
If true, updates existing records with matching primary keys.
override object GetValue(int ordinal)
bool PlanCache
Gets or sets whether plan caching is enabled.
override string GetDataTypeName(int ordinal)
Options for INSERT FROM FILE operations.
UserImpersonationInfo? UserImpersonation
override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
Definition: KineticaAdo.cs:236
override void AddRange(Array values)
bool ServerSideInsert
Whether to force server-side insert execution (KI_HINT_SERVER_SIDE_INSERT).
override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
bool UseApproxCountDistinct
Gets or sets whether to use approximate count distinct.
KineticaSchemaProvider(KineticaConnection connection)
override double GetDouble(int ordinal)
string PrimaryUrl
Gets or sets the primary URL for HA configurations.
int FetchSize
Gets or sets the default fetch size for paged result sets.
override bool GetBoolean(int ordinal)
List< string > InsertColumns
Column names specified in the INSERT INTO clause (optional).
int PendingBatchCount
Gets the number of records currently pending in the batch buffer.
Definition: KineticaAdo.cs:319
override char GetChar(int ordinal)
KineticaPagingDataReader(Kinetica client, string sql, int fetchSize, CommandBehavior behavior, CancellationToken cancellationToken=default)
override void Dispose(bool disposing)
string FileReadEscapeChar
Gets or sets the file escape character.
FileFormat
Supported file formats for INSERT FROM FILE operations.
override void SetParameter(int index, DbParameter value)
bool DryRun
Whether this is a dry run (validate without inserting). Default: false.
override DataTable GetSchema()
Definition: KineticaAdo.cs:352
override float GetFloat(int ordinal)
override Guid GetGuid(int ordinal)
string? KifsPath
KiFS path (for kifs:// syntax).
Skip problematic rows and continue.
FileFormat Format
File format. Default: Auto (detected from extension).
bool CostBasedOptimization
Gets or sets whether cost-based optimization is enabled.
FileErrorMode ErrorMode
Error handling mode.
override? object ExecuteScalar()
Definition: KineticaAdo.cs:593
User impersonation information for SET USER/EXECUTE AS commands.
override bool SourceColumnNullMapping
override int GetValues(object[] values)
bool FileReadHasHeader
Gets or sets whether the file has a header row.
Base class for all schema types
Definition: Schema.cs:29
override IEnumerator< IDataRecord > GetEnumerator()
string ActualPath
The actual path to use (either FilePath or KifsPath).
override void Dispose(bool disposing)
Definition: KineticaAdo.cs:855
KineticaConnectionStringBuilder(string connectionString)
override DateTime GetDateTime(int ordinal)
override int IndexOf(object value)
IDictionary< string, string > ToQueryOptions()
Converts hints to query options dictionary.
override DbCommand CreateCommand()
bool ResultsCaching
Gets or sets whether results caching is enabled.
string? FilePath
File path (local file path for FILE."path" syntax).
string Server
Gets or sets the server URL (e.g., "http://localhost:9191").
int Limit
Maximum number of rows to read. Default: unlimited (0).
override string?? CommandText
Definition: KineticaAdo.cs:407
override DateTime GetDateTime(int ordinal)
override DbConnection DbConnection
Information about an INSERT INTO...SELECT FROM FILE statement.
Class for union schemas
Definition: UnionSchema.cs:29
override string GetName(int ordinal)
override short GetInt16(int ordinal)
override string Name
Name of the schema
Definition: NamedSchema.cs:40
void RevertUser()
Reverts to previous user context from ExecuteAsUser.
Definition: KineticaAdo.cs:112
long TotalRecordCount
Gets the total number of records in the result set.
override DbParameter GetParameter(string parameterName)
override CommandType CommandType
Definition: KineticaAdo.cs:409
override string GetString(int ordinal)
string OAuthToken
Gets or sets the OAuth token for authentication.
string?? Database
Gets or sets the database name.
override string ServerVersion
Definition: KineticaAdo.cs:56
void SetSchema(string schemaName)
Sets the current schema (equivalent to SET SCHEMA command).
Definition: KineticaAdo.cs:82
string FileReadDelimiter
Gets or sets the file read delimiter.
override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
Definition: KineticaAdo.cs:983
override int GetOrdinal(string name)
string Password
Gets or sets the password for authentication.
override DbDataSourceEnumerator CreateDataSourceEnumerator()
override void Insert(int index, object value)
int FailbackPollInterval
Gets or sets the failback poll interval in seconds.
async Task< long > FlushAllAsync(CancellationToken cancellationToken=default)
Flushes all pending records for all tables.
string?? ImpersonateUser
Gets or sets the user to impersonate.
FileErrorMode
Error handling modes for file insert operations.
int ServerConnectionTimeout
Gets or sets the server connection timeout in seconds.
bool DisableMultihead
Whether to disable multi-head insert (KI_HINT_DISABLE_MULTIHEAD).
KineticaException(string message, Exception innerException)
SQL parser with support for JDBC-compatible features including:
override bool Contains(object value)
Type
Enum for schema types
Definition: Schema.cs:34
async Task< bool > InsertAsync(string tableName, IList< string > columnNames, IList< object?> values, CancellationToken cancellationToken=default)
Inserts a record into the specified table using batch processing.
string? CommentPrefix
Comment line prefix (lines starting with this are skipped).
override object GetValue(int ordinal)
override async Task< bool > ReadAsync(CancellationToken cancellationToken)
override DbParameter CreateDbParameter()
Definition: KineticaAdo.cs:825
override int GetInt32(int ordinal)
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
KineticaConnectionException(string message, Exception innerException)
bool ParallelExecution
Gets or sets whether parallel execution is enabled.
bool NoSync
Gets or sets whether to disable sync mode.
override short GetInt16(int ordinal)
void ExecuteAsUser(string username)
Executes as a specific user, pushing current context to stack.
Definition: KineticaAdo.cs:100
IList< KineticaRecord > data
Avro binary encoded response.
Definition: ExecuteSql.cs:1613
override byte GetByte(int ordinal)
bool ReadOnly
Gets or sets whether the connection is read-only.
int GetTotalPendingCount()
Gets the total number of pending records across all tables.
bool InitialClear
Whether to clear the table before inserting. Default: false.
KineticaSqlException(string message, Exception innerException)
override float GetFloat(int ordinal)
KineticaConnection(string connectionString)
Definition: KineticaAdo.cs:32
bool IgnoreExistingPk
Gets or sets whether to ignore existing primary keys on insert.
override ConnectionState State
Definition: KineticaAdo.cs:57
override bool GetBoolean(int ordinal)
override DataTable GetSchema(string collectionName)
Definition: KineticaAdo.cs:357
if(args.Length > 0)
Definition: Program.cs:5
bool UseKeyLookup
Gets or sets whether to use key lookup optimization.
bool DisableAutoDiscovery
Gets or sets whether to disable auto-discovery of cluster nodes.
int ConnectionTimeout
Gets or sets the connection timeout in seconds.
Provides a comprehensive connection string builder with JDBC-compatible properties.
override int GetValues(object[] values)
override void Rollback()
Rolls back the transaction.
KineticaCommand(string commandText, KineticaConnection connection)
Definition: KineticaAdo.cs:401
ParsedCommandType CommandType
override IsolationLevel IsolationLevel
Gets the isolation level for this transaction.
int FetchSize
Gets or sets the fetch size (number of records to retrieve per batch).
Definition: KineticaAdo.cs:420
string TableName
Target table name (may include schema).
string?? Schema
Gets or sets the default schema.
ParsedCommand Parse(string sql, DbParameterCollection parameters)
string?? SslCaCertPath
Gets or sets the path to the SSL CA certificate.
string Username
Gets or sets the username for authentication.
override DbConnection CreateConnection()
bool BatchUpdateOnExistingPk
Gets or sets whether to update existing records with matching primary keys during batch inserts.
Definition: KineticaAdo.cs:306
override int Add(object value)
DataTable GetSchema(string collectionName, string?[]? restrictionValues)
override async Task OpenAsync(CancellationToken cancellationToken)
Definition: KineticaAdo.cs:162
override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
string FileReadNullString
Gets or sets the null string representation in files.
int InitialConnectionTimeout
Gets or sets the initial connection timeout in seconds.
bool HasHeader
Whether the file has a header row.
bool IgnoreExistingPk
Whether to ignore records with duplicate primary keys. Default: false.
bool TruncateStrings
Whether to truncate strings that exceed column length (KI_HINT_TRUNCATE_STRINGS).
IDictionary< string, string > ToInsertOptions()
Converts hints to insert options dictionary.
override Guid GetGuid(int ordinal)
KineticaCommand(KineticaConnection connection)
Definition: KineticaAdo.cs:396
List< string > SelectColumns
Column names specified in the SELECT clause (optional, * means all).
long FlushBatch()
Flushes all pending batch inserts to the database.
Definition: KineticaAdo.cs:325
static readonly KineticaProviderFactory Instance
KineticaTransaction(KineticaConnection connection, IsolationLevel isolationLevel)
override decimal GetDecimal(int ordinal)
override Type GetFieldType(int ordinal)
override long GetInt64(int ordinal)
string???? CurrentSchema
Gets or sets the current schema for this session.
Definition: KineticaAdo.cs:64
Abort on first error.
FileInsertOptions Options
File insert options parsed from WITH OPTIONS clause.
bool DistributedJoins
Gets or sets whether distributed joins are enabled.
override bool DesignTimeVisible
Definition: KineticaAdo.cs:410
bool SsqOptimizations
Gets or sets whether SSQ optimizations are enabled.
EXECUTE AS USER "username" - Temporary context switch (can be reverted).
override async Task< bool > ReadAsync(CancellationToken cancellationToken)
Definition: KineticaAdo.cs:941
void ReturnConnection(string connectionString, Kinetica connection)
string?? Replication
Gets or sets the replication mode.
override DbParameter CreateParameter()
override void ChangeDatabase(string databaseName)
Definition: KineticaAdo.cs:74
override int GetInt32(int ordinal)
override void RemoveAt(string parameterName)
override int GetOrdinal(string name)
string? PkConflictPredicateLowerColumn
Column name for lower predicate PK conflict resolution (KI_HINT_PK_CONFLICT_PREDICATE_LOWER).
int Ttl
Gets or sets the time-to-live in minutes for query results.
bool SelectAll
Whether SELECT * was used.
Query optimization hints extracted from SQL.
override void Commit()
Commits the transaction.
string? PkConflictPredicateHigherColumn
Column name for higher predicate PK conflict resolution (KI_HINT_PK_CONFLICT_PREDICATE_HIGHER).
override void Remove(object value)
int Skip
Number of lines to skip from beginning. Default: 0.
int RowsPerInsertion
Gets or sets the number of rows per insertion batch.
bool SslAllowHostMismatch
Gets or sets whether to allow SSL host mismatch.
bool BypassSslCertCheck
Gets or sets whether to bypass SSL certificate validation.
override string GetName(int ordinal)
ParsedCommandType
Types of parsed SQL commands.
override double GetDouble(int ordinal)
bool UseKeyLookup
Whether to use key lookup optimization (KI_HINT_KEY_LOOKUP).
override byte GetByte(int ordinal)
override void SetParameter(string parameterName, DbParameter value)
char Delimiter
Field delimiter character. Default: comma for CSV, auto-detected from extension.
char QuoteChar
Quote character for string fields. Default: double-quote.
override string ConnectionString
Definition: KineticaAdo.cs:39
bool ReplicationSync
Whether to use synchronous replication (KI_HINT_REPL_SYNC).
bool TruncateStrings
Whether to truncate strings that exceed column length. Default: false.
override DbParameter GetParameter(int index)
bool DisableSnappy
Gets or sets whether to disable Snappy compression.
int RowsPerFetch
Gets or sets the number of rows per fetch.
string? ExtractSchemaName(string sql)
Extracts schema name from SET SCHEMA command.
Represents a parsed SQL command with extracted metadata.
Provides ADO.NET transaction API compatibility for Kinetica.
override async Task< int > ExecuteNonQueryAsync(CancellationToken cancellationToken)
Definition: KineticaAdo.cs:448
override DbParameterCollection DbParameterCollection
Definition: KineticaAdo.cs:431
void SetUser(string username)
Sets the impersonated user context (equivalent to SET USER command).
Definition: KineticaAdo.cs:91
Options for batch insert operations.
Delimited text (CSV, TSV, PSV).
override void CopyTo(Array array, int index)
string? ImpersonatedUser
Gets the current impersonated user, if any.
Definition: KineticaAdo.cs:72
override bool IsDBNull(int ordinal)
string?? ErrorMode
Gets or sets the error mode for insertions.
bool IsKifsPath
Whether this is a KiFS file operation.
Immutable collection of metadata about a Kinetica type.
Definition: Type.cs:36
KineticaException(string message)
bool IgnoreExistingPk
Whether to ignore existing primary key (KI_HINT_IGNORE_EXISTING_PK).
string?? SslCertPassword
Gets or sets the SSL certificate password.
override System.Collections.IEnumerator GetEnumerator()
A paging data reader that fetches records in batches for large result sets.
DbParameterCollection? Parameters
string NullString
String that represents NULL values. Default: \N.
int BatchSize
The number of records to batch before flushing.
InsertFromFileInfo? InsertFromFile
override long GetInt64(int ordinal)
Auto-detect format from file extension.
override async Task< object?> ExecuteScalarAsync(CancellationToken cancellationToken)
Definition: KineticaAdo.cs:602
override IEnumerator< IDataRecord > GetEnumerator()
override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
Definition: KineticaAdo.cs:614
override DataTable GetSchemaTable()
async Task< long > FlushBatchAsync(CancellationToken cancellationToken=default)
Flushes all pending batch inserts to the database asynchronously.
Definition: KineticaAdo.cs:339
bool BatchInsertMode
Gets or sets whether batch insert mode is enabled.
Definition: KineticaAdo.cs:271
char EscapeChar
Escape character for special chars. Default: none.
override string GetDataTypeName(int ordinal)
override DbCommand CreateDbCommand()
Definition: KineticaAdo.cs:241
long TotalRowsRead
Gets the total number of records read so far.
int BatchSize
Batch size for bulk insert operations. Default: 10000.
override async Task< DbDataReader > ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
Definition: KineticaAdo.cs:623
override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
bool DisableMultiheadInsert
Gets or sets whether to disable multi-head insert.
static Schema Parse(string json)
Parses a given JSON string to create a new schema object
Definition: Schema.cs:141
bool UpdateOnExistingPk
Whether to update records with duplicate primary keys. Default: false.
int? BatchSize
Batch size hint for bulk operations (KI_HINT_BATCH_SIZE).
override ParameterDirection Direction
KineticaDataReader(ExecuteSqlResponse response, CommandBehavior behavior)
Definition: KineticaAdo.cs:875
bool RuleBasedOptimizations
Gets or sets whether rule-based optimizations are enabled.
int BatchSize
Gets or sets the batch size for batch insert operations.
Definition: KineticaAdo.cs:297
override decimal GetDecimal(int ordinal)
override char GetChar(int ordinal)
DateTime in YYYY-MM-DD HH:MM:SS.mmm format
override DataTable GetSchema(string collectionName, string?[]? restrictionValues)
Definition: KineticaAdo.cs:362
async Task< Kinetica > GetConnectionAsync(string connectionString, CancellationToken cancellationToken)
Attempt to insert partial records.
string?? FailoverOrder
Gets or sets the HA failover order.
override UpdateRowSource UpdatedRowSource
Definition: KineticaAdo.cs:411
KineticaException(string message, int errorCode, string sqlState)
override DbConnectionStringBuilder CreateConnectionStringBuilder()
override string GetString(int ordinal)
int PagingTableTtl
Gets or sets the TTL for paging tables in minutes.
override bool IsDBNull(int ordinal)
override int ExecuteNonQuery()
Definition: KineticaAdo.cs:439
override? DbTransaction DbTransaction
Definition: KineticaAdo.cs:432
bool TruncateStrings
Gets or sets whether to truncate strings that exceed column length.
bool DisableFailover
Gets or sets whether to disable automatic failover.
bool UpdateOnExistingPk
Whether to update on existing primary key (KI_HINT_UPDATE_ON_EXISTING_PK).
override Type GetFieldType(int ordinal)
A set of results returned by Kinetica.executeSql.
Definition: ExecuteSql.cs:1588