2 using System.Collections.Generic;
3 using System.Threading.Tasks;
4 using BenchmarkDotNet.Attributes;
5 using BenchmarkDotNet.Configs;
6 using BenchmarkDotNet.Jobs;
16 [Config(typeof(BenchmarkConfig))]
18 [SimpleJob(RuntimeMoniker.Net80, warmupCount: 1, iterationCount: 3)]
22 private string _tableName =
null!;
24 private List<BenchRecord> _records =
null!;
31 var url = Environment.GetEnvironmentVariable(
"KINETICA_URL") ??
"http://localhost:9191";
32 var username = Environment.GetEnvironmentVariable(
"KINETICA_USER") ??
"admin";
33 var password = Environment.GetEnvironmentVariable(
"KINETICA_PASSWORD") ??
"secret";
52 var schemaName = $
"bench_{Guid.NewGuid():N}".Substring(0, 20);
56 _kinetica.
createSchema(schemaName,
new Dictionary<string, string> { {
"no_error_if_exists",
"true" } });
60 var typeDef =
@"{""type"":""record"",""name"":""bench_record"",""fields"":[" +
61 @"{""name"":""id"",""type"":""int""}," +
62 @"{""name"":""thread_id"",""type"":""int""}," +
63 @"{""name"":""timestamp"",""type"":""long""}," +
64 @"{""name"":""name"",""type"":""string""}," +
65 @"{""name"":""score"",""type"":""double""}]}";
67 var properties =
new Dictionary<string, IList<string>>
69 {
"id",
new List<string> {
"primary_key" } },
70 {
"thread_id",
new List<string>() },
71 {
"timestamp",
new List<string>() },
72 {
"name",
new List<string>() },
73 {
"score",
new List<string>() }
76 var typeResp = _kinetica.
createType(typeDef,
"bench_type", properties,
new Dictionary<string, string>());
77 _tableName = $
"{schemaName}.bench_table";
78 _kinetica.
createTable(_tableName, typeResp.type_id,
new Dictionary<string, string>());
87 var schemaName = _tableName.Split(
'.')[0];
88 _kinetica.
executeSql($
"DROP SCHEMA IF EXISTS {schemaName} CASCADE", 0, -9999);
103 [Params(10000, 50000)]
106 [Params(1000, 10000)]
111 #region Record Generation 115 public int id {
get;
set; }
118 public string name {
get;
set; } =
string.Empty;
127 private List<BenchRecord> GenerateRecords(
int count)
129 var records =
new List<BenchRecord>(count);
130 var baseTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
132 for (
int i = 0; i < count; i++)
134 records.Add(
new BenchRecord
137 thread_id = Environment.CurrentManagedThreadId,
138 timestamp = baseTimestamp + i,
139 name = $
"record_{i:D8}",
151 [Benchmark(Baseline =
true)]
161 inserter.InsertBatch(_records);
163 await inserter.CloseAsync();
172 MaxInFlightBatches = 20
177 await inserter.InsertBatchAsync(_records);
179 await inserter.CloseAsync();
192 foreach (var record
in _records)
194 inserter.Insert(record);
197 await inserter.CloseAsync();
207 MaxInFlightBatches = 20
212 var chunkSize = _records.Count / 4;
213 var tasks =
new Task[4];
215 for (
int t = 0; t < 4; t++)
217 var start = t * chunkSize;
218 var end = (t == 3) ? _records.Count : (t + 1) * chunkSize;
219 var chunk = _records.GetRange(start, end - start);
221 tasks[t] = Task.Run(async () =>
223 foreach (var record
in chunk)
225 await inserter.InsertAsync(record);
230 await Task.WhenAll(tasks);
231 await inserter.CloseAsync();
243 foreach (var record
in _records)
245 ingestor.insert(record);
260 ingestor.insert(_records);
273 .WithIterationCount(3));
CreateTableResponse createTable(CreateTableRequest request_)
Creates a new table with the given type (definition of columns).
string? Username
Optional: User Name for Kinetica security
async Task BulkInserter_ConcurrentInserts()
ExecuteSqlResponse executeSql(ExecuteSqlRequest request_)
Execute a SQL statement (query, DML, or DDL).
void LegacyIngestor_BatchInsert()
async Task BulkInserter_SingleInserts()
High-performance bulk inserter for Kinetica with support for multi-head ingest,
Benchmarks for the BulkInserter comparing different batch sizes, insertion methods,...
static KineticaType fromTable(Kinetica kinetica, string tableName)
Create a KineticaType object based on an existing table in the database.
CreateSchemaResponse createSchema(CreateSchemaRequest request_)
Creates a SQL-style schema.
Collection of shard key column names and values.
CreateTypeResponse createType(CreateTypeRequest request_)
Creates a new type describing the columns of a table.
ShardKeyValues GetShardKeyValues()
Returns shard key column names and their typed values.
async Task BulkInserter_InsertBatchAsync()
void LegacyIngestor_SingleInserts()
async Task BulkInserter_InsertBatch()
Configuration options for the BulkInserter<T>.
API to talk to Kinetica Database
Manages the insertion into GPUdb of large numbers of records in bulk, with automatic batch management...