Kinetica C# API Version 7.2.3.1
BulkInserterDashboard.cs
Go to the documentation of this file.
1 
13 using System;
14 using System.Collections.Concurrent;
15 using System.Collections.Generic;
16 using System.Diagnostics;
17 using System.Threading;
18 using System.Threading.Tasks;
19 using kinetica;
20 using kinetica.Records;
21 using kinetica.Utils;
22 
23 namespace Example
24 {
25  #region Dashboard Metrics
26 
/// <summary>
/// Comprehensive metrics collected from a BulkInserter.
/// </summary>
/// <remarks>
/// A plain, mutable snapshot object: every member is a read/write auto-property
/// and the type carries no behavior of its own.
/// </remarks>
public class DashboardMetrics
{
    // ---- Timestamp ----

    /// <summary>Snapshot time in milliseconds (the collector in this file fills it from its uptime stopwatch).</summary>
    public long TimestampMs { get; set; }

    /// <summary>Seconds elapsed since monitoring started.</summary>
    public double UptimeSecs { get; set; }

    // ---- Record counts ----

    /// <summary>Total number of records inserted so far.</summary>
    public long TotalInserted { get; set; }

    /// <summary>Total number of records updated so far.</summary>
    public long TotalUpdated { get; set; }

    /// <summary>Sum of inserted and updated records.</summary>
    public long TotalProcessed { get; set; }

    // ---- Rates (per second) ----

    /// <summary>Records inserted per second over the last sampling interval.</summary>
    public double InsertRate { get; set; }

    /// <summary>Records updated per second over the last sampling interval.</summary>
    public double UpdateRate { get; set; }

    /// <summary>Combined insert + update rate, in records per second.</summary>
    public double ThroughputRate { get; set; }

    // ---- Queue metrics ----

    /// <summary>Number of batches waiting to be sent.</summary>
    public long PendingBatches { get; set; }

    /// <summary>Number of batches currently in flight.</summary>
    public int InFlightBatches { get; set; }

    /// <summary>Number of in-flight permits still available.</summary>
    public int AvailablePermits { get; set; }

    /// <summary>Configured upper bound on in-flight batches.</summary>
    public int MaxInFlight { get; set; }

    /// <summary>In-flight utilization as a percentage (the console renderer treats it as 0-100).</summary>
    public double UtilizationPct { get; set; }

    // ---- Error metrics ----

    /// <summary>Total number of errors observed.</summary>
    public int ErrorCount { get; set; }

    /// <summary>Errors per second over the last sampling interval.</summary>
    public double ErrorRate { get; set; }

    /// <summary>Most recent error summaries; starts as an empty list, never null by default.</summary>
    public List<ErrorSummary> RecentErrors { get; set; } = new List<ErrorSummary>();

    // ---- Configuration ----

    /// <summary>Number of workers used by the inserter.</summary>
    public int WorkerCount { get; set; }

    /// <summary>Batch size, in records, assumed for derived metrics.</summary>
    public int BatchSize { get; set; }

    /// <summary>Whether multi-head ingest is considered enabled.</summary>
    public bool MultiHeadEnabled { get; set; }

    // ---- Derived metrics ----

    /// <summary>Estimated time to process one batch, in milliseconds.</summary>
    public double AvgBatchTimeMs { get; set; }

    /// <summary>Estimated seconds needed to drain the pending queue; may be positive infinity when there is no throughput.</summary>
    public double EstimatedQueueDrainSecs { get; set; }
}
68 
/// <summary>
/// Compact description of a single insert error for display purposes.
/// </summary>
public class ErrorSummary
{
    /// <summary>URL of the worker associated with the error; empty by default.</summary>
    public string WorkerUrl { get; set; } = string.Empty;

    /// <summary>Error message text; empty by default.</summary>
    public string Message { get; set; } = string.Empty;

    /// <summary>Number of records associated with the error (shown as "recs" by the console renderer).</summary>
    public int RecordCount { get; set; }

    /// <summary>Age of the error in seconds. NOTE(review): reference point is not visible in this file - confirm with the producer of this value.</summary>
    public double AgeSecs { get; set; }
}
76 
77  #endregion
78 
79  #region Metrics Collector
80 
/// <summary>
/// Collects and aggregates metrics from a BulkInserter.
/// </summary>
/// <remarks>
/// Rates are computed as deltas between consecutive <see cref="Collect"/> calls, so the
/// collector is stateful: each call updates the "previous" counters and restarts the
/// interval stopwatch. No synchronization is applied to that state here, so callers are
/// expected to sample from a single loop.
/// </remarks>
/// <typeparam name="T">Record type handled by the monitored inserter.</typeparam>
public class MetricsCollector<T> where T : IShardKeyExtractor, new()
{
    /// <summary>Maximum number of throughput samples retained for averaging.</summary>
    private const int MaxHistorySize = 60;

    /// <summary>Batch size assumed when the caller does not supply one.</summary>
    private const int DefaultBatchSize = 10000;

    private readonly BulkInserter<T> _inserter;
    private readonly Stopwatch _startTime;
    private readonly int _batchSize;

    // Previous values for rate calculation
    private long _prevInserted;
    private long _prevUpdated;
    private int _prevErrors;
    private readonly Stopwatch _prevTimestamp;

    // Historical data for averaging
    private readonly ConcurrentQueue<double> _rateHistory = new();

    /// <summary>
    /// Creates a collector bound to <paramref name="inserter"/> and starts the uptime clock.
    /// </summary>
    /// <param name="inserter">The inserter whose counters are sampled.</param>
    /// <param name="batchSize">
    /// Batch size, in records, used for the derived metrics (average batch time and
    /// queue-drain estimate). Defaults to 10000, the value previously hard-coded.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="inserter"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is not positive.</exception>
    public MetricsCollector(BulkInserter<T> inserter, int batchSize = DefaultBatchSize)
    {
        _inserter = inserter ?? throw new ArgumentNullException(nameof(inserter));
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
        }

        _batchSize = batchSize;
        _startTime = Stopwatch.StartNew();
        _prevTimestamp = Stopwatch.StartNew();
    }

    /// <summary>
    /// Collect current metrics snapshot from the BulkInserter.
    /// </summary>
    /// <returns>
    /// A new <see cref="DashboardMetrics"/> holding totals, per-second rates since the
    /// previous call, backpressure figures and derived estimates.
    /// </returns>
    public DashboardMetrics Collect()
    {
        var uptime = _startTime.Elapsed;

        // Read metrics from the BulkInserter
        var totalInserted = _inserter.CountInserted;
        var totalUpdated = _inserter.CountUpdated;
        var totalProcessed = totalInserted + totalUpdated;

        // Backpressure metrics
        var bpMetrics = _inserter.GetBackpressureMetrics();

        // Error counts
        var errorCount = _inserter.ErrorCount;

        // Configuration
        var workerCount = _inserter.NumWorkers;
        var multiHeadEnabled = workerCount > 1; // heuristic: more than one worker is reported as multi-head

        // Calculate rates over the interval since the previous Collect() call
        var elapsed = _prevTimestamp.Elapsed.TotalSeconds;
        _prevTimestamp.Restart();

        var insertRate = elapsed > 0 ? (totalInserted - _prevInserted) / elapsed : 0;
        var updateRate = elapsed > 0 ? (totalUpdated - _prevUpdated) / elapsed : 0;
        var throughputRate = insertRate + updateRate;
        var errorRate = elapsed > 0 ? (errorCount - _prevErrors) / elapsed : 0;

        // Update rate history, keeping at most MaxHistorySize samples
        _rateHistory.Enqueue(throughputRate);
        while (_rateHistory.Count > MaxHistorySize)
        {
            _rateHistory.TryDequeue(out _);
        }

        // Store current values for next rate calculation
        _prevInserted = totalInserted;
        _prevUpdated = totalUpdated;
        _prevErrors = errorCount;

        // Derived metrics are only meaningful while records are flowing
        var avgBatchTimeMs = throughputRate > 0
            ? (_batchSize / throughputRate) * 1000.0
            : 0;

        // Multiply in floating point so a large backlog cannot overflow an integer product
        var estimatedQueueDrainSecs = throughputRate > 0
            ? ((double)bpMetrics.PendingBatches * _batchSize) / throughputRate
            : double.PositiveInfinity;

        return new DashboardMetrics
        {
            TimestampMs = (long)uptime.TotalMilliseconds,
            UptimeSecs = uptime.TotalSeconds,

            TotalInserted = totalInserted,
            TotalUpdated = totalUpdated,
            TotalProcessed = totalProcessed,

            InsertRate = insertRate,
            UpdateRate = updateRate,
            ThroughputRate = throughputRate,

            PendingBatches = bpMetrics.PendingBatches,
            InFlightBatches = bpMetrics.InFlightBatches,
            AvailablePermits = bpMetrics.AvailablePermits,
            MaxInFlight = bpMetrics.MaxInFlightBatches,
            UtilizationPct = bpMetrics.UtilizationPercent,

            ErrorCount = errorCount,
            ErrorRate = errorRate,

            WorkerCount = workerCount,
            BatchSize = _batchSize,
            MultiHeadEnabled = multiHeadEnabled,

            AvgBatchTimeMs = avgBatchTimeMs,
            EstimatedQueueDrainSecs = estimatedQueueDrainSecs,
        };
    }

    /// <summary>
    /// Returns the mean of the retained throughput samples.
    /// </summary>
    /// <returns>Average records per second, or 0 when no sample has been recorded yet.</returns>
    public double GetAvgThroughput()
    {
        double sum = 0;
        int count = 0;
        foreach (var rate in _rateHistory)
        {
            sum += rate;
            count++;
        }

        return count > 0 ? sum / count : 0;
    }
}
205 
206  #endregion
207 
208  #region Console Dashboard
209 
/// <summary>
/// Renders DashboardMetrics to the console with a visual layout.
/// </summary>
public class ConsoleDashboard
{
    private const string Separator = "+----------------------------------------------------------------------------------+";

