2 using System.Collections.Generic;
3 using System.Threading;
13 private readonly
object _lock =
new();
14 private List<T> _records;
15 private readonly
int _capacity;
20 _records =
new List<T>(capacity);
26 public List<T>?
Add(T record)
31 if (_records.Count >= _capacity)
34 _records =
new List<T>(_capacity);
44 public List<List<T>>
AddRange(IReadOnlyList<T> records)
46 var batches =
new List<List<T>>();
49 foreach (var record
in records)
52 if (_records.Count >= _capacity)
54 batches.Add(_records);
55 _records =
new List<T>(_capacity);
69 if (_records.Count == 0)
73 _records =
new List<T>(_capacity);
87 return _records.Count;
101 private readonly
int _numStripes;
102 private readonly
int _stripeMask;
127 _numStripes = RoundUpToPowerOf2(numStripes);
128 _stripeMask = _numStripes - 1;
131 for (
int i = 0; i < _numStripes; i++)
137 private static int RoundUpToPowerOf2(
int n)
146 return Math.Max(1, n);
157 public List<T>?
Add(T record,
long stripeHash, out
int stripeIndex)
159 stripeIndex = (int)(stripeHash & _stripeMask);
160 return _stripes[stripeIndex].Add(record);
168 return _stripes[stripeIndex & _stripeMask].Add(record);
176 return _stripes[stripeIndex & _stripeMask].AddRange(records);
183 public List<(
int stripeIndex, List<T> batch)>
FlushAll()
185 var batches =
new List<(int, List<T>)>();
186 for (
int i = 0; i < _numStripes; i++)
188 var batch = _stripes[i].Flush();
189 if (batch !=
null && batch.Count > 0)
191 batches.Add((i, batch));
205 for (
int i = 0; i < _numStripes; i++)
207 count += _stripes[i].Count;
List< T >? Add(T record)
Adds a record to the stripe.
int TotalCount
Gets the total count of records across all stripes.
List< T >? AddToStripe(T record, int stripeIndex)
Adds a record to a specific stripe.
List< List< T > > AddRange(IReadOnlyList< T > records)
Adds multiple records to the stripe.
A worker queue with multiple stripes to reduce lock contention.
List< List< T > > AddRangeToStripe(IReadOnlyList< T > records, int stripeIndex)
Adds multiple records to a specific stripe.
int NumStripes
Number of stripes in this queue.
List< T >? Flush()
Flushes all records from the stripe, regardless of count.
StripedWorkerQueue(Uri url, int workerIndex, int numStripes, int batchSize)
Creates a new striped worker queue.
A stripe within a worker queue.
int WorkerIndex
Index of this worker in the worker list.
int Count
Gets the current count of records in the stripe.
List<(int stripeIndex, List< T > batch)> FlushAll()
Flushes all stripes and returns all batches.
List< T >? Add(T record, long stripeHash, out int stripeIndex)
Adds a record to the appropriate stripe based on the stripe hash.
Uri Url
URL of the worker this queue is associated with.