GPUdb C++ API  Version 6.1.0.0
GPUdbIngestor.hpp
Go to the documentation of this file.
1 #ifndef __GPUDB_INGESTOR_HPP__
2 #define __GPUDB_INGESTOR_HPP__
3 
4 #include "gpudb/GPUdb.hpp"
5 #include "gpudb/Http.hpp"
6 #include "gpudb/Type.hpp"
7 
8 
9 #include <atomic>
10 #include <map>
11 #include <regex>
12 #include <string>
13 #include <vector>
14 
15 #include <boost/noncopyable.hpp>
16 #include <boost/shared_ptr.hpp>
17 
18 
19 namespace gpudb
20 {
21 
22 // Forward declaration of the client class that WorkerList and GPUdbIngestor below take by const reference.
23 class GPUdb;
24 
// Free functions declared in namespace gpudb; no definitions are visible in this listing.
// NOTE(review): judging by their names these look like test hooks -- confirm against the
// implementation file. The listing numbering also skips 27 here, so one declaration may
// be missing from this extract.
25 void test_worker_list( std::string host );
26 void test_record_key();
28 void test_worker_queue();
29 
30 
31 /*
32  * A list of worker URLs to use for multi-head ingest.
    *
    * The URLs are stored in a private std::vector<gpudb::HttpUrl>; the public
    * interface below only exposes read-only access to it (size, empty, const
    * iteration and a string rendering).
    *
    * NOTE(review): the class head that should sit between this comment and the
    * opening brace (listing line 34, presumably `class WorkerList`) is absent
    * from this extract -- restore it from the original header.
33  */
35 {
36 private:
37 
    // Underlying container type for the worker URLs.
38  typedef std::vector<gpudb::HttpUrl> worker_list;
39 
40 
41 public:
42 
43  /* <summary>Creates a <see cref="WorkerList"/> object and automatically populates it with the
44  * worker URLs from GPUdb to support multi-head ingest. ( If the
45  * specified GPUdb instance has multi-head ingest disabled, the worker
46  * list will be empty and multi-head ingest will not be used.) Note that
47  * in some cases, workers may be configured to use more than one IP
48  * address, not all of which may be accessible to the client; this
49  * constructor uses the first IP returned by the server for each worker.
50  * </summary>
51  *
52  * <param name="gpudb">The <see cref="GPUdb"/> instance from which to
53  * obtain the worker URLs.</param>
54  */
55  WorkerList( const GPUdb &gpudb );
56 
57  /* <summary>Creates a <see cref="WorkerList"/> object and automatically populates it with the
58  * worker URLs from GPUdb to support multi-head ingest. ( If the
59  * specified GPUdb instance has multi-head ingest disabled, the worker
60  * list will be empty and multi-head ingest will not be used.) Note that
61  * in some cases, workers may be configured to use more than one IP
62  * address, not all of which may be accessible to the client; this
63  * constructor uses the provided regular expression to match the workers in each
64  * group, and only uses matching workers, if any.
65  * </summary>
66  *
67  * <param name="gpudb">The <see cref="GPUdb"/> instance from which to
68  * obtain the worker URLs.</param>
69  * <param name="ip_regex_str">A regular expression pattern for the IPs to match.</param>
70  */
71  WorkerList( const GPUdb &gpudb, const std::string& ip_regex_str );
72 
73 // ~WorkerList();
74 
75 
76  // Returns the number of worker URLs held by this WorkerList
77  size_t size() const { return m_worker_urls.size(); }
78 
79  // Read-only iteration over the stored worker URLs (begin/end of the underlying vector)
80  typedef worker_list::const_iterator const_iterator;
81  const_iterator begin() const { return m_worker_urls.begin(); }
82  const_iterator end() const { return m_worker_urls.end(); }
83 
84  // Returns true if this WorkerList holds no worker URLs
85  bool empty() const { return m_worker_urls.empty(); }
86 
87  // Returns a string representation of the workers contained within
88  std::string toString() const;
89 
90 
91 private:
92 
    // The worker URLs themselves; every public accessor above reads from this vector.
93  worker_list m_worker_urls;
94 
    // Static helper defined in the implementation file.
    // NOTE(review): presumably splits in_string on delim and writes the pieces
    // into elements -- confirm against the definition.
95  static void split_string( const std::string &in_string,
96  char delim,
97  std::vector<std::string> &elements );
98 }; // end class WorkerList
99 
100 
101 
102 // Internal classes, forward-declared only. GPUdbIngestor below refers to them
// through RecordKeyBuilder* members and boost::shared_ptr<WorkerQueue>; their
// definitions are not visible in this listing.
103 class RecordKeyBuilder;
104 class WorkerQueue;
105 
106 
107 /*
108  * The multi-head ingestor class (also handles regular insertion). Using this class is
109  * significantly more computation-intensive compared to a regular insertion. So, it is
110  * highly recommended to use this ingestor only if multi-head ingestion is actually turned
111  * on in the server and there is a large volume of records to be inserted.
     *
     * Instances cannot be copied (private inheritance from boost::noncopyable).
112  */
113 class GPUdbIngestor : private boost::noncopyable
114 {
115 
116 public:
117 
     // Construction: four overloads that all take the database handle, the record
     // type, the destination table name and the batch size. They differ only in
     // whether a WorkerList and/or a map of insert options is also supplied.
     // NOTE(review): what the overloads without worker_list / insert_options fall
     // back to is decided in the implementation file -- confirm there.
118  GPUdbIngestor( const gpudb::GPUdb& db, const gpudb::Type& record_type,
119  const std::string& table_name,
120  const WorkerList& worker_list,
121  const std::map<std::string, std::string>& insert_options,
122  size_t batch_size );
123 
124  GPUdbIngestor( const gpudb::GPUdb& db, const gpudb::Type& record_type,
125  const std::string& table_name,
126  const WorkerList& worker_list,
127  size_t batch_size );
128 
129 
130  GPUdbIngestor( const gpudb::GPUdb& db, const gpudb::Type& record_type,
131  const std::string& table_name,
132  const std::map<std::string, std::string>& insert_options,
133  size_t batch_size );
134 
135  GPUdbIngestor( const gpudb::GPUdb& db, const gpudb::Type& record_type,
136  const std::string& table_name, size_t batch_size );
137 
138  ~GPUdbIngestor();
139 
140  /*
141  * Returns the count of records inserted so far through this ingestor
142  * instance; an atomic operation (reads the std::atomic counter).
143  */
144  size_t getCountInserted() const { return m_count_inserted; }
145 
146  /*
147  * Returns the count of records updated so far through this ingestor
148  * instance; an atomic operation (reads the std::atomic counter).
149  */
150  size_t getCountUpdated() const { return m_count_updated; }
151 
152 
153  /*
154  * Ensures that all queued records are inserted into the database. If an error
155  * occurs while inserting the records from any queue, the records will no
156  * longer be in that queue nor in the database; catch <see cref="GPUdbInsertException{T}" />
157  * to get the list of records that were being inserted if needed (for example,
158  * to retry). Other queues may also still contain unflushed records if this
159  * occurs.
160  */
161  void flush();
162 
163 
164  /*
165  * Queues a record for insertion into GPUdb. If the queue reaches
166  * the <member cref="batch_size" />, all records in the queue will be
167  * inserted into Kinetica before the method returns. If an error occurs
168  * while inserting the records, the records will no longer be in the queue
169  * nor in Kinetica; catch <see cref="InsertException{T}"/> to get the list
170  * of records that were being inserted if needed (for example, to retry).
171  *
172  * <param name="record">The record to insert.</param>
173  */
174  void insert( gpudb::GenericRecord record );
175 
176 
177  /*
178  * Queues a list of records for insertion into Kinetica. If any queue reaches
179  * the <member cref="batch_size" />, all records in the queue will be
180  * inserted into Kinetica before the method returns. If an error occurs
181  * while inserting the records, the records will no longer be in the queue
182  * nor in Kinetica; catch <see cref="InsertException{T}"/> to get the list
183  * of records that were being inserted if needed (for example, to retry).
184  *
185  * <param name="records">The records to insert.</param>
186  */
187  void insert( std::vector<gpudb::GenericRecord> records );
188 
189 
190 private:
191 
     // Shorthand for the option map type and for shared ownership of a worker queue.
192  typedef std::map<std::string, std::string> str_to_str_map_t;
193  typedef boost::shared_ptr<gpudb::WorkerQueue> worker_queue_ptr_t;
194 // typedef boost::shared_ptr<gpudb::RecordKeyBuilder> record_key_buildter_ptr;
195 
196 
     // Default constructor is declared private, so it cannot be called from outside the class.
197  GPUdbIngestor();
198 
     // Private helper taking the constructor arguments other than the insert options.
     // NOTE(review): presumably the shared initialisation routine called by the
     // public constructors -- confirm in the implementation file.
199  void construct( const gpudb::GPUdb& db,
200  const gpudb::Type& record_type,
201  const std::string& table_name,
202  const WorkerList& worker_list,
203  size_t batch_size );
204 
205  /*
206  * Insert the given list of records to the database residing at the given URL.
207  * Upon any error, throws InsertException with the queue of records passed into it.
208  */
209  void flush( const std::vector<gpudb::GenericRecord>& queue,
210  const gpudb::HttpUrl& url );
211 
     // Held by reference, not by value: the GPUdb object passed to the constructor
     // must outlive this ingestor.
212  const gpudb::GPUdb& m_db;
213  std::string m_table_name;
214  size_t m_batch_size;
     // Counters behind getCountInserted() / getCountUpdated().
215  std::atomic<size_t> m_count_inserted;
216  std::atomic<size_t> m_count_updated;
217  str_to_str_map_t m_insert_options;
218  gpudb::Type m_record_type;
     // Raw pointers to the forward-declared key builder type.
     // NOTE(review): ownership and cleanup are not visible in this header --
     // presumably handled by the constructors/destructor; confirm.
219  gpudb::RecordKeyBuilder* m_primary_key_builder_ptr;
220  gpudb::RecordKeyBuilder* m_shard_key_builder_ptr;
221 // record_key_buildter_ptr m_primary_key_builder_ptr;
222 // record_key_buildter_ptr m_shard_key_builder_ptr;
223  std::vector<int32_t> m_routing_table;
224  std::vector<worker_queue_ptr_t> m_worker_queues;
225 
226 }; // end class GPUdbIngestor
227 
228 
229 
230 } // namespace gpudb
231 
232 
233 
234 #endif // __GPUDB_INGESTOR_HPP__
235 
236 
void test_record_key()
WorkerList(const GPUdb &gpudb)
size_t size() const
std::string toString() const
size_t getCountUpdated() const
const_iterator begin() const
bool empty() const
void test_worker_queue()
void test_record_key_builder()
size_t getCountInserted() const
worker_list::const_iterator const_iterator
const_iterator end() const
void test_worker_list(std::string host)