    /// <summary>Throughput (records/second) that corresponds to a completely filled rate bar.</summary>
    private const double RateBarMax = 500_000;

    /// <summary>Maximum number of recent errors listed per frame.</summary>
    private const int MaxDisplayedErrors = 3;

    /// <summary>Error messages longer than this are truncated and suffixed with "...".</summary>
    private const int MaxErrorMessageLength = 50;

    private readonly TimeSpan _refreshInterval;
    private readonly bool _showErrors;

    /// <summary>
    /// Creates a dashboard renderer.
    /// </summary>
    /// <param name="refreshInterval">Intended delay between frames; exposed through <see cref="RefreshInterval"/>.</param>
    /// <param name="showErrors">Whether the recent-errors list is printed when errors are present.</param>
    public ConsoleDashboard(TimeSpan refreshInterval, bool showErrors = true)
    {
        _refreshInterval = refreshInterval;
        _showErrors = showErrors;
    }

    /// <summary>
    /// The refresh interval supplied at construction. Rendering itself never sleeps;
    /// the calling loop is expected to honor this value.
    /// </summary>
    public TimeSpan RefreshInterval => _refreshInterval;

    /// <summary>Clears the terminal and moves the cursor home using ANSI escape sequences.</summary>
    private void ClearScreen()
    {
        Console.Write("\x1B[2J\x1B[H");
    }

