Kinetica C# API  Version 6.2.0.1
WorkerQueue.cs
Go to the documentation of this file.
1 using System;
2 using System.Collections.Generic;
3 
4 
5 namespace kinetica.Utils
6 {
7  internal sealed class WorkerQueue<T>
8  {
9  public System.Uri url { get; private set; }
10  private int capacity;
11  private bool has_primary_key;
12  private bool update_on_existing_pk;
13  private IList<T> queue;
14  private Dictionary<RecordKey, int> primary_key_map;
15 
16 
21  public WorkerQueue( System.Uri url )
22  {
23  this.url = url;
24  this.capacity = 1;
25  this.has_primary_key = false;
26  this.update_on_existing_pk = false;
27 
28  queue = new List<T>();
29  } // end constructor WorkerQueue<T>
30 
31 
32 
40  public WorkerQueue(System.Uri url, int capacity, bool has_primary_key, bool update_on_existing_pk)
41  {
42  this.url = url;
43  this.capacity = capacity;
44  this.has_primary_key = has_primary_key;
45  this.update_on_existing_pk = update_on_existing_pk;
46 
47  queue = new List<T>();
48 
49  // If the type has primary keys, then initialize with a
50  // capacity of 75% of the final capacity
51  if (this.has_primary_key)
52  primary_key_map = new Dictionary<RecordKey, int>((int)Math.Round(this.capacity / 0.75));
53  } // end constructor WorkerQueue<T>
54 
55 
56 
61  public IList<T> flush()
62  {
63  IList<T> old_queue = this.queue;
64  queue = new List<T>(this.capacity);
65 
66  // Clear the primary key map if one exists
67  if (this.primary_key_map != null)
68  this.primary_key_map.Clear();
69 
70  return old_queue;
71  } // end flush
72 
73 
74 
82  public IList<T> insert(T record, RecordKey key)
83  {
84  if (this.has_primary_key && key.isValid())
85  {
86  // We are to update the record even if the primary key already exists
87  if (this.update_on_existing_pk)
88  {
89  int key_idx;
90 
91  if (this.primary_key_map.TryGetValue(key, out key_idx))
92  {
93  // Key exists, so we need to replace the associated record
94  this.queue[key_idx] = record;
95  }
96  else // key does not exist; add the record and
97  { // update the key->record mapping
98  this.queue.Add(record);
99  this.primary_key_map.Add(key, (this.queue.Count - 1));
100  }
101  }
102  else // do NOT update/add the record if the key already exists
103  {
104  if (this.primary_key_map.ContainsKey(key))
105  return null; // yup, the key already exists
106 
107  // The key does not exist, so add the record and
108  // update the key->record map
109  this.queue.Add(record);
110  this.primary_key_map.Add(key, (this.queue.Count - 1));
111  }
112  }
113  else // simply add the record
114  {
115  queue.Add(record);
116  }
117 
118  // If the queue is full, then flush and return the 'old' queue
119  if (queue.Count == capacity)
120  return flush();
121  else // no records to return
122  return null;
123  } // end insert
124  } // end class WorkerQueue
125 
126 } // end namespace kinetica.Utils