3 using System.Collections.Generic;
16 public class RecordRetriever<T> where T : new()
20 public string table_name {
get; }
22 private Utils.RecordKeyBuilder<T> shard_key_builder;
23 private IList<int> routing_table;
24 private IList<Utils.WorkerQueue<T>> worker_queues;
25 private Random random;
37 Utils.WorkerList workers = null)
39 this.kineticaDB = kdb;
40 this.table_name = table_name;
45 this.shard_key_builder =
new Utils.RecordKeyBuilder<T>(
false, this.ktype);
47 if (!this.shard_key_builder.hasKey())
48 this.shard_key_builder = null;
53 this.worker_queues =
new List<Utils.WorkerQueue<T>>();
57 if ((workers == null) || (workers.Count == 0))
64 if ((workers != null) && (workers.Count > 0))
67 foreach (System.Uri worker_url in workers)
69 string get_records_worker_url_str = (worker_url.ToString() +
"get/records");
70 System.Uri url =
new System.Uri( get_records_worker_url_str );
71 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>( url );
72 this.worker_queues.Add( worker_queue );
76 this.routing_table = kdb.adminShowShards().rank;
78 for (
int i = 0; i < routing_table.Count; ++i)
80 if (this.routing_table[i] > this.worker_queues.Count)
86 string get_records_url_str = ( kdb.URL.ToString() +
"get/records" );
87 System.Uri url =
new System.Uri( get_records_url_str );
88 Utils.WorkerQueue<T> worker_queue =
new Utils.WorkerQueue<T>( url );
89 this.worker_queues.Add(worker_queue);
90 this.routing_table = null;
99 this.random =
new Random((
int)DateTime.Now.Ticks);
125 string expression = null )
127 if ( this.shard_key_builder == null)
128 throw new KineticaException(
"Cannot get by key from unsharded table: " + this.table_name );
133 string full_expression = this.shard_key_builder.buildExpression( record );
134 if ( full_expression == null )
136 if ( expression != null )
137 full_expression = (full_expression +
" and (" + expression +
")");
140 IDictionary<string, string> options =
new Dictionary<string, string>();
141 options[GetRecordsRequest.Options.EXPRESSION] = full_expression;
142 options[GetRecordsRequest.Options.FAST_INDEX_LOOKUP] = GetRecordsRequest.Options.TRUE;
150 if ( this.routing_table == null )
152 return kineticaDB.getRecords<T>( request );
161 Utils.RecordKey shard_key = this.shard_key_builder.build( record );
162 System.Uri url = this.worker_queues[ shard_key.route( this.routing_table ) ].url;
167 decoded_response.table_name = raw_response.table_name;
168 decoded_response.type_name = raw_response.type_name;
169 decoded_response.type_schema = raw_response.type_schema;
170 decoded_response.has_more_records = raw_response.has_more_records;
171 decoded_response.total_number_of_records = raw_response.total_number_of_records;
174 kineticaDB.DecodeRawBinaryDataUsingRecordType( ktype,
175 raw_response.records_binary,
176 decoded_response.data );
177 return decoded_response;
182 }
catch ( Exception ex )
A list of worker URLs to use for multi-head ingest.
GetRecordsResponse< T > getRecordsByKey(T record, string expression=null)
Retrieves records for a given shard key, optionally further limited by an additional expression...
const int END_OF_SET
No Limit
A set of results returned by Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
A set of results returned by Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
A set of parameters for Kinetica.getRecords{T}(string,long,long,IDictionary{string, string}).
RecordRetriever(Kinetica kdb, string table_name, KineticaType ktype, Utils.WorkerList workers=null)
Create a RecordRetriever object with the given parameters.
API to talk to Kinetica Database