GPUdb C++ API  Version 7.1.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
GPUdbMultiHeadIOUtils.h
Go to the documentation of this file.
1 #ifndef __GPUDB_MULTIHEAD_IO_UTILS_H__
2 #define __GPUDB_MULTIHEAD_IO_UTILS_H__
3 
4 #include "gpudb/GPUdb.hpp"
5 #include "gpudb/Http.hpp"
6 #include "gpudb/Type.hpp"
7 
8 #include <regex>
9 
10 
11 
12 namespace gpudb
13 {
14 
15 // Forward declaration
16 class GPUdb;
17 
18 
19 /*
20  * A list of worker URLs to use for multi-head ingest.
21  */
23 {
24 private:
25 
26  typedef std::vector<gpudb::HttpUrl> worker_list;
27 
28 
29 public:
30 
31  /* <summary>Creates a <see cref="WorkerList"/> object and automatically populates it
32  * with the worker URLs from GPUdb to support multi-head ingest. (If the
33  * specified GPUdb instance has multi-head ingest disabled, the worker
34  * list will be empty and multi-head ingest will not be used.) Note that
35  * in some cases, workers may be configured to use more than one IP
36  * address, not all of which may be accessible to the client; this
37  * constructor uses the first IP returned by the server for each worker.
38  *
39  * If multi-head ingestion is turned off, the list may instead hold the
40  * server's head node address (this conflicts with the "empty" case above; confirm).
41  * </summary>
42  *
43  * <param name="gpudb">The <see cref="GPUdb"/> instance from which to
44  * obtain the worker URLs.</param>
45  */
46  WorkerList( const GPUdb &gpudb );
47 
48  /* <summary>Creates a <see cref="WorkerList"/> object and automatically populates it
49  * with the worker URLs from GPUdb to support multi-head ingest. (If the
50  * specified GPUdb instance has multi-head ingest disabled, the worker
51  * list will be empty and multi-head ingest will not be used.) Note that
52  * in some cases, workers may be configured to use more than one IP
53  * address, not all of which may be accessible to the client; this
54  * constructor uses the provided regular expression to match the workers in each
55  * group, and only uses matching workers, if any.
56  *
57  * If multi-head ingestion is turned off, the list may instead hold the
58  * server's head node address (this conflicts with the "empty" case above; confirm).
59  * </summary>
60  *
61  * <param name="gpudb">The <see cref="GPUdb"/> instance from which to
62  * obtain the worker URLs.</param>
63  * <param name="ip_regex_str">A regular expression pattern for the IPs to match.</param>
64  */
65  WorkerList( const GPUdb &gpudb, const std::string& ip_regex_str );
66 
67  /* Creates an empty WorkerList
68  */
70  {}
71 
72 // ~WorkerList();
73 
74 
75  // Return the size of this WorkerList
76  size_t size() const { return m_worker_urls.size(); }
77 
78  // Iterator related stuff
79  typedef worker_list::const_iterator const_iterator;
80  const_iterator begin() const { return m_worker_urls.begin(); }
81  const_iterator end() const { return m_worker_urls.end(); }
82 
83  // Returns if this WorkerList is empty
84  bool empty() const { return m_worker_urls.empty(); }
85 
86  // Returns a string representation of the workers contained within
87  std::string toString() const;
88 
89 
90 private:
91 
92  worker_list m_worker_urls;
93 
94  static void split_string( const std::string &in_string,
95  char delim,
96  std::vector<std::string> &elements );
97 }; // end class WorkerList
98 
99 
100 
101 /*
102  * A key based on a given record that serves as either a primary key
103  * or a shard key. The <see cref="RecordKeyBuilder"/> class creates
104  * these record keys.
105  */
107 {
108 public:
109 
110  RecordKey();
111  RecordKey( size_t buffer_size );
112  RecordKey( const RecordKey &other );
113  ~RecordKey();
114 
115  // Returns whether the key is valid at the moment
116  bool is_valid() const { return m_is_valid; }
117 
118  // Return the key's hash code
119  int32_t get_hash_code() const { return m_hash_code; }
120 
121  // Return the key's routing hash code
122  int64_t get_routing_hash() const { return m_routing_hash; }
123 
124  // Resets the key to be an empty one with the new buffer size
125  void reset( size_t buffer_size );
126 
127 
128  // Adds a boolean to the buffer.
129  void add_boolean( bool value, bool is_null );
130 
131  // Adds a char1 to the buffer
132  void add_char1( const std::string& value, bool is_null );
133 
134  // Adds a char2 to the buffer
135  void add_char2( const std::string& value, bool is_null );
136 
137  // Adds a char4 to the buffer
138  void add_char4( const std::string& value, bool is_null );
139 
140  // Adds a char8 to the buffer
141  void add_char8( const std::string& value, bool is_null );
142 
143  // Adds a char16 to the buffer
144  void add_char16( const std::string& value, bool is_null );
145 
146  // Adds a char32 to the buffer
147  void add_char32( const std::string& value, bool is_null );
148 
149  // Adds a char64 to the buffer
150  void add_char64( const std::string& value, bool is_null );
151 
152  // Adds a char128 to the buffer
153  void add_char128( const std::string& value, bool is_null );
154 
155  // Adds a char256 to the buffer
156  void add_char256( const std::string& value, bool is_null );
157 
158  // Adds a date to the buffer
159  void add_date( const std::string& value, bool is_null );
160 
161  // Adds a datetime to the buffer
162  void add_datetime( const std::string& value, bool is_null );
163 
164  // Adds a decimal to the buffer
165  void add_decimal( const std::string& value, bool is_null );
166 
167  // Adds a double to the buffer
168  void add_double( double value, bool is_null );
169 
170  // Adds a float to the buffer
171  void add_float( float value, bool is_null );
172 
173  // Adds an int8 to the buffer
174  void add_int8( int8_t value, bool is_null );
175 
176  // Adds an int16 to the buffer
177  void add_int16( int16_t value, bool is_null );
178 
179  // Adds an integer to the buffer
180  void add_int( int32_t value, bool is_null );
181 
182  // Adds an IPv4 address to the buffer
183  void add_ipv4( const std::string& value, bool is_null );
184 
185  // Adds a long to the buffer
186  void add_long( int64_t value, bool is_null );
187 
188  // Adds a time to the buffer
189  void add_time( const std::string& value, bool is_null );
190 
191  // Adds a timestamp (long) to the buffer
192  void add_timestamp( int64_t value, bool is_null );
193 
194  // Adds (the hash value of) a string to the buffer
195  void add_string( const std::string& value, bool is_null );
196 
197  // Adds an unsigned long value to the buffer
198  void add_ulong( const std::string& value, bool is_null );
199 
200  // Adds a uuid to the buffer
201  void add_uuid( const std::string& value, bool is_null );
202 
203 
205  void compute_hash();
206 
209  size_t route( const std::vector<int32_t>& routing_table ) const;
210 
212  // ulong value
213  static bool verify_ulong_value( const std::string& value );
214 
216  RecordKey& operator=(const RecordKey& other);
217 
219  bool operator==(const RecordKey& rhs) const;
220  bool operator!=(const RecordKey& rhs) const { return !(*this == rhs); }
221 
223  bool operator<(const RecordKey& rhs) const;
224  bool operator>(const RecordKey& rhs) const { return ( !(*this < rhs)
225  && !(*this == rhs) ); }
226 
227  std::string toString( const std::string& separator = " " ) const;
228 private:
229 
230  // Copy contents of another key into this one
231  void copy( const RecordKey& other );
232 
233  // Returns whether the buffer is full or not
234  bool is_buffer_full( bool throw_if_full = true ) const;
235 
236  // Check whether the buffer will overflow if we attempt to add n more bytes
237  bool will_buffer_overflow( int n, bool throw_if_overflow = true ) const;
238 
239  // Adds a single byte to the buffer; does the accounting, too
240  void add( uint8_t b );
241 
242  std::vector<unsigned char> m_buffer;
243  size_t m_buffer_size;
244  size_t m_current_size;
245  int32_t m_hash_code;
246  int64_t m_routing_hash;
247  bool m_is_valid;
248  bool m_key_is_complete;
249 
250 }; // end class RecordKey
251 
252 
253 
255 {
256 private:
257 
258  enum ColumnType_T
259  {
260  BOOLEAN,
261  CHAR1,
262  CHAR2,
263  CHAR4,
264  CHAR8,
265  CHAR16,
266  CHAR32,
267  CHAR64,
268  CHAR128,
269  CHAR256,
270  DATE,
271  DATETIME,
272  DECIMAL,
273  DOUBLE,
274  FLOAT,
275  INT,
276  INT8,
277  INT16,
278  IPV4,
279  LONG,
280  STRING,
281  TIME,
282  TIMESTAMP,
283  ULONG,
284  UUID,
285  };
286 
287  // Typedef for a nullable 32-bit integer
288  typedef boost::optional<int32_t> nullableInt;
289 
290  gpudb::Type m_record_type;
291  std::vector<int32_t> m_pk_shard_key_indices;
292  std::vector<ColumnType_T> m_column_types;
293  size_t m_key_buffer_size;
294 
295  RecordKeyBuilder() : m_record_type( gpudb::Type( "empty_type" ) ) {}
296 
297 public:
298 
299  // Constructs a RecordKey builder
300  RecordKeyBuilder( bool is_primary_key, const gpudb::Type& record_type );
301 
302  // Build a RecordKey object based on a generic record
303  bool build( const gpudb::GenericRecord& record, RecordKey& record_key ) const;
304 
305  /*
306  * Build a key-lookup expression based on a generic record.
307  *
308  * Returns true if expression building succeeded, false otherwise.
309  */
310  bool buildExpression( const gpudb::GenericRecord& record,
311  std::string& result ) const;
312 
313  // Returns whether this builder builds any routing keys. That is,
314  // if there are any routing columns in the relevant record type
315  bool has_key() const { return !m_pk_shard_key_indices.empty(); }
316 
317  // Returns true if the other RecordKeyBuilder is equivalent to this builder
318  bool operator==(const RecordKeyBuilder& rhs) const;
319 
320  bool operator!=(const RecordKeyBuilder& rhs) const { return !(*this == rhs); }
321 }; // end class RecordKeyBuilder
322 
323 
324 
325 /*
326  * The WorkerQueue class maintains a queue of records to be inserted
327  * into GPUdb for a single worker URL. Records are held as
328  * gpudb::GenericRecord objects (see recordVector_T).
329  */
331 {
332 public:
333 
334  // A batch of records, as queued for or flushed from this worker
335  typedef std::vector<gpudb::GenericRecord> recordVector_T;
336 
337 private:
338 
339  gpudb::HttpUrl m_url;
340  size_t m_capacity;
341  recordVector_T m_queue;
342 
343  WorkerQueue();
344 
345 public:
346 
347  // Takes a string for the url and uses a capacity of 1
348  // (this is used by the RecordRetriever class, which doesn't care about
349  // the other arguments)
350  WorkerQueue( const std::string& url );
351 
352  // Takes a string for the url and the queue capacity
353  WorkerQueue( const std::string& url, size_t capacity );
354  ~WorkerQueue();
355 
357  void clear();
358 
360  const gpudb::HttpUrl& get_url() const { return m_url; }
361 
363  void flush( recordVector_T& flushed_records );
364 
366  bool insert( const gpudb::GenericRecord& record,
367  const RecordKey& key,
368  recordVector_T& flushed_records );
369 
370 
371 }; // end class WorkerQueue
372 
373 
374 
375 
376 
377 } // namespace gpudb
378 
379 
380 
381 #endif // __GPUDB_MULTIHEAD_IO_UTILS_H__
382 
383 
bool operator!=(const RecordKeyBuilder &rhs) const
void clear()
Clear queue without sending.
void add_date(const std::string &value, bool is_null)
bool build(const gpudb::GenericRecord &record, RecordKey &record_key) const
void reset(size_t buffer_size)
void add_string(const std::string &value, bool is_null)
void add_char2(const std::string &value, bool is_null)
bool operator<(const RecordKey &rhs) const
Returns true if this RecordKey is less than the other key.
void add_char32(const std::string &value, bool is_null)
void add_char8(const std::string &value, bool is_null)
void add_boolean(bool value, bool is_null)
void add_ipv4(const std::string &value, bool is_null)
RecordKey & operator=(const RecordKey &other)
The assignment operator.
bool buildExpression(const gpudb::GenericRecord &record, std::string &result) const
void add_int8(int8_t value, bool is_null)
void add_time(const std::string &value, bool is_null)
static bool verify_ulong_value(const std::string &value)
A static utility function for verifying whether a given string is a valid ulong value.
void add_char128(const std::string &value, bool is_null)
std::vector< gpudb::GenericRecord > recordVector_T
void add_timestamp(int64_t value, bool is_null)
size_t route(const std::vector< int32_t > &routing_table) const
Given a routing table consisting of worker rank indices, choose a worker rank based on the hash of th...
const_iterator begin() const
bool insert(const gpudb::GenericRecord &record, const RecordKey &key, recordVector_T &flushed_records)
Inserts a record into the queue.
void add_ulong(const std::string &value, bool is_null)
int32_t get_hash_code() const
const_iterator end() const
bool operator!=(const RecordKey &rhs) const
std::string toString(const std::string &separator=" ") const
bool operator==(const RecordKey &rhs) const
Returns true if the other RecordKey is equivalent to this key.
void add_char64(const std::string &value, bool is_null)
void add_double(double value, bool is_null)
void add_char1(const std::string &value, bool is_null)
bool operator==(const RecordKeyBuilder &rhs) const
void add_long(int64_t value, bool is_null)
void add_int(int32_t value, bool is_null)
bool operator>(const RecordKey &rhs) const
void add_decimal(const std::string &value, bool is_null)
void add_uuid(const std::string &value, bool is_null)
void add_datetime(const std::string &value, bool is_null)
void add_char16(const std::string &value, bool is_null)
void compute_hash()
Compute the hash of the key in the buffer.
void add_char4(const std::string &value, bool is_null)
std::string toString() const
void add_char256(const std::string &value, bool is_null)
worker_list::const_iterator const_iterator
void add_int16(int16_t value, bool is_null)
void add_float(float value, bool is_null)
void flush(recordVector_T &flushed_records)
Returns the current queue and creates a new internal queue.
const gpudb::HttpUrl & get_url() const
Returns the URL of this worker (as a gpudb::HttpUrl).
int64_t get_routing_hash() const