Kinetica C# API  Version 7.0.19.0
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Properties Pages
RecordRetriever.cs
Go to the documentation of this file.
1 using Avro.IO;
2 using System;
3 using System.Collections.Generic;
4 
5 
6 namespace kinetica
7 {
/// <summary>
/// Retrieves records of type <typeparamref name="T"/> from a Kinetica table
/// by shard key.  When routing information is available, each request is
/// sent straight to the worker selected for the record's shard key;
/// otherwise the request goes through the main <see cref="Kinetica"/>
/// connection.
/// </summary>
/// <typeparam name="T">The record type stored in the table.</typeparam>
public class RecordRetriever<T> where T : new()
{
    /// <summary>Endpoint path appended to a server or worker base URL.</summary>
    private const string GET_RECORDS_ENDPOINT = "get/records";

    /// <summary>The Kinetica connection used to submit requests.</summary>
    public Kinetica kineticaDB { get; }

    /// <summary>Name of the table records are retrieved from.</summary>
    public string table_name { get; }

    private readonly KineticaType ktype;

    // Null when T has no shard key; getRecordsByKey() refuses to run then.
    private readonly Utils.RecordKeyBuilder<T> shard_key_builder;

    // Null when no worker list could be obtained (single-head mode).
    private readonly IList<int> routing_table;

    private readonly IList<Utils.WorkerQueue<T>> worker_queues;
    private readonly Random random;


    /// <summary>
    /// Create a RecordRetriever object with the given parameters.
    /// </summary>
    /// <param name="kdb">The Kinetica connection to use.</param>
    /// <param name="table_name">Name of the table to retrieve records from.</param>
    /// <param name="ktype">The Kinetica type describing <typeparamref name="T"/>.</param>
    /// <param name="workers">Optional list of worker URLs.  When null or
    /// empty, a list is requested from <paramref name="kdb"/>.</param>
    /// <exception cref="KineticaException">Thrown when setting up the worker
    /// queues fails for any reason, including when the routing table refers
    /// to more workers than there are URLs.  The original failure is
    /// available as the inner exception.</exception>
    public RecordRetriever( Kinetica kdb, string table_name,
                            KineticaType ktype,
                            Utils.WorkerList workers = null)
    {
        this.kineticaDB = kdb;
        this.table_name = table_name;
        this.ktype      = ktype;

        // Set up the shard key builder
        // ----------------------------
        this.shard_key_builder = new Utils.RecordKeyBuilder<T>(false, this.ktype);
        // Drop the builder if T has no shard key
        if (!this.shard_key_builder.hasKey())
        {
            this.shard_key_builder = null;
        }


        // Set up the worker queues
        // -------------------------
        this.worker_queues = new List<Utils.WorkerQueue<T>>();
        try
        {
            // If no workers are given, try to get them from Kinetica
            if ((workers == null) || (workers.Count == 0))
            {
                workers = new Utils.WorkerList(kdb);
            }

            // If we end up with workers, either given by the user or
            // obtained from Kinetica, then use those
            if ((workers != null) && (workers.Count > 0))
            {
                // Add one queue per worker
                foreach (System.Uri worker_url in workers)
                {
                    System.Uri url = buildGetRecordsUrl( worker_url.ToString() );
                    this.worker_queues.Add( new Utils.WorkerQueue<T>( url ) );
                }

                // Get the worker rank information from Kinetica
                this.routing_table = kdb.adminShowShards().rank;

                // Check that enough worker URLs are specified.
                // NOTE(review): the '>' comparison presumes rank numbers are
                // 1-based indices into the worker list -- confirm against
                // Utils.RecordKey.route().
                for (int i = 0; i < this.routing_table.Count; ++i)
                {
                    if (this.routing_table[i] > this.worker_queues.Count)
                    {
                        throw new KineticaException("Not enough worker URLs specified.");
                    }
                }
            }
            else // no workers available; use the regular Kinetica address
            {
                System.Uri url = buildGetRecordsUrl( kdb.URL.ToString() );
                this.worker_queues.Add( new Utils.WorkerQueue<T>( url ) );
                this.routing_table = null;
            }
        }
        catch (Exception ex)
        {
            // Keep the original message text, but also preserve the cause
            // so callers can inspect the real exception type and stack.
            throw new KineticaException(ex.ToString(), ex);
        }

        // Create the random number generator
        this.random = new Random((int)DateTime.Now.Ticks);
    }   // end constructor RecordRetriever


    /// <summary>
    /// Builds the /get/records endpoint URL for the given base URL, adding
    /// a path separator only when the base URL does not already end in one.
    /// </summary>
    /// <param name="base_url">String form of the server or worker URL.</param>
    /// <returns>The URL of the /get/records endpoint under that base.</returns>
    private static System.Uri buildGetRecordsUrl( string base_url )
    {
        string prefix = base_url.EndsWith( "/", StringComparison.Ordinal )
                        ? base_url
                        : ( base_url + "/" );
        return new System.Uri( prefix + GET_RECORDS_ENDPOINT );
    }   // end buildGetRecordsUrl()


    /// <summary>
    /// Retrieves records for a given shard key, optionally further limited
    /// by an additional expression.
    /// </summary>
    /// <param name="record">The record whose shard key values select the
    /// records to retrieve.</param>
    /// <param name="expression">Optional extra filter; when given it is
    /// AND-ed with the expression generated from the shard key.</param>
    /// <returns>The decoded /get/records response.</returns>
    /// <exception cref="KineticaException">Thrown when the type has no
    /// shard key, or when building or submitting the request fails (the
    /// cause is attached as the inner exception).</exception>
    public GetRecordsResponse<T> getRecordsByKey( T record,
                                                  string expression = null )
    {
        if ( this.shard_key_builder == null )
        {
            throw new KineticaException( "Cannot get by key from unsharded table: " + this.table_name );
        }

        try
        {
            // Build the expression
            string full_expression = this.shard_key_builder.buildExpression( record );
            if ( full_expression == null )
            {
                throw new KineticaException( "No expression could be made from given record." );
            }
            if ( expression != null )
            {
                full_expression = (full_expression + " and (" + expression + ")");
            }

            // Create the options map for the /get/records call
            IDictionary<string, string> options = new Dictionary<string, string>();
            options[GetRecordsRequest.Options.EXPRESSION]        = full_expression;
            options[GetRecordsRequest.Options.FAST_INDEX_LOOKUP] = GetRecordsRequest.Options.TRUE;

            // Create a /get/records request packet
            GetRecordsRequest request = new GetRecordsRequest( this.table_name,
                                                               0, Kinetica.END_OF_SET,
                                                               options );

            // No routing information is available; go through the main connection
            if ( this.routing_table == null )
            {
                return kineticaDB.getRecords<T>( request );
            }

            // Find the appropriate worker for this record's shard key
            Utils.RecordKey shard_key = this.shard_key_builder.build( record );
            System.Uri url = this.worker_queues[ shard_key.route( this.routing_table ) ].url;

            // Make the call
            RawGetRecordsResponse raw_response =
                this.kineticaDB.SubmitRequest<RawGetRecordsResponse>( url, request );

            // Copy the response metadata over to the decoded response
            GetRecordsResponse<T> decoded_response = new GetRecordsResponse<T>();
            decoded_response.table_name              = raw_response.table_name;
            decoded_response.type_name               = raw_response.type_name;
            decoded_response.type_schema             = raw_response.type_schema;
            decoded_response.has_more_records        = raw_response.has_more_records;
            decoded_response.total_number_of_records = raw_response.total_number_of_records;

            // Decode the records
            kineticaDB.DecodeRawBinaryDataUsingRecordType( ktype,
                                                           raw_response.records_binary,
                                                           decoded_response.data );
            return decoded_response;
        }
        catch ( Exception ex )
        {
            // Covers KineticaException too; both were wrapped identically.
            throw new KineticaException( "Error in retrieving records by key: ", ex );
        }
    }   // end getRecordsByKey()

}   // end class RecordRetriever
189 
190 } // end namespace kinetica
A list of worker URLs to use for multi-head ingest.
Definition: WorkerList.cs:11
GetRecordsResponse< T > getRecordsByKey(T record, string expression=null)
Retrieves records for a given shard key, optionally further limited by an additional expression...
const int END_OF_SET
No Limit
Definition: Kinetica.cs:45
A set of results returned by Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
Definition: GetRecords.cs:495
A set of results returned by Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
Definition: GetRecords.cs:539
A set of parameters for Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
Definition: GetRecords.cs:30
RecordRetriever(Kinetica kdb, string table_name, KineticaType ktype, Utils.WorkerList workers=null)
Create a RecordRetriever object with the given parameters.
API to talk to Kinetica Database
Definition: Kinetica.cs:40