Kinetica   C#   API  Version 7.2.3.1
StripedWorkerQueue.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 using System.Threading;
4 
5 namespace kinetica;
6 
/// <summary>
/// A stripe within a worker queue: a buffer of records guarded by its own lock.
/// Whenever the buffer reaches the configured capacity it is detached and handed
/// back to the caller as a batch, and a fresh buffer takes its place.
/// </summary>
/// <typeparam name="T">Type of record held by the stripe.</typeparam>
internal sealed class Stripe<T>
{
    private readonly object _lock = new();
    private readonly int _capacity;
    private List<T> _records;

    /// <summary>Creates an empty stripe.</summary>
    /// <param name="capacity">
    /// Record count at which the buffer is emitted as a batch; also used to
    /// presize every buffer the stripe allocates.
    /// </param>
    public Stripe(int capacity)
    {
        _capacity = capacity;
        _records = new List<T>(capacity);
    }

    /// <summary>Adds a record to the stripe.</summary>
    /// <param name="record">The record to buffer.</param>
    /// <returns>
    /// The detached batch (including <paramref name="record"/>) if this addition
    /// brought the buffer up to capacity; otherwise <c>null</c>.
    /// </returns>
    public List<T>? Add(T record)
    {
        lock (_lock)
        {
            _records.Add(record);
            return _records.Count < _capacity ? null : DetachBuffer();
        }
    }

    /// <summary>Adds multiple records to the stripe.</summary>
    /// <param name="records">Records to buffer, in order.</param>
    /// <returns>
    /// Every batch that reached capacity while the records were being added, in
    /// the order they filled up. Empty when no batch was completed; any remainder
    /// stays buffered in the stripe.
    /// </returns>
    public List<List<T>> AddRange(IReadOnlyList<T> records)
    {
        var completed = new List<List<T>>();

        // The lock is held for the whole range, so the records of one call are
        // never interleaved with records added by another caller.
        lock (_lock)
        {
            foreach (T item in records)
            {
                _records.Add(item);
                if (_records.Count < _capacity)
                {
                    continue;
                }

                completed.Add(DetachBuffer());
            }
        }

        return completed;
    }

    /// <summary>Flushes all records from the stripe, regardless of count.</summary>
    /// <returns>The buffered records, or <c>null</c> if the stripe is empty.</returns>
    public List<T>? Flush()
    {
        lock (_lock)
        {
            return _records.Count != 0 ? DetachBuffer() : null;
        }
    }

    /// <summary>Gets the current count of records in the stripe.</summary>
    public int Count
    {
        get
        {
            lock (_lock)
            {
                return _records.Count;
            }
        }
    }

    /// <summary>
    /// Hands out the current buffer and installs a new empty one in its place.
    /// Callers must already hold <see cref="_lock"/>.
    /// </summary>
    private List<T> DetachBuffer()
    {
        List<T> detached = _records;
        _records = new List<T>(_capacity);
        return detached;
    }
}
92 
/// <summary>
/// A worker queue with multiple stripes to reduce lock contention.
/// Each stripe buffers records behind its own lock; records are routed to a
/// stripe by masking a hash or index with (stripe count - 1).
/// </summary>
/// <typeparam name="T">Type of record held by the queue.</typeparam>
internal sealed class StripedWorkerQueue<T>
{
    private readonly Stripe<T>[] _stripes;
    private readonly int _numStripes;
    private readonly int _stripeMask;

    /// <summary>URL of the worker this queue is associated with.</summary>
    public Uri Url { get; }

    /// <summary>Index of this worker in the worker list.</summary>
    public int WorkerIndex { get; }

    /// <summary>Creates a new striped worker queue.</summary>
    /// <param name="url">URL of the worker this queue is associated with.</param>
    /// <param name="workerIndex">Index of this worker in the worker list.</param>
    /// <param name="numStripes">
    /// Requested number of stripes; rounded up to a power of two (minimum 1).
    /// </param>
    /// <param name="batchSize">Capacity passed to every stripe.</param>
    public StripedWorkerQueue(Uri url, int workerIndex, int numStripes, int batchSize)
    {
        Url = url;
        WorkerIndex = workerIndex;

        // A power-of-two stripe count lets "hash & mask" stand in for "hash % count".
        int stripeCount = RoundUpToPowerOf2(numStripes);
        _numStripes = stripeCount;
        _stripeMask = stripeCount - 1;

        var stripes = new Stripe<T>[stripeCount];
        for (int slot = 0; slot < stripes.Length; slot++)
        {
            stripes[slot] = new Stripe<T>(batchSize);
        }
        _stripes = stripes;
    }

