GPUdb C++ API  Version 7.0.19.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
GPUdbMultiHeadIOUtils.h
Go to the documentation of this file.
1 #ifndef __GPUDB_MULTIHEAD_IO_UTILS_H__
2 #define __GPUDB_MULTIHEAD_IO_UTILS_H__
3 
4 #include "gpudb/GPUdb.hpp"
5 #include "gpudb/Http.hpp"
6 #include "gpudb/Type.hpp"
7 
8 #include <regex>
9 
10 
11 
12 namespace gpudb
13 {
14 
15 // Forward declaration
16 class GPUdb;
17 
18 
19 /*
20  * A list of worker URLs to use for multi-head ingest.
21  */
22 class WorkerList
23 {
24 private:
25 
26  typedef std::vector<gpudb::HttpUrl> worker_list;
27 
28 
29 public:
30 
31  /* Creates a WorkerList object and automatically populates it with the
32  * worker URLs from GPUdb to support multi-head ingest. (If the
33  * specified GPUdb instance has multi-head ingest disabled, the worker
34  * list will be empty and multi-head ingest will not be used.) Note that
35  * in some cases, workers may be configured to use more than one IP
36  * address, not all of which may be accessible to the client; this
37  * constructor uses the first IP returned by the server for each worker.
38  *
39  * If multi-head ingestion is turned off, then returns the server's head
40  * node address.
41  *
42  *
43  * @param gpudb  The GPUdb instance from which to
44  * obtain the worker URLs.
45  */
46  WorkerList( const GPUdb &gpudb );
47 
48  /* Creates a WorkerList object and automatically populates it with the
49  * worker URLs from GPUdb to support multi-head ingest. (If the
50  * specified GPUdb instance has multi-head ingest disabled, the worker
51  * list will be empty and multi-head ingest will not be used.) Note that
52  * in some cases, workers may be configured to use more than one IP
53  * address, not all of which may be accessible to the client; this
54  * constructor uses the provided regular expression to match the workers in each
55  * group, and only uses matching workers, if any.
56  *
57  * If multi-head ingestion is turned off, then returns the server's head
58  * node address.
59  *
60  *
61  * @param gpudb  The GPUdb instance from which to
62  * obtain the worker URLs.
63  * @param ip_regex_str  A regular expression pattern for the IPs to match.
64  */
65  WorkerList( const GPUdb &gpudb, const std::string& ip_regex_str );
66 
67 // ~WorkerList();
68 
69 
70  // Return the size of this WorkerList
71  size_t size() const { return m_worker_urls.size(); }
72 
73  // Iterator related stuff
74  typedef worker_list::const_iterator const_iterator;
75  const_iterator begin() const { return m_worker_urls.begin(); }
76  const_iterator end() const { return m_worker_urls.end(); }
77 
78  // Returns whether this WorkerList is empty
79  bool empty() const { return m_worker_urls.empty(); }
80 
81  // Returns a string representation of the workers contained within
82  std::string toString() const;
83 
84 
85 private:
86 
87  worker_list m_worker_urls;
88 
89  static void split_string( const std::string &in_string,
90  char delim,
91  std::vector<std::string> &elements );
92 }; // end class WorkerList
93 
94 
95 
96 /*
97  * A key based on a given record that serves as either a primary key
98  * or a shard key. The <see cref="RecordKeyBuilder"/> class creates
99  * these record keys.
100  */
101 class RecordKey
102 {
103 public:
104 
105  RecordKey();
106  RecordKey( size_t buffer_size );
107  RecordKey( const RecordKey &other );
108  ~RecordKey();
109 
110  // Returns whether the key is valid at the moment
111  bool is_valid() const { return m_is_valid; }
112 
113  // Return the key's hash code
114  int32_t get_hash_code() const { return m_hash_code; }
115 
116  // Resets the key to be an empty one with the new buffer size
117  void reset( size_t buffer_size );
118 
119 
120  // Adds a char1 to the buffer
121  void add_char1( const std::string& value, bool is_null );
122 
123  // Adds a char2 to the buffer
124  void add_char2( const std::string& value, bool is_null );
125 
126  // Adds a char4 to the buffer
127  void add_char4( const std::string& value, bool is_null );
128 
129  // Adds a char8 to the buffer
130  void add_char8( const std::string& value, bool is_null );
131 
132  // Adds a char16 to the buffer
133  void add_char16( const std::string& value, bool is_null );
134 
135  // Adds a char32 to the buffer
136  void add_char32( const std::string& value, bool is_null );
137 
138  // Adds a char64 to the buffer
139  void add_char64( const std::string& value, bool is_null );
140 
141  // Adds a char128 to the buffer
142  void add_char128( const std::string& value, bool is_null );
143 
144  // Adds a char256 to the buffer
145  void add_char256( const std::string& value, bool is_null );
146 
147  // Adds a date to the buffer
148  void add_date( const std::string& value, bool is_null );
149 
150  // Adds a datetime to the buffer
151  void add_datetime( const std::string& value, bool is_null );
152 
153  // Adds a decimal to the buffer
154  void add_decimal( const std::string& value, bool is_null );
155 
156  // Adds a double to the buffer
157  void add_double( double value, bool is_null );
158 
159  // Adds a float to the buffer
160  void add_float( float value, bool is_null );
161 
162  // Adds an int8 to the buffer
163  void add_int8( int8_t value, bool is_null );
164 
165  // Adds an int16 to the buffer
166  void add_int16( int16_t value, bool is_null );
167 
168  // Adds an integer to the buffer
169  void add_int( int32_t value, bool is_null );
170 
171  // Adds an IPv4 address to the buffer
172  void add_ipv4( const std::string& value, bool is_null );
173 
174  // Adds a long to the buffer
175  void add_long( int64_t value, bool is_null );
176 
177  // Adds a time to the buffer
178  void add_time( const std::string& value, bool is_null );
179 
180  // Adds a timestamp (long) to the buffer
181  void add_timestamp( int64_t value, bool is_null );
182 
183  // Adds (the hash value of) a string to the buffer
184  void add_string( const std::string& value, bool is_null );
185 
186  // Adds an unsigned long value to the buffer
187  void add_ulong( const std::string& value, bool is_null );
188 
189 
191  void compute_hash();
192 
195  size_t route( const std::vector<int32_t>& routing_table ) const;
196 
198  // ulong value
199  static bool verify_ulong_value( const std::string& value );
200 
202  RecordKey& operator=(const RecordKey& other);
203 
205  bool operator==(const RecordKey& rhs) const;
206  bool operator!=(const RecordKey& rhs) const { return !(*this == rhs); }
207 
209  bool operator<(const RecordKey& rhs) const;
210  bool operator>(const RecordKey& rhs) const { return ( !(*this < rhs)
211  && !(*this == rhs) ); }
212 
213  std::string toString( const std::string& separator = " " ) const;
214 private:
215 
216  // Copy contents of another key into this one
217  void copy( const RecordKey& other );
218 
219  // Returns whether the buffer is full or not
220  bool is_buffer_full( bool throw_if_full = true ) const;
221 
222  // Check whether the buffer will overflow if we attempt to add n more bytes
223  bool will_buffer_overflow( int n, bool throw_if_overflow = true ) const;
224 
225  // Adds a single byte to the buffer; does the accounting, too
226  void add( uint8_t b );
227 
228  std::vector<unsigned char> m_buffer;
229  size_t m_buffer_size;
230  size_t m_current_size;
231  int32_t m_hash_code;
232  int64_t m_routing_hash;
233  bool m_is_valid;
234  bool m_key_is_complete;
235 
236 }; // end class RecordKey
237 
238 
239 
240 class RecordKeyBuilder
241 {
242 private:
243 
244  enum ColumnType_T
245  {
246  CHAR1,
247  CHAR2,
248  CHAR4,
249  CHAR8,
250  CHAR16,
251  CHAR32,
252  CHAR64,
253  CHAR128,
254  CHAR256,
255  DATE,
256  DATETIME,
257  DECIMAL,
258  DOUBLE,
259  FLOAT,
260  INT,
261  INT8,
262  INT16,
263  IPV4,
264  LONG,
265  STRING,
266  TIME,
267  TIMESTAMP,
268  ULONG
269  };
270 
271  // Some typedefs for nullable types
272  typedef boost::optional<int32_t> nullableInt;
273 
274  gpudb::Type m_record_type;
275  std::vector<int32_t> m_pk_shard_key_indices;
276  std::vector<ColumnType_T> m_column_types;
277  size_t m_key_buffer_size;
278 
279  RecordKeyBuilder() : m_record_type( gpudb::Type( "empty_type" ) ) {}
280 
281 public:
282 
283  // Constructs a RecordKey builder
284  RecordKeyBuilder( bool is_primary_key, const gpudb::Type& record_type );
285 
286  // Build a RecordKey object based on a generic record
287  bool build( const gpudb::GenericRecord& record, RecordKey& record_key ) const;
288 
289  /*
290  * Build a key-lookup expression based on a generic record.
291  *
292  * Returns true if expression building succeeded, false otherwise.
293  */
294  bool buildExpression( const gpudb::GenericRecord& record,
295  std::string& result ) const;
296 
297  // Returns whether this builder builds any routing keys. That is,
298  // if there are any routing columns in the relevant record type
299  bool has_key() const { return !m_pk_shard_key_indices.empty(); }
300 
301  // Returns true if the other RecordKeyBuilder is equivalent to this builder
302  bool operator==(const RecordKeyBuilder& rhs) const;
303 
304  bool operator!=(const RecordKeyBuilder& rhs) const { return !(*this == rhs); }
305 }; // end class RecordKeyBuilder
306 
307 
308 
309 /*
310  * The WorkerQueue class maintains a queue of records to be inserted
311  * into GPUdb through a single worker URL. The queued records are
312  * gpudb::GenericRecord objects.
313  */
314 class WorkerQueue
315 {
316 public:
317 
318  // A vector of records; used both for the queue and for flushed records
319  typedef std::vector<gpudb::GenericRecord> recordVector_T;
320 
321 private:
322 
323  gpudb::HttpUrl m_url;
324  size_t m_capacity;
325  bool m_has_primary_key;
326  bool m_update_on_existing_pk;
327  recordVector_T m_queue;
328 
330 // typedef std::map<const RecordKey&, size_t> primary_key_map_t;
331  typedef std::map<RecordKey, size_t> primary_key_map_t;
332  primary_key_map_t m_primary_key_map;
333 
334  WorkerQueue();
335 
336 public:
337 
338  // Takes a string for the url; capacity 1, no PK, no update on existing PK
339  // (this is used by the RecordRetriever class which doesn't care about
340  // the other arguments)
341  WorkerQueue( const std::string& url );
342 
343  // Takes a string for the url
344  WorkerQueue( const std::string& url, size_t capacity, bool has_primary_key,
345  bool update_on_existing_pk );
346  ~WorkerQueue();
347 
349  const gpudb::HttpUrl& get_url() const { return m_url; }
350 
352  void flush( recordVector_T& flushed_records );
353 
355  bool insert( const gpudb::GenericRecord& record,
356  const RecordKey& key,
357  recordVector_T& flushed_records );
358 
359 
360 }; // end class WorkerQueue
361 
362 
363 
364 
365 
366 } // namespace gpudb
367 
368 
369 
370 #endif // __GPUDB_MULTIHEAD_IO_UTILS_H__
371 
372 
bool operator!=(const RecordKeyBuilder &rhs) const
void add_date(const std::string &value, bool is_null)
bool build(const gpudb::GenericRecord &record, RecordKey &record_key) const
void reset(size_t buffer_size)
WorkerList(const GPUdb &gpudb)
void add_string(const std::string &value, bool is_null)
void add_char2(const std::string &value, bool is_null)
bool operator<(const RecordKey &rhs) const
Returns true if this RecordKey is less than the other key.
void add_char32(const std::string &value, bool is_null)
void add_char8(const std::string &value, bool is_null)
void add_ipv4(const std::string &value, bool is_null)
RecordKey & operator=(const RecordKey &other)
The assignment operator.
bool buildExpression(const gpudb::GenericRecord &record, std::string &result) const
void add_int8(int8_t value, bool is_null)
void add_time(const std::string &value, bool is_null)
static bool verify_ulong_value(const std::string &value)
A static utility function for verifying if a given string is a valid ulong value.
void add_char128(const std::string &value, bool is_null)
std::vector< gpudb::GenericRecord > recordVector_T
void add_timestamp(int64_t value, bool is_null)
size_t route(const std::vector< int32_t > &routing_table) const
Given a routing table consisting of worker rank indices, choose a worker rank based on the hash of th...
const_iterator begin() const
bool insert(const gpudb::GenericRecord &record, const RecordKey &key, recordVector_T &flushed_records)
Inserts a record into the queue.
void add_ulong(const std::string &value, bool is_null)
int32_t get_hash_code() const
const_iterator end() const
bool operator!=(const RecordKey &rhs) const
std::string toString(const std::string &separator=" ") const
bool operator==(const RecordKey &rhs) const
Returns true if the other RecordKey is equivalent to this key.
void add_char64(const std::string &value, bool is_null)
void add_double(double value, bool is_null)
void add_char1(const std::string &value, bool is_null)
bool operator==(const RecordKeyBuilder &rhs) const
void add_long(int64_t value, bool is_null)
void add_int(int32_t value, bool is_null)
bool operator>(const RecordKey &rhs) const
void add_decimal(const std::string &value, bool is_null)
void add_datetime(const std::string &value, bool is_null)
void add_char16(const std::string &value, bool is_null)
void compute_hash()
Compute the hash of the key in the buffer.
void add_char4(const std::string &value, bool is_null)
std::string toString() const
void add_char256(const std::string &value, bool is_null)
worker_list::const_iterator const_iterator
void add_int16(int16_t value, bool is_null)
void add_float(float value, bool is_null)
void flush(recordVector_T &flushed_records)
Returns the current queue and creates a new internal queue.
const gpudb::HttpUrl & get_url() const
Returns the URL (as a gpudb::HttpUrl) for this worker.