The following is an excerpted tutorial, extracted from a complete example using the Python Table Monitor API, demonstrating the basic use of the three table monitor types:
The table monitors will be managed via instances of the GPUdbTableMonitorBase class, which can create, process, & remove table monitors.
The prerequisites for running the table monitor example are listed below:
Table monitor example script
The native Kinetica Python API is accessible through the following means:
In default Kinetica installations, the native Python API is located in the /opt/gpudb/api/python directory. The /opt/gpudb/bin/gpudb_python wrapper script is provided, which sets the execution environment appropriately.
Test the installation:
/opt/gpudb/bin/gpudb_python /opt/gpudb/api/python/examples/example.py
Important
When developing on the Kinetica server, use /opt/gpudb/bin/gpudb_python to run Python programs and /opt/gpudb/bin/gpudb_pip to install dependent libraries.
In the desired directory, run the following, but be sure to replace <kinetica-version> with the name of the installed Kinetica version, e.g., v7.0:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-api-python.git
Change directory into the newly downloaded repository:
cd kinetica-api-python
In the root directory of the cloned repository, install the Kinetica API:
sudo python setup.py install
Test the installation; Python 2.7 or greater is required to run the API example:
python examples/example.py
The Python package manager, pip, is required to install the API from PyPI.
Install the API:
pip install gpudb --upgrade
Test the installation:
python -c "import gpudb;print('Import Successful')"
If Import Successful is displayed, the API has been installed and is ready for use.
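A programmatic variant of the same check can be handy in setup scripts. The sketch below uses only the standard library to report whether a top-level module can be found on the import path, without importing it. The helper name is_installed is illustrative and not part of the Kinetica API.

```python
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module can be located on the import path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # "gpudb" is the package installed by `pip install gpudb`
    status = "installed" if is_installed("gpudb") else "missing"
    print("gpudb is " + status)
```

Because the module is only located, not loaded, the check stays quick and has no side effects from the package's own import-time code.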
In this example, a set of batches of randomly-generated weather data for a fixed set of cities will be inserted into a "history" table. A table monitor manager (via GPUdbTableMonitorBase) will be created to watch for those newly inserted records and update a "status" table of those cities with their latest randomly-generated temperature values.
Afterwards, a set of updates will be made to the "status" table, followed by a set of deletes. A second table monitor manager will be created to watch for these two event types and output the numbers of records updated & deleted to the console.
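To make the shape of the input data concrete, here is a minimal sketch of how such batches might be generated. The field names match those read by the insert callback later in this tutorial; the city list, the temperature range, and the use of epoch milliseconds for the timestamp are assumptions made for illustration, not values taken from the complete example.

```python
import random
import time

# Hypothetical fixed set of cities: (city, state_province, country)
CITIES = [
    ("Washington", "DC", "USA"),
    ("Paris", "TX", "USA"),
    ("Sydney", "NSW", "Australia"),
]


def make_batch(rng, ts):
    """Build one batch: one record per city with a random temperature."""
    return [
        {
            "city": city,
            "state_province": state,
            "country": country,
            # Assumed range; the source does not state units or bounds
            "temperature": round(rng.uniform(-10.0, 40.0), 1),
            "ts": ts,
        }
        for city, state, country in CITIES
    ]


rng = random.Random(42)  # seeded so runs are repeatable
start_ms = int(time.time() * 1000)
batches = [make_batch(rng, start_ms + i * 1000) for i in range(3)]
print("Generated %d batches of %d records" % (len(batches), len(batches[0])))
```

Each batch would then be inserted into the "history" table, triggering the insert monitor once per batch.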
There are four main components involved in this example:
The key parts of table monitor processing via Python are:
The first step is to import Kinetica libraries, aliasing some of the classes for ease of use. Other supporting libraries need to be imported, as well.
import gpudb
from gpudb import GPUdbRecordColumn as GRC
from gpudb import GPUdbColumnProperty as GCP
from gpudb import GPUdbTableMonitorBase
from gpudb import TableEventType
import json
from tabulate import tabulate
Next, create a connection to Kinetica.
# Establish connection with an instance of Kinetica on port 9191
kinetica = gpudb.GPUdb(host = ['http://' + args.host + ':9191'], username = args.username, password = args.password)
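The connection call above reads its settings from an args object that this excerpt does not define. A minimal sketch of how it might be built with argparse follows. The option names come from the command lines shown at the end of this tutorial, while the default values and the helper names parse_args and build_url are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the connection options used by the example script."""
    parser = argparse.ArgumentParser(description="Run the table monitor example")
    # Defaults below are assumed, not taken from the complete example
    parser.add_argument("--host", default="127.0.0.1", help="Kinetica host")
    parser.add_argument("--username", default="", help="Kinetica user name")
    parser.add_argument("--password", default="", help="Kinetica password")
    return parser.parse_args(argv)


def build_url(host, port=9191):
    """Build the HTTP URL passed to the connection, using port 9191."""
    return "http://%s:%d" % (host, port)


args = parse_args(["--host", "10.0.0.5", "--username", "admin"])
print(build_url(args.host))
```

Passing an explicit argument list, as done here, keeps the sketch independent of the real command line; calling parse_args() with no arguments would read sys.argv instead.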
Then, define a convenient container for the table monitor manager on the "history" table, which will use that connection to manage the "history" table's monitor as well as process the insert events on it.
class StatusUpdater(GPUdbTableMonitorBase):
    def __init__(self, kinetica):
Within the initialization, specify a callback function for insert events to use in the creation of the table monitor manager base class, GPUdbTableMonitorBase. Also grab a handle to the "status" table for issuing updates.
        callbacks = GPUdbTableMonitorBase.Callbacks(
            cb_insert_decoded = self.on_insert_decoded
        )
        self.table_monitor = GPUdbTableMonitorBase(kinetica, HISTORY_TABLE, callbacks)
        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)
Define the callback function that will be used to create the table monitor manager, which will update the "status" table with new "history" table records.
    def on_insert_decoded(self, payload):
        # List of city records with their new temperatures and timestamps to
        # create for update in the "status" table
        status_update_records = []

        # Process each location/temperature record in the payload
        for history_record in payload:
            # Create a "status" record from the given "history" record
            status_update_records.append([
                history_record["city"],
                history_record["state_province"],
                history_record["country"],
                history_record["temperature"],
                history_record["ts"]
            ])

        # Upsert new weather records into "status" table
        print("[TM/SU] Updating <%s> city temperature statuses with received messages..." % len(status_update_records))
        self.status_table.insert_records(status_update_records, options = {"update_on_existing_pk": "true"})
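The update_on_existing_pk option turns the insert into an upsert: a record whose primary key matches an existing row replaces that row instead of being rejected. The sketch below imitates this behavior with a plain dictionary standing in for the "status" table. It assumes the city column alone is the primary key, which this excerpt does not state, and the upsert helper is illustrative only.

```python
def upsert(table, rows, key_index=0):
    """Insert rows into `table` (a dict keyed by primary key), replacing
    any existing row that has the same key."""
    for row in rows:
        table[row[key_index]] = row
    return table


status = {}
upsert(status, [["Paris", "TX", "USA", 30.5, 1000],
                ["Sydney", "NSW", "Australia", 18.0, 1000]])

# A later reading for Paris replaces the earlier one instead of adding a row
upsert(status, [["Paris", "TX", "USA", 28.0, 2000]])

print(len(status), status["Paris"][3])
```

This is why the "status" table keeps one row per city no matter how many batches arrive in the "history" table.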
Define a second table monitor manager container, this time for monitoring updates & deletes on the "status" table.
class StatusReporter(GPUdbTableMonitorBase):
    def __init__(self, kinetica):
Within the initialization, specify one callback function for update events and one for delete events to use in the creation of the table monitor manager base class, GPUdbTableMonitorBase. As the insert monitor is the default monitor type, it didn't need to be specified in the first table monitor manager container, but in this one, update & delete monitoring options will need to be specified. Also grab a handle to the "status" table for retrieving its records.
        callbacks = GPUdbTableMonitorBase.Callbacks(
            cb_updated = self.on_update,
            cb_deleted = self.on_delete
        )
        options = GPUdbTableMonitorBase.Options.default()
        options.operation_list = [TableEventType.UPDATE, TableEventType.DELETE]
        self.table_monitor = GPUdbTableMonitorBase(kinetica, STATUS_TABLE, callbacks, options)
        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)
Define one callback function for updates and one for deletes that will be used to create the table monitor manager. Each will output its event count, followed by the full set of records from the "status" table.
    def on_update(self, count):
        print("[TM/SR] Updated <%s> city temperature statuses:" % (count))
        self.show_status()

    def on_delete(self, count):
        print("[TM/SR] Deleted <%s> city temperature statuses:" % (count))
        self.show_status()
Lastly, define the show_status function that will be called by each event handler to output the records in the "status" table.
    def show_status(self):
        self.status_table.get_records_by_column(
            column_names = ["city", "state_province", "country", "temperature", "last_update_ts"],
            options = {"sort_by":"city"},
            print_data = True
        )
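Both containers follow the same pattern: handlers are registered per event type and invoked as matching events arrive. The sketch below shows that pattern in plain Python. MiniDispatcher and its string event names are hypothetical stand-ins and not part of the Kinetica API; in the actual example, GPUdbTableMonitorBase receives the events from the database and invokes the callbacks.

```python
class MiniDispatcher(object):
    """Hypothetical stand-in that routes events to registered callbacks."""

    def __init__(self, **callbacks):
        # e.g. MiniDispatcher(update=fn, delete=fn)
        self.callbacks = callbacks

    def dispatch(self, event_type, payload):
        """Call the handler for event_type; return False if none is registered."""
        handler = self.callbacks.get(event_type)
        if handler is None:
            return False
        handler(payload)
        return True


seen = []
dispatcher = MiniDispatcher(
    update=lambda count: seen.append(("update", count)),
    delete=lambda count: seen.append(("delete", count)),
)
dispatcher.dispatch("update", 3)
dispatcher.dispatch("delete", 1)
handled = dispatcher.dispatch("insert", 5)  # not registered, so ignored

print(seen, handled)
```

Registering only the handlers of interest mirrors how the second container asks for update and delete events but not inserts.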
After the table monitor managers have been created, confirm this by looking up the table monitors attached to the "history" & "status" tables and displaying the monitored event type and topic ID of each.
table_monitor_headers = ["Table Name", "Monitor Type", "Topic ID"]
table_monitor_records = []

# Show table monitors on all the tables
for table_name in [
        HISTORY_TABLE,
        STATUS_TABLE
]:
    table_info = kinetica.show_table(table_name)['additional_info'][0]
    if 'table_monitor' in table_info:
        table_monitor_info = json.loads(table_info['table_monitor'])
        for monitor_type in table_monitor_info:
            table_monitor_records.append([table_name, monitor_type, table_monitor_info[monitor_type]])

print( tabulate( table_monitor_records, headers = table_monitor_headers, tablefmt = 'grid' ) )
+-----------------------+----------------+--------------------------+
| Table Name            | Monitor Type   | Topic ID                 |
+=======================+================+==========================+
| table_monitor_history | insert         | Wx1YJP5/IoD3aa28v6u+Ag== |
+-----------------------+----------------+--------------------------+
| table_monitor_status  | update         | lNniSd9D2GZInM8V2I4ZpQ== |
+-----------------------+----------------+--------------------------+
| table_monitor_status  | delete         | O8ZzS8w+Au3SlpLRyn2CGw== |
+-----------------------+----------------+--------------------------+
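The lookup loop relies on the table_monitor entry being a JSON string that maps each monitor type to its topic ID. The sketch below exercises that parsing step without a database, using a hand-written dictionary shaped the way the loop expects and the topic IDs from the sample output above. The sample_info value and the monitor_rows helper are illustrative, not an actual server response or API call.

```python
import json

# Illustrative stand-in for show_table(...)['additional_info'][0]
sample_info = {
    "table_monitor": json.dumps({
        "update": "lNniSd9D2GZInM8V2I4ZpQ==",
        "delete": "O8ZzS8w+Au3SlpLRyn2CGw==",
    })
}


def monitor_rows(table_name, table_info):
    """Return [table, monitor type, topic ID] rows; empty if none attached."""
    if "table_monitor" not in table_info:
        return []
    monitors = json.loads(table_info["table_monitor"])
    # Sort by monitor type so the output order is stable
    return [[table_name, kind, topic] for kind, topic in sorted(monitors.items())]


rows = monitor_rows("table_monitor_status", sample_info)
for row in rows:
    print("%-22s %-8s %s" % tuple(row))
```

Returning an empty list for tables with no monitors keeps the caller free of special cases.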
To start the table monitor managers, instantiate each container class with a handle to the database and call their respective start functions.
status_updater = StatusUpdater(kinetica)
status_reporter = StatusReporter(kinetica)
status_updater.start()
status_reporter.start()
To stop and remove the table monitor managers (and their corresponding managed table monitors), call their respective stop functions.
status_updater.stop()
status_reporter.stop()
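Because stopping a manager also removes its table monitors, it can be worth guaranteeing that stop runs even when the work between start and stop raises an exception. The sketch below shows one way to do that with a context manager. FakeMonitor is a hypothetical stand-in that exposes only start and stop, so the pattern can be run without a database, and the running helper is likewise illustrative.

```python
from contextlib import contextmanager


class FakeMonitor(object):
    """Hypothetical stand-in exposing only start() and stop()."""

    def __init__(self, name):
        self.name = name
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False


@contextmanager
def running(*monitors):
    """Start every monitor, then stop the ones that started on exit."""
    started = []
    try:
        for monitor in monitors:
            monitor.start()
            started.append(monitor)
        yield monitors
    finally:
        # Stop in reverse order, even when the body raised an exception
        for monitor in reversed(started):
            monitor.stop()


updater, reporter = FakeMonitor("updater"), FakeMonitor("reporter")
try:
    with running(updater, reporter):
        states_inside = (updater.running, reporter.running)
        raise RuntimeError("simulated failure while loading data")
except RuntimeError:
    pass

print(states_inside, updater.running, reporter.running)
```

Any object with start and stop methods, such as the two container instances above, could be passed to the same helper.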
Included below is a complete example of the Python Table Monitor API, containing all of the above code samples, as well as the loading of data into the "history" & "status" tables and the verification output of records.
To run the complete sample, ensure the table_monitor.py file is in the current directory, and do the following:
If on the Kinetica host:
/opt/gpudb/bin/gpudb_python table_monitor.py [--username <username> --password <password>]
If running after using PyPI or GitHub to install the Python API:
python table_monitor.py [--host <target_host_ip>] [--username <username> --password <password>]
Note
As this script creates a schema and several database objects within it, system admin permission is required to run it.