14 using System.Collections.Concurrent;
15 using System.Collections.Generic;
16 using System.Diagnostics;
17 using System.Threading;
18 using System.Threading.Tasks;
25 #region Dashboard Metrics 79 #region Metrics Collector 89 private readonly Stopwatch _startTime;
92 private long _prevInserted;
93 private long _prevUpdated;
94 private int _prevErrors;
95 private Stopwatch _prevTimestamp;
98 private readonly ConcurrentQueue<double> _rateHistory =
new();
99 private const int MaxHistorySize = 60;
103 _inserter = inserter;
104 _startTime = Stopwatch.StartNew();
105 _prevTimestamp = Stopwatch.StartNew();
114 var now = Stopwatch.StartNew();
115 var uptime = _startTime.Elapsed;
118 var totalInserted = _inserter.CountInserted;
119 var totalUpdated = _inserter.CountUpdated;
120 var totalProcessed = totalInserted + totalUpdated;
123 var bpMetrics = _inserter.GetBackpressureMetrics();
129 var workerCount = _inserter.NumWorkers;
130 var batchSize = bpMetrics.MaxInFlightBatches > 0 ? 10000 : 10000;
131 var multiHeadEnabled = workerCount > 1;
134 var elapsed = _prevTimestamp.Elapsed.TotalSeconds;
135 _prevTimestamp.Restart();
137 var insertRate = elapsed > 0 ? (totalInserted - _prevInserted) / elapsed : 0;
138 var updateRate = elapsed > 0 ? (totalUpdated - _prevUpdated) / elapsed : 0;
139 var throughputRate = insertRate + updateRate;
140 var errorRate = elapsed > 0 ? (errorCount - _prevErrors) / elapsed : 0;
143 _rateHistory.Enqueue(throughputRate);
144 while (_rateHistory.Count > MaxHistorySize)
145 _rateHistory.TryDequeue(out _);
148 _prevInserted = totalInserted;
149 _prevUpdated = totalUpdated;
150 _prevErrors = errorCount;
153 var avgBatchTimeMs = throughputRate > 0
154 ? (batchSize / throughputRate) * 1000.0
157 var estimatedQueueDrainSecs = throughputRate > 0
158 ? (bpMetrics.PendingBatches * batchSize) / throughputRate
159 :
double.PositiveInfinity;
163 TimestampMs = (long)uptime.TotalMilliseconds,
164 UptimeSecs = uptime.TotalSeconds,
166 TotalInserted = totalInserted,
167 TotalUpdated = totalUpdated,
168 TotalProcessed = totalProcessed,
170 InsertRate = insertRate,
171 UpdateRate = updateRate,
172 ThroughputRate = throughputRate,
175 InFlightBatches = bpMetrics.InFlightBatches,
176 AvailablePermits = bpMetrics.AvailablePermits,
177 MaxInFlight = bpMetrics.MaxInFlightBatches,
178 UtilizationPct = bpMetrics.UtilizationPercent,
180 ErrorCount = errorCount,
181 ErrorRate = errorRate,
183 WorkerCount = workerCount,
184 BatchSize = batchSize,
185 MultiHeadEnabled = multiHeadEnabled,
187 AvgBatchTimeMs = avgBatchTimeMs,
188 EstimatedQueueDrainSecs = estimatedQueueDrainSecs,
194 if (_rateHistory.IsEmpty)
return 0;
197 foreach (var rate
in _rateHistory)
202 return count > 0 ? sum / count : 0;
208 #region Console Dashboard 215 private readonly TimeSpan _refreshInterval;
216 private readonly
bool _showErrors;
220 _refreshInterval = refreshInterval;
221 _showErrors = showErrors;
224 private void ClearScreen()
226 Console.Write(
"\x1B[2J\x1B[H");
233 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
234 Console.WriteLine(
"| BULKINSERTER MONITORING DASHBOARD |");
235 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
238 Console.WriteLine($
"| Uptime: {metrics.UptimeSecs,10:F1}s {DateTime.Now:yyyy-MM-dd HH:mm:ss} |");
240 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
241 Console.WriteLine(
"| THROUGHPUT |");
242 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
245 Console.WriteLine($
"| Records Inserted: {FormatNumber(metrics.TotalInserted),15} Records Updated: {FormatNumber(metrics.TotalUpdated),15} |");
246 Console.WriteLine($
"| Total Processed: {FormatNumber(metrics.TotalProcessed),15} |");
247 Console.WriteLine(
"| |");
251 Console.WriteLine($
"| Insert Rate: {metrics.InsertRate,12:F0} rec/s |");
252 Console.WriteLine($
"| Update Rate: {metrics.UpdateRate,12:F0} rec/s |");
253 Console.WriteLine($
"| Total Rate: {metrics.ThroughputRate,12:F0} rec/s [{rateBar}] |");
255 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
256 Console.WriteLine(
"| QUEUE & BACKPRESSURE |");
257 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
260 Console.WriteLine($
"| Pending Batches: {metrics.PendingBatches,6} In-Flight: {metrics.InFlightBatches,6} Available: {metrics.AvailablePermits,6} |");
269 Console.WriteLine($
"| Utilization: {metrics.UtilizationPct,5:F1}% [{utilBar}] {utilStatus,12} |");
273 Console.WriteLine($
"| Est. Queue Drain: {metrics.EstimatedQueueDrainSecs,6:F1}s |");
277 Console.WriteLine($
"| Est. Queue Drain: N/A (no throughput) |");
280 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
281 Console.WriteLine(
"| ERRORS |");
282 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
284 var errorStatus = metrics.
ErrorCount > 0 ?
"!" :
"+";
285 Console.WriteLine($
"| Errors: {metrics.ErrorCount,6} {errorStatus} Error Rate: {metrics.ErrorRate,8:F2}/s |");
290 Console.WriteLine(
"| |");
291 Console.WriteLine(
"| Recent Errors: |");
292 for (
int i = 0; i < Math.Min(3, metrics.
RecentErrors.Count); i++)
295 var msg = err.Message.Length > 50 ? err.Message[..47] +
"..." : err.Message;
296 Console.WriteLine($
"| {i + 1}. {msg} ({err.RecordCount} recs)");
300 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
301 Console.WriteLine(
"| CONFIGURATION |");
302 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
305 Console.WriteLine($
"| Workers: {metrics.WorkerCount,3} Batch Size: {metrics.BatchSize,6} Multi-Head: {multiHead,8} |");
306 Console.WriteLine($
"| Max In-Flight: {metrics.MaxInFlight,3} Avg Batch Time: {metrics.AvgBatchTimeMs,8:F1}ms |");
308 Console.WriteLine(
"+----------------------------------------------------------------------------------+");
310 Console.WriteLine(
"Press Ctrl+C to stop");
313 private static string FormatNumber(
long n)
315 if (n >= 1_000_000_000)
316 return $
"{n / 1_000_000_000.0:F2}B";
318 return $
"{n / 1_000_000.0:F2}M";
320 return $
"{n / 1_000.0:F2}K";
324 private static string CreateRateBar(
double rate,
double maxRate,
int width)
326 var filled = (int)Math.Min(width, (rate / maxRate) * width);
327 var empty = width - filled;
328 return new string(
'#', filled) +
new string(
'-', empty);
331 private static string CreateUtilizationBar(
double pct,
int width)
333 var filled = (int)Math.Min(width, (pct / 100.0) * width);
334 var empty = width - filled;
336 var barChar = pct > 90 ?
'!' : pct > 70 ?
'*' :
'#';
337 return new string(barChar, filled) +
new string(
'-', empty);
343 #region Dashboard Example 352 public int id {
get;
set; }
353 public string category {
get;
set; } =
"";
354 public double value {
get;
set; }
355 public long timestamp {
get;
set; }
365 Console.WriteLine(
"=============================================================");
366 Console.WriteLine(
" BulkInserter Monitoring Dashboard Example");
367 Console.WriteLine(
"=============================================================");
369 Console.WriteLine(
"This example demonstrates how to:");
370 Console.WriteLine(
" 1. Create a BulkInserter for data ingestion");
371 Console.WriteLine(
" 2. Create a MetricsCollector to monitor the inserter");
372 Console.WriteLine(
" 3. Run monitoring in parallel with data ingestion");
376 var url = Environment.GetEnvironmentVariable(
"KINETICA_URL") ??
"http://localhost:9191";
377 var user = Environment.GetEnvironmentVariable(
"KINETICA_USER") ??
"admin";
378 var password = Environment.GetEnvironmentVariable(
"KINETICA_PASSWORD") ??
"secret";
380 Console.WriteLine($
"Connecting to Kinetica at: {url}");
390 var schemaName =
"test_schema";
391 var tableName = $
"{schemaName}.dashboard_test";
396 kinetica.executeSql($
"CREATE SCHEMA IF NOT EXISTS {schemaName}");
403 kinetica.executeSql($
"DROP TABLE IF EXISTS {tableName}");
409 CREATE TABLE {tableName} ( 411 category VARCHAR(64), 419 Console.WriteLine($
"Created table: {tableName}");
422 var showTableResponse =
kinetica.showTable(tableName,
null);
423 var typeId = showTableResponse.type_ids[0];
430 MaxInFlightBatches = 50,
431 FlushIntervalSeconds = 10,
435 kinetica, tableName, ktype, options);
437 Console.WriteLine(
"BulkInserter created");
442 Console.WriteLine(
"MetricsCollector created, connected to the BulkInserter");
449 var cts =
new CancellationTokenSource();
450 Console.CancelKeyPress += (s, e) =>
457 var producerTask = Task.Run(async () =>
460 var categories =
new[] {
"A",
"B",
"C",
"D",
"E" };
462 while (!cts.Token.IsCancellationRequested)
465 var batch = new List<DashboardTestRecord>();
466 for (int i = 0; i < 1000; i++)
469 batch.Add(new DashboardTestRecord
472 category = categories[i % categories.Length],
474 timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
481 await inserter.InsertBatchAsync(batch);
485 Console.Error.WriteLine($
"Insert error: {ex.Message}");
488 await Task.Delay(10, cts.Token);
493 Console.WriteLine(
"Starting dashboard - monitoring the active BulkInserter...\n");
494 await Task.Delay(1000);
496 while (!cts.Token.IsCancellationRequested)
498 var metrics = collector.Collect();
499 dashboard.Render(metrics);
503 await Task.Delay(1000, cts.Token);
505 catch (OperationCanceledException)
512 Console.WriteLine(
"\nShutting down...");
519 catch (OperationCanceledException) { }
522 await inserter.CloseAsync();
524 Console.WriteLine($
"Final stats:");
525 Console.WriteLine($
" Inserted: {inserter.CountInserted}");
526 Console.WriteLine($
" Updated: {inserter.CountUpdated}");
527 Console.WriteLine($
" Errors: {inserter.ErrorCount}");
532 kinetica.executeSql($
"DROP TABLE IF EXISTS {tableName}");
536 Console.WriteLine(
"Done!");
static async Task RunAsync()
void Render(DashboardMetrics metrics)
ConsoleDashboard(TimeSpan refreshInterval, bool showErrors=true)
DashboardMetrics Collect()
Collect current metrics snapshot from the BulkInserter.
High-performance bulk inserter for Kinetica with support for multi-head ingest,
static KineticaType fromTypeID(Kinetica kinetica, string typeId)
Create a KineticaType object based on an existing type in the database.
Renders DashboardMetrics to the console with a visual layout.
Collection of shard key column names and values.
Comprehensive metrics collected from a BulkInserter.
double EstimatedQueueDrainSecs
List< ErrorSummary > RecentErrors
MetricsCollector(BulkInserter< T > inserter)
Configuration options for the BulkInserter<T>.
double GetAvgThroughput()
Collects and aggregates metrics from a BulkInserter.