    /// <summary>
    /// Clears the screen and prints one full dashboard frame for <paramref name="metrics"/>.
    /// </summary>
    /// <param name="metrics">Snapshot to display.</param>
    /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
    public void Render(DashboardMetrics metrics)
    {
        if (metrics is null)
        {
            throw new ArgumentNullException(nameof(metrics));
        }

        ClearScreen();

        Console.WriteLine(Separator);
        Console.WriteLine("| BULKINSERTER MONITORING DASHBOARD |");
        Console.WriteLine(Separator);

        // Uptime and timestamp
        Console.WriteLine($"| Uptime: {metrics.UptimeSecs,10:F1}s {DateTime.Now:yyyy-MM-dd HH:mm:ss} |");

        Console.WriteLine(Separator);
        Console.WriteLine("| THROUGHPUT |");
        Console.WriteLine(Separator);

        // Record counts
        Console.WriteLine($"| Records Inserted: {FormatNumber(metrics.TotalInserted),15} Records Updated: {FormatNumber(metrics.TotalUpdated),15} |");
        Console.WriteLine($"| Total Processed: {FormatNumber(metrics.TotalProcessed),15} |");
        Console.WriteLine("| |");

        // Rates
        var rateBar = CreateRateBar(metrics.ThroughputRate, RateBarMax, 30);
        Console.WriteLine($"| Insert Rate: {metrics.InsertRate,12:F0} rec/s |");
        Console.WriteLine($"| Update Rate: {metrics.UpdateRate,12:F0} rec/s |");
        Console.WriteLine($"| Total Rate: {metrics.ThroughputRate,12:F0} rec/s [{rateBar}] |");

        Console.WriteLine(Separator);
        Console.WriteLine("| QUEUE & BACKPRESSURE |");
        Console.WriteLine(Separator);

        // Queue status
        Console.WriteLine($"| Pending Batches: {metrics.PendingBatches,6} In-Flight: {metrics.InFlightBatches,6} Available: {metrics.AvailablePermits,6} |");

        // Utilization bar and its textual classification
        var utilBar = CreateUtilizationBar(metrics.UtilizationPct, 40);
        var utilStatus = metrics.UtilizationPct switch
        {
            > 90 => "BACKPRESSURE",
            > 70 => "HIGH",
            > 30 => "NORMAL",
            _ => "LOW",
        };

        Console.WriteLine($"| Utilization: {metrics.UtilizationPct,5:F1}% [{utilBar}] {utilStatus,12} |");

        if (!double.IsInfinity(metrics.EstimatedQueueDrainSecs))
        {
            Console.WriteLine($"| Est. Queue Drain: {metrics.EstimatedQueueDrainSecs,6:F1}s |");
        }
        else
        {
            Console.WriteLine($"| Est. Queue Drain: N/A (no throughput) |");
        }

        Console.WriteLine(Separator);
        Console.WriteLine("| ERRORS |");
        Console.WriteLine(Separator);

        var errorStatus = metrics.ErrorCount > 0 ? "!" : "+";
        Console.WriteLine($"| Errors: {metrics.ErrorCount,6} {errorStatus} Error Rate: {metrics.ErrorRate,8:F2}/s |");

        // Show recent errors if any (the list has a public setter, so tolerate null)
        if (_showErrors && metrics.RecentErrors is { Count: > 0 } recentErrors)
        {
            Console.WriteLine("| |");
            Console.WriteLine("| Recent Errors: |");

            var shown = Math.Min(MaxDisplayedErrors, recentErrors.Count);
            for (int i = 0; i < shown; i++)
            {
                var err = recentErrors[i];
                var text = err.Message ?? "";
                var msg = text.Length > MaxErrorMessageLength
                    ? text[..(MaxErrorMessageLength - 3)] + "..."
                    : text;
                Console.WriteLine($"| {i + 1}. {msg} ({err.RecordCount} recs)");
            }
        }

        Console.WriteLine(Separator);
        Console.WriteLine("| CONFIGURATION |");
        Console.WriteLine(Separator);

        var multiHead = metrics.MultiHeadEnabled ? "Enabled" : "Disabled";
        Console.WriteLine($"| Workers: {metrics.WorkerCount,3} Batch Size: {metrics.BatchSize,6} Multi-Head: {multiHead,8} |");
        Console.WriteLine($"| Max In-Flight: {metrics.MaxInFlight,3} Avg Batch Time: {metrics.AvgBatchTimeMs,8:F1}ms |");

        Console.WriteLine(Separator);
        Console.WriteLine();
        Console.WriteLine("Press Ctrl+C to stop");
    }

