Table Monitor Tutorial

The following tutorial is excerpted from a complete example that uses the Python Table Monitor API. It demonstrates the basic use of the three table monitor types:

  • table insert monitor - watches a table for newly inserted records and can make use of those records elsewhere
  • table update monitor - watches a table for updates and reports the number of records updated in each update request issued
  • table delete monitor - watches a table for deletes and reports the number of records deleted in each delete request issued

The table monitors will be managed via instances of the GPUdbTableMonitorBase class, which can create, process, & remove table monitors.
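To make the difference between the three monitor types concrete, here is a toy, in-memory sketch that does not use the Kinetica API at all. ToyTable and its methods are hypothetical names invented for illustration. The only point it makes is that insert listeners receive the new records themselves, while update and delete listeners receive only a count.

```python
# Toy illustration only: ToyTable is a hypothetical in-memory stand-in,
# not part of the Kinetica API.
from collections import defaultdict


class ToyTable:
    """Holds records in a list and notifies listeners of changes."""

    def __init__(self):
        self.records = []
        self.listeners = defaultdict(list)  # event name -> callbacks

    def watch(self, event, callback):
        """Register a callback for 'insert', 'update', or 'delete'."""
        self.listeners[event].append(callback)

    def _notify(self, event, payload):
        for callback in self.listeners[event]:
            callback(payload)

    def insert(self, new_records):
        self.records.extend(new_records)
        # Insert listeners get copies of the inserted records themselves
        self._notify("insert", [dict(r) for r in new_records])

    def update(self, predicate, changes):
        matched = [r for r in self.records if predicate(r)]
        for record in matched:
            record.update(changes)
        # Update listeners get only the number of records updated
        self._notify("update", len(matched))

    def delete(self, predicate):
        kept = [r for r in self.records if not predicate(r)]
        deleted_count = len(self.records) - len(kept)
        self.records = kept
        # Delete listeners get only the number of records deleted
        self._notify("delete", deleted_count)


events = []
table = ToyTable()
table.watch("insert", lambda recs: events.append(("insert", recs)))
table.watch("update", lambda count: events.append(("update", count)))
table.watch("delete", lambda count: events.append(("delete", count)))

table.insert([{"city": "Paris", "temperature": 12.5},
              {"city": "Lima", "temperature": 21.0}])
table.update(lambda r: r["city"] == "Paris", {"temperature": 14.0})
table.delete(lambda r: r["temperature"] > 20)

for name, payload in events:
    print(name, payload)
```

The real monitors are created and fed by the database; this sketch only mirrors the shape of what each callback receives.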

Prerequisites

The prerequisites for running the table monitor example are listed below:

Python API Installation

The native Kinetica Python API is accessible through the following means:

  • For development on the Kinetica server: Kinetica RPM
  • For development not on the Kinetica server: Git, PyPI

Kinetica RPM

In default Kinetica installations, the native Python API is located in the /opt/gpudb/api/python directory. The /opt/gpudb/bin/gpudb_python wrapper script is provided, which sets the execution environment appropriately.

Test the installation:

/opt/gpudb/bin/gpudb_python /opt/gpudb/api/python/examples/example.py

Important

When developing on the Kinetica server, use /opt/gpudb/bin/gpudb_python to run Python programs and /opt/gpudb/bin/gpudb_pip to install dependent libraries.

Git

  1. In the desired directory, run the following, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.0:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-api-python.git
    
  2. Change directory into the newly downloaded repository:

    cd kinetica-api-python
    
  3. In the root directory of the cloned repository, install the Kinetica API:

    sudo python setup.py install
    
  4. Test the installation (running the API example requires Python 2.7 or greater):

    python examples/example.py
    

PyPI

The Python package manager, pip, is required to install the API from PyPI.

  1. Install the API:

    pip install gpudb --upgrade
    
  2. Test the installation:

    python -c "import gpudb;print('Import Successful')"
    

    If Import Successful is displayed, the API has been installed and is ready for use.
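The same check can be made from inside a script without raising an error when the package is missing. The following is a minimal sketch assuming Python 3.4 or later. The helper name is_installed is hypothetical and not part of the Kinetica API.

```python
# Sketch: report whether a top-level module can be found, without importing it.
# is_installed is a hypothetical helper name used only for this illustration.
import importlib.util


def is_installed(module_name):
    """Return True if a top-level module with this name can be located."""
    return importlib.util.find_spec(module_name) is not None


print("gpudb installed:", is_installed("gpudb"))
```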

Script Detail

In this example, a set of batches of randomly-generated weather data for a fixed set of cities will be inserted into a "history" table. A table monitor manager (via GPUdbTableMonitorBase) will be created to watch for those newly inserted records and update a "status" table of those cities with their latest randomly-generated temperature values.

Afterwards, a set of updates will be made to the "status" table, followed by a set of deletes. A second table monitor manager will be created to watch for these two event types and output the numbers of records updated & deleted to the console.

There are four main components involved in this example:

  • The "history" table, into which weather records will be inserted, in batch
  • The "status" table, which reflects the latest weather data for each city
  • The table monitor manager for the "history" table, which will watch for inserts into the table and update the "status" table with those records
  • The table monitor manager for the "status" table, which will watch for updates & deletes on the table and output the corresponding record counts
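As a rough picture of the data flowing into the "history" table, the sketch below generates a few batches of random weather records for a fixed set of cities. The city list, temperature range, batch count, and the use of epoch milliseconds for ts are all assumptions made for illustration. Only the column names (city, state_province, country, temperature, ts) come from the example itself.

```python
# Sketch of batch generation; the city list, temperature range, batch count,
# and millisecond timestamps are illustrative assumptions.
import random
import time

CITIES = [
    ("Washington", "DC", "USA"),
    ("Paris", "TX", "USA"),
    ("Sydney", "New South Wales", "Australia"),
]


def make_batch(rng, ts):
    """Return one record per city: [city, state_province, country, temperature, ts]."""
    return [
        [city, state_province, country, round(rng.uniform(-10.0, 40.0), 1), ts]
        for city, state_province, country in CITIES
    ]


rng = random.Random(42)  # seeded so that runs are repeatable
start_ms = int(time.time() * 1000)

# Three batches, one minute apart
batches = [make_batch(rng, start_ms + i * 60000) for i in range(3)]

for batch in batches:
    for record in batch:
        print(record)
```

Each batch would then be passed to a single insert call, so that the insert monitor sees one event per batch.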

The key parts of table monitor processing via Python are:

Imports & Aliases

The first step is to import Kinetica libraries, aliasing some of the classes for ease of use. Other supporting libraries need to be imported, as well.

import gpudb
from gpudb import GPUdbRecordColumn as GRC
from gpudb import GPUdbColumnProperty as GCP
from gpudb import GPUdbTableMonitorBase
from gpudb import TableEventType

import json
from tabulate import tabulate

Creating a Table Monitor Manager

Next, create a connection to Kinetica.

# Establish connection with an instance of Kinetica on port 9191
kinetica = gpudb.GPUdb(host = ['http://' + args.host + ':9191'], username = args.username, password = args.password)
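The connection snippet reads args.host, args.username, and args.password, but the parsing of those arguments is not shown in this excerpt. The following is a minimal sketch of how they might be parsed with the standard argparse module. The default values and the helper names parse_args and build_url are assumptions.

```python
# Sketch of the argument parsing implied by args.host, args.username, and
# args.password; the defaults and helper names here are assumptions.
import argparse


def parse_args(argv=None):
    """Parse the command-line options used by the connection snippet."""
    parser = argparse.ArgumentParser(description="Run the table monitor example")
    parser.add_argument("--host", default="127.0.0.1", help="Kinetica host to connect to")
    parser.add_argument("--username", default="", help="Kinetica username")
    parser.add_argument("--password", default="", help="Kinetica password")
    return parser.parse_args(argv)


def build_url(host, port=9191):
    """Build the HTTP URL in the same form as the connection snippet."""
    return "http://%s:%d" % (host, port)


# An explicit argument list is passed here so the sketch runs anywhere
args = parse_args(["--host", "10.0.0.5", "--username", "admin", "--password", "secret"])
print(build_url(args.host))
```

In a real script, parse_args() would be called with no arguments so that it reads the command line.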

Then, define a convenient container for the table monitor manager on the "history" table, which will use that connection to manage the "history" table's monitor as well as process the insert events on it.

class StatusUpdater(GPUdbTableMonitorBase):

    def __init__(self, kinetica):

Within the initialization, specify a callback function for insert events to use in the creation of the table monitor manager base class, GPUdbTableMonitorBase. Also grab a handle to the "status" table for issuing updates.

        callbacks = GPUdbTableMonitorBase.Callbacks(
                cb_insert_decoded = self.on_insert_decoded
        )

        self.table_monitor = GPUdbTableMonitorBase(kinetica, HISTORY_TABLE, callbacks)

        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)

Define the callback function that was referenced when creating the table monitor manager. It upserts each batch of new "history" table records into the "status" table.

    def on_insert_decoded(self, payload):

        # List of city records with their new temperatures and timestamps to
        # create for update in the "status" table
        status_update_records = []
        
        # Process each location/temperature record in the payload
        for history_record in payload:

            # Create a "status" record from the given "history" record
            status_update_records.append([
                history_record["city"],
                history_record["state_province"],
                history_record["country"],
                history_record["temperature"],
                history_record["ts"]
            ])
        
        # Upsert new weather records into "status" table
        print("[TM/SU]  Updating <%s> city temperature statuses with received messages..." % len(status_update_records))
        self.status_table.insert_records(status_update_records, options = {"update_on_existing_pk": "true"})
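The effect of this callback can be pictured without a database. In the sketch below, a plain dict stands in for the "status" table, and the primary key is assumed to be (city, state_province, country), since the table definition is not shown in this excerpt. The helper names to_status_record and upsert are hypothetical.

```python
# Sketch only: a dict stands in for the "status" table, and the primary key
# (city, state_province, country) is an assumption.

def to_status_record(history_record):
    """Pick the "status" columns out of one decoded "history" record."""
    return [
        history_record["city"],
        history_record["state_province"],
        history_record["country"],
        history_record["temperature"],
        history_record["ts"],
    ]


def upsert(status_table, status_records):
    """Insert each record, replacing any existing record with the same key."""
    for record in status_records:
        key = tuple(record[:3])
        status_table[key] = record


first_payload = [
    {"city": "Paris", "state_province": "TX", "country": "USA", "temperature": 30.1, "ts": 1000},
    {"city": "Lima", "state_province": "Lima", "country": "Peru", "temperature": 19.4, "ts": 1000},
]
second_payload = [
    {"city": "Paris", "state_province": "TX", "country": "USA", "temperature": 31.7, "ts": 2000},
]

status_table = {}
for payload in (first_payload, second_payload):
    upsert(status_table, [to_status_record(r) for r in payload])

for record in status_table.values():
    print(record)
```

After both payloads are processed, the stand-in table holds one record per city, and Paris carries the temperature from the later payload.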

Define a second table monitor manager container, this time for monitoring updates & deletes on the "status" table.

class StatusReporter(GPUdbTableMonitorBase):

    def __init__(self, kinetica):

Within the initialization, specify one callback function for update events and one for delete events to use in the creation of the table monitor manager base class, GPUdbTableMonitorBase. Because insert is the default monitor type, the first table monitor manager container did not need to specify it; this one must request update & delete monitoring explicitly through its options. Also grab a handle to the "status" table for retrieving its records.

        callbacks = GPUdbTableMonitorBase.Callbacks(
                cb_updated = self.on_update,
                cb_deleted = self.on_delete
        )
        options = GPUdbTableMonitorBase.Options.default()
        options.operation_list = [TableEventType.UPDATE, TableEventType.DELETE]

        self.table_monitor = GPUdbTableMonitorBase(kinetica, STATUS_TABLE, callbacks, options)

        self.status_table = gpudb.GPUdbTable(name = STATUS_TABLE, db = kinetica)

Define the two callback functions, one for updates and one for deletes, that were referenced when creating the table monitor manager. Each outputs its event count, followed by the full set of records in the "status" table.

    def on_update(self, count):

        print("[TM/SR]  Updated <%s> city temperature statuses:" % count)
        self.show_status()

    def on_delete(self, count):

        print("[TM/SR]  Deleted <%s> city temperature statuses:" % count)
        self.show_status()

Lastly, define the show_status function that will be called by each event handler to output the records in the "status" table.

    def show_status(self):
        
        self.status_table.get_records_by_column(
            column_names = ["city", "state_province", "country", "temperature", "last_update_ts"],
            options = {"sort_by":"city"},
            print_data = True
        )

Displaying a Table's Monitors

After the table monitor managers have been created, confirm that their monitors exist by looking up the table monitors attached to the "history" & "status" tables and displaying the monitored event type and topic ID of each one.

table_monitor_headers = ["Table Name", "Monitor Type", "Topic ID"]
table_monitor_records = []

# Show table monitors on all the tables
for table_name in [
    HISTORY_TABLE,
    STATUS_TABLE
]:
    table_info = kinetica.show_table(table_name)['additional_info'][0]
    
    if 'table_monitor' in table_info:

        table_monitor_info = json.loads(table_info['table_monitor'])

        for monitor_type in table_monitor_info:
            table_monitor_records.append([table_name, monitor_type, table_monitor_info[monitor_type]])

print( tabulate( table_monitor_records, headers = table_monitor_headers, tablefmt = 'grid' ) )

+-----------------------+----------------+--------------------------+
| Table Name            | Monitor Type   | Topic ID                 |
+=======================+================+==========================+
| table_monitor_history | insert         | Wx1YJP5/IoD3aa28v6u+Ag== |
+-----------------------+----------------+--------------------------+
| table_monitor_status  | update         | lNniSd9D2GZInM8V2I4ZpQ== |
+-----------------------+----------------+--------------------------+
| table_monitor_status  | delete         | O8ZzS8w+Au3SlpLRyn2CGw== |
+-----------------------+----------------+--------------------------+
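The parsing step in the loop above can be tried without a database connection. The sketch below uses a hand-built dict that mimics only the table_monitor field the loop reads, filled with the table names and topic IDs from the sample output. The sample structure and the helper name monitor_rows are assumptions, and plain string formatting replaces tabulate to keep the sketch dependency-free.

```python
# Sketch with hand-built data; the dict mimics only the 'table_monitor' field
# read by the loop above, using the topic IDs from the sample output.
import json

sample_table_info = {
    "table_monitor_history": {
        "table_monitor": json.dumps({"insert": "Wx1YJP5/IoD3aa28v6u+Ag=="})
    },
    "table_monitor_status": {
        "table_monitor": json.dumps({
            "update": "lNniSd9D2GZInM8V2I4ZpQ==",
            "delete": "O8ZzS8w+Au3SlpLRyn2CGw==",
        })
    },
    "table_without_monitors": {},  # no 'table_monitor' entry at all
}


def monitor_rows(table_name, table_info):
    """Return [table name, monitor type, topic ID] rows for one table."""
    if "table_monitor" not in table_info:
        return []
    monitors = json.loads(table_info["table_monitor"])
    return [[table_name, monitor_type, topic_id]
            for monitor_type, topic_id in monitors.items()]


rows = []
for name, info in sample_table_info.items():
    rows.extend(monitor_rows(name, info))

for row in rows:
    print("%-24s %-8s %s" % tuple(row))
```

A table with no monitors simply contributes no rows, which matches the membership check in the original loop.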

Starting/Stopping the Table Monitor Manager

To start the table monitor managers, instantiate each container class with a handle to the database and call their respective start functions.

status_updater = StatusUpdater(kinetica)
status_reporter = StatusReporter(kinetica)

status_updater.start()
status_reporter.start()

To stop and remove the table monitor managers (and their corresponding managed table monitors), call their respective stop functions.

status_updater.stop()
status_reporter.stop()
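Because stopping a manager also removes its table monitors, it can be worth making sure stop is called even if the work in between fails. The following is one possible pattern, sketched with a hypothetical FakeMonitor class that exposes the same start() and stop() calls so it can run without a database. The running helper is likewise an assumption and not part of the Kinetica API.

```python
# Sketch only: FakeMonitor is a hypothetical stand-in exposing the same
# start()/stop() calls used above, so the pattern can run without a database.
from contextlib import contextmanager


class FakeMonitor:
    def __init__(self, name, log):
        self.name = name
        self.log = log

    def start(self):
        self.log.append("start " + self.name)

    def stop(self):
        self.log.append("stop " + self.name)


@contextmanager
def running(*monitors):
    """Start the given monitors and always stop the ones that were started."""
    started = []
    try:
        for monitor in monitors:
            monitor.start()
            started.append(monitor)
        yield
    finally:
        # Stop in reverse order of starting
        for monitor in reversed(started):
            monitor.stop()


log = []
updater = FakeMonitor("status_updater", log)
reporter = FakeMonitor("status_reporter", log)

try:
    with running(updater, reporter):
        log.append("work")
        raise RuntimeError("simulated failure while monitors are active")
except RuntimeError:
    pass

print(log)
```

With the real container classes, the same helper would be called as running(status_updater, status_reporter), assuming their start and stop functions behave as described above.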

Download & Run

Included below is a complete example of the Python Table Monitor API, containing all of the above code samples, as well as the loading of data into the "history" & "status" tables and the verification output of records.

To run the complete sample, ensure the table_monitor.py file is in the current directory, and do the following:

  • If on the Kinetica host:

    /opt/gpudb/bin/gpudb_python table_monitor.py [--username <username> --password <password>]
    
  • If running after using PyPI or GitHub to install the Python API:

    python table_monitor.py [--host <target_host_ip>] [--username <username> --password <password>]
    

Note

As this script creates a schema and several database objects within it, system admin permission is required to run it.