    /// <summary>
    /// Rounds <paramref name="n"/> up to the nearest power of two, never
    /// returning less than 1.
    /// </summary>
    private static int RoundUpToPowerOf2(int n)
    {
        // Smear the highest set bit of (n - 1) into every lower position,
        // then add one to land on the next power of two.
        int bits = n - 1;
        for (int shift = 1; shift <= 16; shift <<= 1)
        {
            bits |= bits >> shift;
        }

        return Math.Max(1, bits + 1);
    }

    /// <summary>
    /// Adds a record to the appropriate stripe based on the stripe hash.
    /// </summary>
    /// <param name="record">The record to buffer.</param>
    /// <param name="stripeHash">Hash whose low bits select the stripe.</param>
    /// <param name="stripeIndex">Receives the index of the stripe that was chosen.</param>
    /// <returns>Whatever the chosen stripe's <c>Add</c> returns (nullable).</returns>
    public List<T>? Add(T record, long stripeHash, out int stripeIndex)
    {
        int slot = (int)(stripeHash & _stripeMask);
        stripeIndex = slot;
        return _stripes[slot].Add(record);
    }

    /// <summary>Adds a record to a specific stripe.</summary>
    /// <param name="record">The record to buffer.</param>
    /// <param name="stripeIndex">Stripe index; masked into the valid range.</param>
    /// <returns>Whatever the selected stripe's <c>Add</c> returns (nullable).</returns>
    public List<T>? AddToStripe(T record, int stripeIndex)
    {
        Stripe<T> target = _stripes[stripeIndex & _stripeMask];
        return target.Add(record);
    }

    /// <summary>Adds multiple records to a specific stripe.</summary>
    /// <param name="records">Records to buffer, in order.</param>
    /// <param name="stripeIndex">Stripe index; masked into the valid range.</param>
    /// <returns>Whatever the selected stripe's <c>AddRange</c> returns.</returns>
    public List<List<T>> AddRangeToStripe(IReadOnlyList<T> records, int stripeIndex)
    {
        Stripe<T> target = _stripes[stripeIndex & _stripeMask];
        return target.AddRange(records);
    }

    /// <summary>Flushes all stripes and returns all batches.</summary>
    /// <returns>
    /// One (stripe index, batch) pair for every stripe whose flush yielded a
    /// non-empty batch, in ascending stripe order.
    /// </returns>
    public List<(int stripeIndex, List<T> batch)> FlushAll()
    {
        var flushed = new List<(int stripeIndex, List<T> batch)>();

        for (int slot = 0; slot < _numStripes; slot++)
        {
            List<T>? pending = _stripes[slot].Flush();
            if (pending == null || pending.Count <= 0)
            {
                continue;
            }

            flushed.Add((slot, pending));
        }

        return flushed;
    }

    /// <summary>
    /// Gets the total count of records across all stripes. The stripes are read
    /// one after another, so the sum is not taken at a single instant.
    /// </summary>
    public int TotalCount
    {
        get
        {
            int total = 0;
            foreach (Stripe<T> stripe in _stripes)
            {
                total += stripe.Count;
            }
            return total;
        }
    }

    /// <summary>Number of stripes in this queue.</summary>
    public int NumStripes
    {
        get { return _numStripes; }
    }
}
List< T >? Add(T record)
Adds a record to the stripe.
int TotalCount
Gets the total count of records across all stripes.
List< T >? AddToStripe(T record, int stripeIndex)
Adds a record to a specific stripe.
List< List< T > > AddRange(IReadOnlyList< T > records)
Adds multiple records to the stripe.
Stripe(int capacity)
StripedWorkerQueue< T >
A worker queue with multiple stripes to reduce lock contention.
List< List< T > > AddRangeToStripe(IReadOnlyList< T > records, int stripeIndex)
Adds multiple records to a specific stripe.
int NumStripes
Number of stripes in this queue.
List< T >? Flush()
Flushes all records from the stripe, regardless of count.
StripedWorkerQueue(Uri url, int workerIndex, int numStripes, int batchSize)
Creates a new striped worker queue.
Stripe< T >
A stripe within a worker queue.
int WorkerIndex
Index of this worker in the worker list.
int Count
Gets the current count of records in the stripe.
List<(int stripeIndex, List< T > batch)> FlushAll()
Flushes all stripes and returns all batches.
List< T >? Add(T record, long stripeHash, out int stripeIndex)
Adds a record to the appropriate stripe based on the stripe hash.
Uri Url
URL of the worker this queue is associated with.