    /// <summary>
    /// Formats a count with a K/M/B suffix and two decimals once it reaches
    /// one thousand, one million or one billion; smaller (and negative) values
    /// are printed unchanged.
    /// </summary>
    private static string FormatNumber(long n)
    {
        if (n >= 1_000_000_000)
        {
            return $"{n / 1_000_000_000.0:F2}B";
        }

        if (n >= 1_000_000)
        {
            return $"{n / 1_000_000.0:F2}M";
        }

        if (n >= 1_000)
        {
            return $"{n / 1_000.0:F2}K";
        }

        return n.ToString();
    }

    /// <summary>
    /// Converts a fill fraction into a number of filled cells in the range [0, width].
    /// NaN, negative fractions and non-positive widths all map to 0 so that the
    /// string constructor is never handed a negative count.
    /// </summary>
    private static int FilledCells(double fraction, int width)
    {
        if (width <= 0 || double.IsNaN(fraction))
        {
            return 0;
        }

        return (int)Math.Max(0.0, Math.Min(width, fraction * width));
    }

    /// <summary>
    /// Builds a fixed-width bar of '#' and '-' showing <paramref name="rate"/> relative to <paramref name="maxRate"/>.
    /// </summary>
    private static string CreateRateBar(double rate, double maxRate, int width)
    {
        var filled = FilledCells(rate / maxRate, width);
        var empty = Math.Max(0, width) - filled;
        return new string('#', filled) + new string('-', empty);
    }

    /// <summary>
    /// Builds a fixed-width utilization bar; the fill character escalates from '#'
    /// to '*' above 70% and '!' above 90%.
    /// </summary>
    private static string CreateUtilizationBar(double pct, int width)
    {
        var filled = FilledCells(pct / 100.0, width);
        var empty = Math.Max(0, width) - filled;

        var barChar = pct > 90 ? '!' : pct > 70 ? '*' : '#';
        return new string(barChar, filled) + new string('-', empty);
    }
}
340 
341  #endregion
342 
343  #region Dashboard Example
344 
/// <summary>
/// End-to-end example: creates a test table, feeds it through a BulkInserter from a
/// background producer, and renders live metrics to the console until Ctrl+C is pressed.
/// </summary>
public static class BulkInserterDashboardExample
{
    /// <summary>Number of records generated per producer iteration.</summary>
    private const int ProducerBatchSize = 1000;

    /// <summary>Pause between producer iterations.</summary>
    private static readonly TimeSpan ProducerPause = TimeSpan.FromMilliseconds(10);

    /// <summary>Delay before the first frame so the producer has data in flight.</summary>
    private static readonly TimeSpan StartupDelay = TimeSpan.FromSeconds(1);

    /// <summary>Delay between dashboard frames.</summary>
    private static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Record type matching the columns of the example table.
    /// </summary>
    private class DashboardTestRecord : IShardKeyExtractor
    {
        public int id { get; set; }
        public string category { get; set; } = "";
        public double value { get; set; }
        public long timestamp { get; set; }

        /// <summary>Returns an empty shard-key collection.</summary>
        public ShardKeyValues GetShardKeyValues()
        {
            return new ShardKeyValues();
        }
    }

    /// <summary>
    /// Runs the example. Connection settings are read from the KINETICA_URL,
    /// KINETICA_USER and KINETICA_PASSWORD environment variables, with local
    /// development defaults when they are not set.
    /// </summary>
    /// <returns>A task that completes after shutdown and cleanup have finished.</returns>
    public static async Task RunAsync()
    {
        Console.WriteLine("=============================================================");
        Console.WriteLine(" BulkInserter Monitoring Dashboard Example");
        Console.WriteLine("=============================================================");
        Console.WriteLine();
        Console.WriteLine("This example demonstrates how to:");
        Console.WriteLine(" 1. Create a BulkInserter for data ingestion");
        Console.WriteLine(" 2. Create a MetricsCollector to monitor the inserter");
        Console.WriteLine(" 3. Run monitoring in parallel with data ingestion");
        Console.WriteLine();

        // Get connection settings (the fallbacks are example-only defaults)
        var url = Environment.GetEnvironmentVariable("KINETICA_URL") ?? "http://localhost:9191";
        var user = Environment.GetEnvironmentVariable("KINETICA_USER") ?? "admin";
        var password = Environment.GetEnvironmentVariable("KINETICA_PASSWORD") ?? "secret";

        Console.WriteLine($"Connecting to Kinetica at: {url}");

        // Create Kinetica connection
        var kinetica = new Kinetica(url, new Kinetica.Options
        {
            Username = user,
            Password = password
        });

        // Setup table
        var schemaName = "test_schema";
        var tableName = $"{schemaName}.dashboard_test";

        // Best-effort preparation: failures are reported but do not abort the example
        TryExecuteSql(kinetica, $"CREATE SCHEMA IF NOT EXISTS {schemaName}");
        TryExecuteSql(kinetica, $"DROP TABLE IF EXISTS {tableName}");

        // Create table
        kinetica.executeSql($@"
            CREATE TABLE {tableName} (
                id INT NOT NULL,
                category VARCHAR(64),
                value DOUBLE,
                timestamp LONG,
                PRIMARY KEY (id),
                SHARD KEY (id)
            )
        ");

        Console.WriteLine($"Created table: {tableName}");

        // Get the KineticaType for the table
        var showTableResponse = kinetica.showTable(tableName, null);
        var typeId = showTableResponse.type_ids[0];
        var ktype = KineticaType.fromTypeID(kinetica, typeId);

        // Create BulkInserter
        var options = new BulkInserterOptions
        {
            BatchSize = 10000,
            MaxInFlightBatches = 50,
            FlushIntervalSeconds = 10,
        };

        var inserter = new BulkInserter<DashboardTestRecord>(
            kinetica, tableName, ktype, options);

        Console.WriteLine("BulkInserter created");
        Console.WriteLine();

        // Create MetricsCollector
        var collector = new MetricsCollector<DashboardTestRecord>(inserter);
        Console.WriteLine("MetricsCollector created, connected to the BulkInserter");
        Console.WriteLine();

        // Create console dashboard
        var dashboard = new ConsoleDashboard(RefreshInterval);

        // Cancellation token for graceful shutdown. The handler is kept in a local so it
        // can be unsubscribed before the token source is disposed.
        using var cts = new CancellationTokenSource();
        ConsoleCancelEventHandler onCancelKey = (s, e) =>
        {
            e.Cancel = true; // keep the process alive so shutdown can run
            cts.Cancel();
        };
        Console.CancelKeyPress += onCancelKey;

        // Start producer task
        var producerTask = Task.Run(() => ProduceAsync(inserter, cts.Token), cts.Token);

        try
        {
            // Dashboard update loop
            Console.WriteLine("Starting dashboard - monitoring the active BulkInserter...\n");
            await Task.Delay(StartupDelay, cts.Token);

            while (!cts.Token.IsCancellationRequested)
            {
                var metrics = collector.Collect();
                dashboard.Render(metrics);

                await Task.Delay(RefreshInterval, cts.Token);
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C was pressed while waiting; fall through to shutdown.
        }
        finally
        {
            // Runs on every exit path so the producer is stopped, the inserter is
            // flushed and the table is dropped even if rendering failed.
            cts.Cancel();
            Console.CancelKeyPress -= onCancelKey;

            // Graceful shutdown
            Console.WriteLine("\nShutting down...");

            // Wait for producer to stop
            try
            {
                await producerTask;
            }
            catch (OperationCanceledException) { }

            // Flush and close inserter
            await inserter.CloseAsync();

            Console.WriteLine($"Final stats:");
            Console.WriteLine($" Inserted: {inserter.CountInserted}");
            Console.WriteLine($" Updated: {inserter.CountUpdated}");
            Console.WriteLine($" Errors: {inserter.ErrorCount}");

            // Cleanup
            TryExecuteSql(kinetica, $"DROP TABLE IF EXISTS {tableName}");
        }

        Console.WriteLine("Done!");
    }

    /// <summary>
    /// Generates synthetic records in batches and hands them to <paramref name="inserter"/>
    /// until <paramref name="token"/> is cancelled. Insert failures are written to
    /// standard error and the loop continues.
    /// </summary>
    private static async Task ProduceAsync(BulkInserter<DashboardTestRecord> inserter, CancellationToken token)
    {
        int id = 0;
        var categories = new[] { "A", "B", "C", "D", "E" };

        while (!token.IsCancellationRequested)
        {
            // Generate batch of records
            var batch = new List<DashboardTestRecord>(ProducerBatchSize);
            for (int i = 0; i < ProducerBatchSize; i++)
            {
                id++;
                batch.Add(new DashboardTestRecord
                {
                    id = id,
                    category = categories[i % categories.Length],
                    value = id * 0.01,
                    timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
                });
            }

            // Insert using the BulkInserter
            try
            {
                await inserter.InsertBatchAsync(batch);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Insert error: {ex.Message}");
            }

            // Throws OperationCanceledException on cancellation, which ends the task
            await Task.Delay(ProducerPause, token);
        }
    }

    /// <summary>
    /// Executes a statement whose failure must not stop the example; any exception is
    /// reported as a warning on standard error instead of being silently discarded.
    /// </summary>
    private static void TryExecuteSql(Kinetica db, string sql)
    {
        try
        {
            db.executeSql(sql);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Warning: statement failed ({sql}): {ex.Message}");
        }
    }
}
539 
540  #endregion
541 }
void Render(DashboardMetrics metrics)
ConsoleDashboard(TimeSpan refreshInterval, bool showErrors=true)
DashboardMetrics Collect()
Collect current metrics snapshot from the BulkInserter.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Definition: BulkInserter.cs:28
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
Renders DashboardMetrics to the console with a visual layout.
Collection of shard key column names and values.
Comprehensive metrics collected from a BulkInserter.
List< ErrorSummary > RecentErrors
Interface for extracting shard key values from a record.
MetricsCollector(BulkInserter< T > inserter)
Configuration options for the BulkInserter<T>.
Collects and aggregates metrics from a BulkInserter.