import threading
import time
import types
import uuid
import zmq
try:
from io import BytesIO
except:
from cStringIO import StringIO as BytesIO
try:
import httplib
except:
import http.client as httplib
import os, sys
import logging
# Interpreter-version flags used for the Python 2 vs. 3 switches below and
# elsewhere in this module (only the major version component is inspected
# for the first two).
IS_PYTHON_3 = sys.version_info[0] >= 3
IS_PYTHON_2 = sys.version_info[0] == 2
IS_PYTHON_27_OR_ABOVE = sys.version_info >= (2, 7)

if IS_PYTHON_3:
    # Python 3 dropped these builtins; alias them so isinstance() checks
    # written against the Python 2 names keep working.
    long = int
    basestring = str

    class unicode:
        """Placeholder for the Python 2 ``unicode`` builtin."""

# ---------------------------------------------------------------------------
# The absolute path of this gpudb.py module for importing local packages
gpudb_module_path = os.path.dirname(os.path.abspath(__file__))

# Make this module's directory and its bundled "packages" directory
# importable; entries are appended only when not already on sys.path.
for gpudb_path in (gpudb_module_path, gpudb_module_path + "/packages"):
    if gpudb_path not in sys.path:
        sys.path.append(gpudb_path)
try:
from gpudb.protocol import RecordType
except ImportError:
from protocol import RecordType
try:
from gpudb import GPUdb, GPUdbRecord, GPUdbException, \
GPUdbConnectionException
except:
from gpudb import GPUdb, GPUdbRecord, GPUdbException, \
GPUdbConnectionException
try:
import gpudb.packages.enum34 as enum
except ImportError:
import packages.enum34 as enum
try:
import queue
except ImportError:
import Queue as queue
# -----------------------------------------------------------------
[docs]
class GPUdbTableMonitor(object):
[docs]
class Client(object):
""" This class is the main client side API class implementing most of the
functionalities. Implementing table monitor functions means that this class
creates the server side table monitors in Kinetica and also starts
listening for different events on those monitors like inserts, deletes
and updates. Once the notifications (inserted records, deletions and
updates) are received this class will call the different callback
methods passed in the constructor.
The intended use of this class is to derive from this and define the
different callback methods and pass the callback methods wrapped in
different callback objects of the types defined by the class
:class:`.Callback.Type`. The callback objects thus
created must be passed as a list to the constructor of this class.
A usage that will suffice for most cases has been given in the
``examples/table_monitor_example.py`` file. The user is requested to
go through the example to get a thorough understanding of how to use
this class.
The readme contains the relevant links to navigate directly to the
examples provided.
"""
def __init__(self, db, table_name, callback_list, options=None):
    """
    Validate the arguments, verify that the table monitor socket can be
    reached and register the supplied callbacks.

    Parameters:
        db (:class:`gpudb.GPUdb`)
            The :class:`gpudb.GPUdb` object which is created external to
            this class and passed in to facilitate calling different
            methods of that API. This has to be pre-initialized and must
            have a valid value. If this is uninitialized then the
            constructor will raise a :class:`gpudb.GPUdbException`.
        table_name (str)
            The name of the Kinetica table for which a monitor is to be
            created. This must have a valid value and cannot be an empty
            string. In case this parameter does not have a valid value,
            the constructor will raise a :class:`gpudb.GPUdbException`.
        callback_list (list(:class:`.Callback`))
            List of :class:`.Callback` objects. Every entry must be a
            :class:`.Callback` and at least one must be of a monitor
            type (see ``Callback.Type.monitor_types()``) with an event
            callback set.
        options (:class:`GPUdbTableMonitor.Options`)
            The class to encapsulate the various options that can be
            passed to a :class:`.Client` object to alter the behavior of
            the monitor instance. The details are given in the
            :class:`GPUdbTableMonitor.Options` section. ``None`` selects
            the default options.

    Raises:
        :class:`gpudb.GPUdbException`
            If ``db``/``table_name`` are invalid, ``options`` is of the
            wrong type, the socket connection check fails, or the
            callbacks are missing or invalid.
    """
    # Set up the per-instance logger FIRST: __check_params() reports
    # lookup failures through self._logger.
    self._id = str(uuid.uuid4())
    # self._logger is kept protected since it is also accessed from
    # Client derived classes.
    self._logger = logging.getLogger(
        "gpudb_table_monitor.Client_instance_" + self._id)
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s %(levelname)-8s {%("
                                  "funcName)s:%(lineno)d} %(message)s",
                                  "%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)
    self._logger.addHandler(handler)
    # Prevent logging statements from being duplicated
    self._logger.propagate = False
    self._logger.setLevel(logging.INFO)

    if not self.__check_params(db, table_name):
        raise GPUdbException(
            "Both db and table_name need valid values ...")

    # Validate the options before anything reads attributes from them.
    if options is None:
        # This is the default, created internally
        options = GPUdbTableMonitor.Options.default()
    elif not isinstance(options, GPUdbTableMonitor.Options):
        raise GPUdbException(
            "Passed in options is not of the expected "
            "type: Expected "
            "'Options' type")

    self.type_id = ""
    self.type_schema = ""
    self.db = db
    self.full_url = self.db.gpudb_full_url
    self.table_name = table_name
    self.task_list = list()
    self.options = options
    self.port = options.monitor_port

    self._set_of_callbacks = set()
    self._insert_decoded_callback = None
    self._insert_raw_callback = None
    self._deleted_callback = None
    self._updated_callback = None
    self._table_altered_callback = None
    self._table_dropped_callback = None
    self.__operation_list = set()

    # check whether the socket connection can be made
    if not self.__zmq_check_connection(self.options):
        raise GPUdbException("Could not connect to table monitor; socket connection failed ...")
    # Proceed only if the socket connection is successful
    self._logger.info("Connected to socket successfully ...")

    if callback_list is None or all(cb is None for cb in callback_list):
        raise GPUdbException("No callbacks defined ... cannot proceed")

    # Materialize once so that any iterable can be scanned repeatedly.
    callbacks = list(callback_list)

    # Type-check before touching Callback attributes so that a wrong
    # entry is reported as a GPUdbException, not an AttributeError.
    if not all(isinstance(cb, GPUdbTableMonitor.Callback)
               for cb in callbacks):
        raise GPUdbException(
            "callbacks must be of type : 'Callback'")

    callback_types = GPUdbTableMonitor.Callback.Type
    monitor_types = callback_types.monitor_types()
    has_monitor_callback = any(
        (cb.callback_type in monitor_types)
        and (cb.event_callback is not None)
        for cb in callbacks)
    if not has_monitor_callback:
        raise GPUdbException(
            "No callbacks defined to create table monitors "
            "... cannot proceed")

    self._set_of_callbacks = set(callbacks)

    # Parse the set of callbacks and populate the respective instance
    # variables
    attribute_for_type = {
        callback_types.INSERT_DECODED: "_insert_decoded_callback",
        callback_types.INSERT_RAW: "_insert_raw_callback",
        callback_types.UPDATED: "_updated_callback",
        callback_types.DELETED: "_deleted_callback",
        callback_types.TABLE_ALTERED: "_table_altered_callback",
        callback_types.TABLE_DROPPED: "_table_dropped_callback",
    }
    for cb in self._set_of_callbacks:
        attribute = attribute_for_type.get(cb.callback_type)
        if attribute is None:
            self._logger.error("Unrecognized callback type ... ")
            raise GPUdbException(
                "Unrecognized callback type passed ... cannot parse")
        setattr(self, attribute, cb)

    self.__operation_list = self.__get_operation_list_from_callbacks()
    if not self.__operation_list:
        raise GPUdbException(
            "Cannot determine table monitors needed from "
            "the callback objects passed in ...")
# End __init__ Client
def __zmq_check_connection(self, options):
    """Check that a ZMQ SUB socket can connect to the table monitor port.

    A throw-away socket is connected to ``tcp://<db.host>:<monitor_port>``
    and its monitor socket is watched for connection events.

    Parameters:
        options (:class:`GPUdbTableMonitor.Options`)
            Supplies ``monitor_port``, ``max_retries`` (number of
            attempts) and ``wait_interval`` (seconds between attempts).

    Returns:
        True if a "connected" event is observed; False if the socket is
        disconnected, a ZMQ error occurs or all attempts are used up.
    """
    from zmq.utils.monitor import recv_monitor_message

    zmq_url = f"tcp://{self.db.host}:{options.monitor_port}"
    max_retries = options.max_retries
    wait_interval = options.wait_interval  # seconds
    # Bounded wait for each monitor event so that a silent endpoint
    # cannot block the caller forever (at least one second).
    poll_timeout_ms = max(1, wait_interval) * 1000

    context = zmq.Context()
    socket = None
    monitor = None
    try:
        socket = context.socket(zmq.SUB)
        # Attach the monitor BEFORE connecting so that no early
        # connection event can be missed.
        monitor = socket.get_monitor_socket(
            zmq.EVENT_CONNECTED | zmq.EVENT_CONNECT_RETRIED | zmq.EVENT_DISCONNECTED)
        socket.connect(zmq_url)

        for _ in range(max_retries):
            if not monitor.poll(poll_timeout_ms):
                # Nothing happened within the interval; this counts as
                # one attempt.
                continue

            event = recv_monitor_message(monitor)
            event_id = event["event"]
            endpoint = event['endpoint'].decode('utf-8')

            if event_id == zmq.EVENT_CONNECTED:
                self._logger.info(f"Socket connected to {endpoint}")
                return True
            if event_id == zmq.EVENT_DISCONNECTED:
                self._logger.error(f"Socket disconnected from {endpoint}")
                return False
            if event_id == zmq.EVENT_CONNECT_RETRIED:
                self._logger.warning(f"Retrying connection to {endpoint}")

            time.sleep(wait_interval)

        self._logger.error(
            f"Could not connect to {zmq_url} after {max_retries} attempt(s)")
        return False
    except zmq.ZMQError as e:
        self._logger.error(f"Error: {e}")
        return False
    finally:
        # Release every ZMQ resource created for this check; linger=0
        # keeps context.term() from waiting on unsent messages.
        if monitor is not None:
            socket.disable_monitor()
            monitor.close(linger=0)
        if socket is not None:
            socket.close(linger=0)
        context.term()
def __get_operation_list_from_callbacks(self):
    """Internal method deriving, from the registered callbacks, the set of
    table events for which table monitors have to be created.

    Callbacks whose type is not one of ``Callback.Type.monitor_types()``
    do not contribute to the result.

    Returns: A set of _TableEvent (enum) type objects.

    Raises:
        GPUdbException: if a callback type is reported as a monitor type
            but is none of the insert/delete/update types.
    """
    callback_types = GPUdbTableMonitor.Callback.Type
    table_events = GPUdbTableMonitor.Client._TableEvent

    needed_events = set()
    for callback in self._set_of_callbacks:
        kind = callback.callback_type
        if kind not in callback_types.monitor_types():
            continue

        # Both insert flavours are served by one INSERT table monitor.
        if kind in [callback_types.INSERT_RAW,
                    callback_types.INSERT_DECODED]:
            needed_events.add(table_events.INSERT)
        elif kind == callback_types.DELETED:
            needed_events.add(table_events.DELETE)
        elif kind == callback_types.UPDATED:
            needed_events.add(table_events.UPDATE)
        else:
            self._logger.error("No callback object found of one of "
                               "the types of"
                               "[INSERT_RAW, INSERT_DECODED, DELETED"
                               "or UPDATED] to create table monitor"
                               "... No table monitor can be created")
            raise GPUdbException("Error : No callback object found "
                                 "of one of the types of"
                                 "[INSERT_RAW, INSERT_DECODED, DELETED"
                                 "or UPDATED] to create table monitor"
                                 "... No table monitor can be created")
    return needed_events
def __check_params(self, db, table_name):
    """ This method checks the parameters passed into the Client
    constructor for correctness.
    Checks for existence of the table as well.

    Parameters:
        db (GPUdb)
            the GPUdb object needed to access the different APIs
        table_name (str)
            Name of the table to create the monitor for.

    Returns: False if ``db`` is not a GPUdb instance, if ``table_name``
        is not a non-blank string, or if the table lookup fails;
        otherwise the ``"table_exists"`` value of the ``has_table``
        response.
    """
    if (db is None) or (not isinstance(db, GPUdb)):
        return False

    if (table_name is None
            or not isinstance(table_name, (basestring, unicode))
            or len(table_name.strip()) == 0):
        return False

    table_name_value_correct = False
    try:
        has_table_response = db.has_table(table_name, options={})
        # NOTE(review): confirm that `is_ok` is a property of the
        # response object; if it is a method this test is always truthy.
        if has_table_response.is_ok:
            table_name_value_correct = has_table_response["table_exists"]
    except GPUdbException as gpe:
        # This method can run before the constructor has created
        # self._logger; fall back to a module-level logger so that the
        # failure is reported instead of raising AttributeError.
        logger = (getattr(self, "_logger", None)
                  or logging.getLogger("gpudb_table_monitor"))
        logger.error(gpe.message)
    return table_name_value_correct
[docs]
def start_monitor(self):
    """ Start one watcher task per table event derived from the callbacks.

    This method has to be called to activate the table monitors which
    have been created in accordance to the callback objects passed in
    the constructor, whether this class is instantiated directly or a
    class derived from (:class:`.Client`) is used.

    For every event in the internal operation list a watcher task is
    created, set up, given this client's logging level, started and
    recorded in ``task_list``.

    .. seealso:: :meth:`stop_monitor`
    """
    table_events = GPUdbTableMonitor.Client._TableEvent

    for event_type in self.__operation_list:
        # Every watcher also receives the table dropped/altered callbacks.
        table_state_callbacks = [self._table_dropped_callback,
                                 self._table_altered_callback]

        if event_type == table_events.INSERT:
            task_class = _InsertWatcherTask
            event_callbacks = [self._insert_raw_callback,
                               self._insert_decoded_callback]
        elif event_type == table_events.UPDATE:
            task_class = _UpdateWatcherTask
            event_callbacks = [self._updated_callback]
        else:
            task_class = _DeleteWatcherTask
            event_callbacks = [self._deleted_callback]

        watcher = task_class(self.db,
                             self.table_name,
                             options=self.options,
                             callbacks=event_callbacks + table_state_callbacks)
        watcher.setup()
        watcher.logging_level = self.logging_level
        watcher.start()
        self.task_list.append(watcher)
[docs]
def stop_monitor(self):
    """ Stop the table monitor started by :meth:`start_monitor`.

    Each task recorded in ``task_list`` is asked to stop and is then
    joined before the next one is handled.

    Failure to call this method will produce unpredictable results
    since the table monitors running in the background will not be
    stopped and cleaned up properly.

    .. seealso:: :meth:`start_monitor`
    """
    for running_task in self.task_list:
        running_task.stop()
        running_task.join()
@property
def logging_level(self):
    """The current level of this instance's logger."""
    return self._logger.level

@logging_level.setter
def logging_level(self, value):
    """
    Set the log level for this class and its derivatives.

    Parameters:
        value (logging.level): Default setting is logging.INFO

    Raises:
        GPUdbException: If the logger rejects the value, e.g. because
            it is not one of logging.INFO or logging.DEBUG etc.
    """
    try:
        self._logger.setLevel(value)
    except Exception as ex:
        # ValueError and TypeError are the failures expected from
        # setLevel(); both are covered by Exception.
        raise GPUdbException("Invalid log level: '{}'".format(str(ex)))
class _TableEvent(enum.Enum):
    """ Internal enum of table monitor event types.

    It serves two purposes:

    1. Building the internal operation list from the callbacks passed to
       the Client class. Only INSERT, UPDATE and DELETE are used for
       creating table monitors; the other two relate to the callbacks
       for dropped and altered table notifications.
    2. Selecting the table monitor of the right type while traversing
       that operation list.

    This is not meant to be used by the users of this API.
    """

    INSERT = 1
    """
    int: Indicates an INSERT event has occurred
    """

    UPDATE = 2
    """
    int: Indicates an UPDATE event has occurred
    """

    DELETE = 3
    """
    int: Indicates a DELETE event has occurred
    """

    TABLE_ALTERED = 4
    """
    int: Indicates a table has been altered
    """

    TABLE_DROPPED = 5
    """
    int: Indicates a table has been dropped
    """
# End Client class
[docs]
class Options(object):
    """
    Encapsulates the various options used to create a table monitor. The
    class is initialized with sensible defaults which can be overridden by
    the users of the class. The following options are supported:

    * **inactivity_timeout**
        This option controls the time interval to set the timeout to
        determine when the program would do idle time processing like
        checking for the table existence, server HA failover, etc., if
        needed. It is specified in minutes as a float so that seconds can
        be accommodated as well. The default value is set to 20 minutes.
        The value is stored (and returned by the getter) in milliseconds.
    * **max_retries**
        Number of attempts made to connect to the table monitor socket.
        Default 5.
    * **wait_interval**
        Seconds to wait between two connection attempts. Default 1.
    * **monitor_port**
        Port used to connect to the table monitor. Default 9002.

    Example usage::

        options = GPUdbTableMonitor.Options(_dict=dict(
            inactivity_timeout=0.1
        ))
    """

    __inactivity_timeout = 'inactivity_timeout'
    __INACTIVITY_TIMEOUT_DEFAULT = 20 * 60 * 1000  # 20 minutes, in ms

    __max_retries = "max_retries"
    __MAX_RETRIES_DEFAULT = 5

    __wait_interval = "wait_interval"
    __WAIT_INTERVAL_DEFAULT = 1  # seconds

    __monitor_port = "monitor_port"
    __DEFAULT_PORT = 9002

    # Highest valid TCP port number
    __MAX_PORT = 65535

    _supported_options = [
        __inactivity_timeout,
        __max_retries,
        __wait_interval,
        __monitor_port
    ]

    @staticmethod
    def default():
        """Create a default set of options for :class:`.Client`

        Returns:
            :class:`GPUdbTableMonitor.Options` instance
        """
        return GPUdbTableMonitor.Options()

    def __init__(self, _dict=None):
        """ Constructor for :class:`GPUdbTableMonitor.Options` class

        Parameters:
            _dict (dict)
                Optional dictionary with options already loaded. Value can
                be None; if it is None suitable sensible defaults will be
                set internally. Only the keys listed in
                ``_supported_options`` are accepted; each value goes
                through the validation of the matching property setter.

        Raises:
            GPUdbException: if ``_dict`` is not a dict, contains an
                unsupported key, or holds an invalid value.
        """
        # Set default values (the timeout default is 20 minutes in ms)
        self._inactivity_timeout = self.__INACTIVITY_TIMEOUT_DEFAULT
        self._max_retries = self.__MAX_RETRIES_DEFAULT
        self._wait_interval = self.__WAIT_INTERVAL_DEFAULT  # seconds
        self._monitor_port = self.__DEFAULT_PORT

        if _dict is None:
            return  # nothing to do

        if not isinstance(_dict, dict):
            raise GPUdbException(
                "Argument '_dict' must be a dict; given '%s'."
                % type(_dict))

        # Check for invalid options
        unsupported_options = set(_dict).difference(self._supported_options)
        if unsupported_options:
            raise GPUdbException(
                "Invalid options: %s" % unsupported_options)

        # Extract and save each option through its property setter
        for (key, val) in _dict.items():
            setattr(self, key, val)
    # end __init__

    @staticmethod
    def _checked_int(name, val, upper=None):
        """Return ``val`` if it is an int in ``[0, upper]``; raise otherwise.

        Parameters:
            name (str): property name used in the error message
            val: candidate value
            upper (int): optional inclusive upper bound

        Raises:
            GPUdbException: if ``val`` is not an int or is out of range.
        """
        if isinstance(val, int) and val >= 0 and (upper is None or val <= upper):
            return val
        if upper is None:
            allowed = "greater than or equal to 0"
        else:
            allowed = "in the range [0, {}]".format(upper)
        raise GPUdbException(
            "Property '{}' must be of type 'int' and "
            "{}; given {}".format(name, allowed, str(val)))

    @property
    def inactivity_timeout(self):
        """This is the getter for the property `inactivity_timeout`.

        This indicates a timeout interval after which if no
        notification is received from the server table monitors, the
        program will check whether everything is alright, like whether the
        table is still there and in the process will automatically trigger
        HA failover if needed.

        The value is *set* in minutes but is stored and returned in
        milliseconds. The default is 20 minutes converted to milliseconds.

        Returns:
            The timeout, in milliseconds, as held by this
            :class:`GPUdbTableMonitor.Options` instance.
        """
        return self._inactivity_timeout

    @inactivity_timeout.setter
    def inactivity_timeout(self, val):
        """This is the setter for the property `inactivity_timeout`.

        Parameters:
            val (float)
                This value is in minutes; it is converted to float so that
                seconds can be accommodated easily, and stored in
                milliseconds.

        Raises:
            GPUdbException: if ``val`` is not numeric or not greater
                than 0.
        """
        try:
            value = float(val)
        except (TypeError, ValueError, OverflowError):
            raise GPUdbException(
                "Property 'inactivity_timeout' must be numeric; "
                "given {}".format(str(type(val))))

        # Must be > 0 (written this way so that NaN is rejected as well)
        if not (value > 0):
            raise GPUdbException(
                "Property 'inactivity_timeout' must be "
                "greater than 0; given {}".format(str(value)))

        # Convert the VALIDATED float (not the raw argument, which may be
        # a numeric string) to milliseconds.
        self._inactivity_timeout = value * 60 * 1000

    @property
    def max_retries(self):
        """Number of attempts made to connect to the table monitor socket."""
        return self._max_retries

    @max_retries.setter
    def max_retries(self, val):
        """This is the setter for the property `max_retries`.

        Parameters:
            val (int)
                The number of times to retry to get a connection to the
                table monitor socket

        Raises:
            GPUdbException: if ``val`` is not a non-negative int.
        """
        self._max_retries = self._checked_int("max_retries", val)

    @property
    def wait_interval(self):
        """Seconds to wait between two connection attempts."""
        return self._wait_interval

    @wait_interval.setter
    def wait_interval(self, val):
        """This is the setter for the property `wait_interval`.

        Parameters:
            val (int)
                The number of seconds to wait before retrying to get a
                connection to the table monitor socket

        Raises:
            GPUdbException: if ``val`` is not a non-negative int.
        """
        self._wait_interval = self._checked_int("wait_interval", val)

    @property
    def monitor_port(self):
        """Port used to connect to the table monitor."""
        return self._monitor_port

    @monitor_port.setter
    def monitor_port(self, val):
        """This is the setter for the property `monitor_port`.

        Parameters:
            val (int)
                The port to use to connect to the table monitor

        Raises:
            GPUdbException: if ``val`` is not an int in ``[0, 65535]``.
        """
        self._monitor_port = self._checked_int(
            "monitor_port", val, upper=self.__MAX_PORT)

    def as_json(self):
        """Return the options as a JSON-compatible dict keyed by the
        supported option names (the timeout is reported in milliseconds).
        """
        return {
            self.__inactivity_timeout: self._inactivity_timeout,
            self.__max_retries: self._max_retries,
            self.__wait_interval: self._wait_interval,
            self.__monitor_port: self._monitor_port,
        }
    # end as_json

    def as_dict(self):
        """Return the options as a dict """
        return self.as_json()
    # end as_dict
# End Options class
[docs]
class Callback(object):
"""Use this class to indicate which type of table monitor is desired.
When the :class:`.Client` is constructed, a list of objects of this
class has to be supplied to the constructor of the class.
If the list of callbacks is empty or the list does not contain at least
one of the callbacks of types (:class:`.Callback.Type`)
``INSERT_DECODED``, ``INSERT_RAW``, ``DELETED``, or ``UPDATED``, no
table monitor would be created internally and the program would raise a
:class:`gpudb.GPUdbException` and exit. So, a list of objects of this
class is mandatory for the table monitor to function.
An example of using this class and passing on to the constructor of
:class:`.Client` is as follows::
class GPUdbTableMonitorExample(GPUdbTableMonitor.Client):
def __init__(self, db, table_name, options=None):
# Create the list of callbacks objects which are to be passed to the
# GPUdbTableMonitor.Client class constructor
# This example shows only two callbacks being created so
# only an insert type table monitor will be created. For other
# types callback objects could be created similarly to receive
# notifications about other events.
callbacks = [
GPUdbTableMonitor.Callback(GPUdbTableMonitor.Callback.Type.INSERT_RAW,
self.on_insert_raw,
self.on_error),
GPUdbTableMonitor.Callback(GPUdbTableMonitor.Callback.Type.INSERT_DECODED,
self.on_insert_decoded,
self.on_error,
GPUdbTableMonitor.Callback.InsertDecodedOptions( GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode.ABORT ))
]
# Invoke the base class constructor and pass in the list of callback
# objects created earlier. This invocation is mandatory for the table
# monitor to be actually functional.
super(GPUdbTableMonitorExample, self).__init__(
db, table_name,
callbacks, options=options)
"""
def __init__(
    self,
    callback_type,
    event_callback,
    error_callback=None,
    event_options=None,
):
    """
    Constructor for this class.

    Parameters:
        callback_type (:class:`.Callback.Type`)
            This indicates the type of the table monitor this callback
            will be used for. It must be of the type
            :class:`.Callback.Type` enum.

        event_callback (method reference)
            This is to be called for any event related to an operation
            on a table like insert, update, delete etc. As soon as such
            an event is observed this method will be called.

            This method can have only one parameter. For each
            table monitor event (`callback_type`), the parameter would
            be different. The method name has got no significance as
            long as the signature is as given below::

                def method_name(param):
                    # param - Could be a (dict|bytes|int|str)
                    #         depending on the :attr:`callback_type`
                    # Processing Code follows ....

            The method thus defined does not return anything.

            The following table describes the parameter types which
            correspond to each of the `callback_type`:

            ==  ==================  ==============
            NO  callback_type       Parameter Type
            ==  ==================  ==============
            1   ``INSERT_DECODED``  ``dict``
            2   ``INSERT_RAW``      ``bytes``
            3   ``DELETED``         ``int``
            4   ``UPDATED``         ``int``
            5   ``TABLE_ALTERED``   ``str``
            6   ``TABLE_DROPPED``   ``str``
            ==  ==================  ==============

        error_callback (method reference)
            Optional parameter.

            This will be called in case of any operational error that
            typically could manifest in the form of some exception
            (:class:`gpudb.GPUdbException`).

            The name of the method does not matter. It must have only
            one argument of type ``str``. The argument to this method
            will contain information related to the error that
            occurred; often details about any exception that was
            raised.

            The signature of this method has to be::

                def method_name(param):
                    # param - str
                    # code here ...

        event_options (:class:`.Callback.Options`)
            Optional parameter.

            Options applicable to a specific callback type, e.g.,
            insert, delete, update etc. Right now, the only option
            applicable is for the callback handling insertion of records
            where the record information is decoded and sent to the
            callback by the table monitor.

    Raises:
        GPUdbException: if ``callback_type`` is not a
            :class:`.Callback.Type`, if either callback is neither
            ``None`` nor callable, or if ``event_options`` is neither
            ``None`` nor a :class:`.Callback.Options` instance.
    """
    # Guard clauses; the checks run in the order type, error callback,
    # event callback, options.
    if not isinstance(callback_type, GPUdbTableMonitor.Callback.Type):
        raise GPUdbException(
            "Argument type must be of type "
            "Callback.Type enum ...")
    self.__type = callback_type

    if not self.__check_whether_function(error_callback):
        raise GPUdbException("'error_callback' passed in is not a "
                             "valid method reference")

    if not self.__check_whether_function(event_callback):
        raise GPUdbException("'event_callback' passed in is not a "
                             "valid method reference")

    self.__event_callback = event_callback
    self.__error_callback = error_callback

    options_acceptable = (
        event_options is None
        or isinstance(event_options, GPUdbTableMonitor.Callback.Options))
    if not options_acceptable:
        raise GPUdbException("event_options must be of type class 'Options'"
                             " or a derived class")
    self.__event_options = event_options
# End of Callback.init
@property
def event_callback(self):
    """
    The method invoked when an event of this callback's
    :attr:`callback_type` is received, as passed to the constructor.
    """
    return self.__event_callback

@property
def error_callback(self):
    """
    The method invoked when an error related to the
    :attr:`callback_type` of this class occurs, as passed to the
    constructor.
    """
    return self.__error_callback

@property
def callback_type(self):
    """
    The callback type value given to the constructor.
    """
    return self.__type

@property
def event_options(self):
    """
    The event options given to the constructor (may be ``None``).
    """
    return self.__event_options
def __check_whether_function(self, func):
    """ Tests whether the object passed in is usable as a callback.

    Parameters:
        func (Callback.event_callback):
            The candidate callback; ``None`` is accepted as well.

    Returns: True if ``func`` is ``None`` or callable, False otherwise.
    """
    # Functions, methods and builtins are all callable, so a single
    # callable() test covers every accepted kind of reference.
    if func is None:
        return True
    return callable(func)
[docs]
class Options(object):
    """
    Base class for the options of any given callback type. The
    :class:`.Callback` constructor expects an instance of this class.

    Rather than using this class directly, the user is supposed to use
    an instance of one of its derived classes; each derived class is
    specialized with options that pertain to a certain type of callback.

    Note that, currently, there is only one derived class as other
    callback types do not have special options at the moment.

    .. seealso:: :class:`.Callback.InsertDecodedOptions`
    """
# End of Options class
[docs]
class InsertDecodedOptions(Options):
    """
    Options used to control the behavior if an error occurs while
    receiving notifications about inserted records after decoding.
    """

    def __init__(self, decode_failure_mode=None):
        """Constructor for this class.

        Parameters:
            decode_failure_mode (:class:`.Callback.InsertDecodedOptions.DecodeFailureMode`)
                This is either ``SKIP`` or ``ABORT`` as described in the
                class documentation. ``None`` selects ``SKIP``.

        Raises:
            GPUdbException: if a value other than ``None`` or a
                ``DecodeFailureMode`` member is given.
        """
        failure_modes = GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode

        if decode_failure_mode is None:
            self.__decode_failure_mode = failure_modes.SKIP
            return

        if not isinstance(decode_failure_mode, failure_modes):
            raise GPUdbException("decode_failure_mode must be of type "
                                 "InsertDecodedOptions.DecodeFailureMode enum (SKIP|ABORT)")
        self.__decode_failure_mode = decode_failure_mode

    @property
    def decode_failure_mode(self):
        """Get the decode failure mode value.
        """
        return self.__decode_failure_mode

    @decode_failure_mode.setter
    def decode_failure_mode(self, value):
        """Set the decode failure mode.

        Only allowed values are

        1. GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode.SKIP
        2. GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode.ABORT

        Raises:
            GPUdbException: if ``value`` is not a ``DecodeFailureMode``
                member.

        .. seealso:: :class:`.Callback.InsertDecodedOptions`
        """
        failure_modes = GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode

        # Membership in the enum is the only requirement. Also demanding
        # an int or an InsertDecodedOptions instance would reject every
        # valid enum member.
        if not isinstance(value, failure_modes):
            raise GPUdbException(
                "'decode_failure_mode' value must be one of "
                "[GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode.SKIP, "
                "GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode.ABORT]")
        self.__decode_failure_mode = value

    class DecodeFailureMode(enum.Enum):
        """
        This enum is used to identify the two possible modes to handle any
        error that can occur while decoding the payload received from the
        server table monitors.

        In both the cases (``SKIP`` and ``ABORT``) it will try to
        recover once by default.
        """

        ABORT = 1
        """
        int: If a decoding error occurs and ``ABORT`` is specified, then
        the program aborts (quits with an exception)
        """

        SKIP = 2
        """
        int: if ``SKIP`` is specified, then the program will skip to the
        next record and try to decode that. In ``SKIP`` mode, the record
        that has been skipped due to a problem in decoding will appear
        in the error log.
        """
    # End of DecodeFailureMode enum class
# End of InsertDecodedOptions class
[docs]
class Type(enum.Enum):
    """ Indicates that the callback is for insert/update/delete event for
    the target table. The API will create a[n] insert/update/delete
    table monitor, and per event, will invoke the appropriate event
    callback method. [Optional based on context: "Upon receiving records
    that have been recently inserted into the target table, the table
    monitor will/will not decode the records and pass the binary/decoded
    records to the event callback method."]
    """

    INSERT_DECODED = 1
    """
    int: This mode indicates an interest in receiving records after
    decoding according to the table schema. This is used to create an
    insert monitor internally. The user will get the notification
    through the callback method pointed to by the `event_callback`
    property of :class:`.Callback`. The inserted records will be returned
    as a dict as an argument to the `event_callback`.

    .. seealso:: :class:`.Client`
    """

    INSERT_RAW = 2
    """
    int: This mode indicates an interest in receiving records before
    decoding that is as raw bytes. This is used to create an insert
    monitor internally. The user will get the notification through the
    callback method pointed to by the `event_callback` property of
    :class:`.Callback`. The inserted records will be returned as bytes
    as an argument to the `event_callback`.

    .. seealso:: :class:`.Client`
    """

    DELETED = 3
    """
    int: This mode indicates an interest in receiving notification about
    the count of deleted records. This is used to create a delete
    monitor internally. The user will get the notification through the
    callback method pointed to by the `event_callback` property of
    :class:`.Callback`.

    .. seealso:: :class:`.Client`
    """

    UPDATED = 4
    """
    int: This mode indicates an interest in receiving notification about
    the count of updated records. This is used to create an update
    monitor internally. The user will get the notification through the
    callback method pointed to by the `event_callback` property of
    :class:`.Callback`.

    .. seealso:: :class:`.Client`
    """

    TABLE_ALTERED = 5
    """
    int: This mode indicates an interest in receiving notification about
    the possible table alterations while one or more table monitors
    (insert, update, delete) are monitoring a table. If this is supplied
    then the user will be notified using the callback pointed to by the
    `event_callback` property of :class:`.Callback`.

    .. seealso:: :class:`.Client`
    """

    TABLE_DROPPED = 6
    """
    int: This mode indicates an interest in receiving notification about
    the possible table deletions while one or more table monitors
    (insert, update, delete) are monitoring a table. If this is supplied
    then the user will be notified using the callback pointed to by the
    `event_callback` property of :class:`.Callback`.

    .. seealso:: :class:`.Client`
    """

    @staticmethod
    def event_types():
        """Return the list of all available callback types, i.e. the four
        monitor types followed by ``TABLE_ALTERED`` and ``TABLE_DROPPED``.
        It is meant for validating the callback type supplied to the
        constructor of :class:`.Callback`.
        """
        kinds = GPUdbTableMonitor.Callback.Type
        return [kinds.INSERT_RAW,
                kinds.INSERT_DECODED,
                kinds.DELETED,
                kinds.UPDATED,
                kinds.TABLE_ALTERED,
                kinds.TABLE_DROPPED]

    @staticmethod
    def monitor_types():
        """Return the list of the callback types that are relevant to the
        creation of a table monitor (both insert types, ``DELETED`` and
        ``UPDATED``). It is meant for validating the callback type
        supplied to the constructor of :class:`.Callback`.
        """
        kinds = GPUdbTableMonitor.Callback.Type
        return [kinds.INSERT_RAW,
                kinds.INSERT_DECODED,
                kinds.DELETED,
                kinds.UPDATED]
# End of Callback class
class _BaseTask(threading.Thread):
    """Internal base class of the table monitor tasks; not to be used by
    clients.

    A task is a thread that creates one server-side table monitor (insert,
    update or delete), subscribes a ZMQ SUB socket to the monitor's topic
    and hands every received multipart message to ``_process_message``,
    which the derived classes implement.
    """

    def __init__(self,
                 db,
                 table_name,
                 table_event,
                 table_dropped_callback=None,
                 table_altered_callback=None,
                 options=None,
                 id=None):
        """
        Constructor for _BaseTask class, generally will not be needed to be
        called directly, will be called by one of the subclasses
        :class:`_InsertWatcherTask`, :class:`_UpdateWatcherTask`
        or :class:`_DeleteWatcherTask`

        Args:
            db (GPUdb)
                Handle to GPUdb instance

            table_name (str)
                Name of the table to create the monitor for

            table_event (_TableEvent)
                Enum of :class:`GPUdbTableMonitor.Client._TableEvent`
                Indicates whether the monitor is an insert, delete or
                update monitor

            table_dropped_callback (:class:`GPUdbTableMonitor.Callback`)
                Callback object whose ``event_callback`` is invoked when
                the table is found to be dropped.

            table_altered_callback (:class:`GPUdbTableMonitor.Callback`)
                Callback object whose ``event_callback`` is invoked when
                the task terminates because of a table alteration.

            options (Options)
                Options to configure the task; anything that is not a
                :class:`GPUdbTableMonitor.Options` instance is replaced
                by ``GPUdbTableMonitor.Options.default()``.

            id (str)
                Label stored on the task and used in log messages.

        Raises:
            GPUdbException
                If ``db``, ``table_name`` or ``table_event`` has the
                wrong type.
        """
        super(_BaseTask, self).__init__()

        # isinstance() is False for None, so this also rejects None
        if not isinstance(db, GPUdb):
            raise GPUdbException("db must be of type GPUdb")
        self.db = db

        if not isinstance(table_name, (basestring, unicode)):
            raise GPUdbException("table_name must be a string")
        self.table_name = table_name

        if not isinstance(table_event,
                          GPUdbTableMonitor.Client._TableEvent):
            raise GPUdbException(
                "table_event must be of type enum _TableEvent")
        self.event_type = table_event

        self.id = id

        if not isinstance(options, GPUdbTableMonitor.Options):
            options = GPUdbTableMonitor.Options.default()
        self._options = options

        self._table_dropped_callback = table_dropped_callback
        self._table_altered_callback = table_altered_callback

        self.type_id = None
        self.type_schema = None
        self.topic_id = ""
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.kill = False
        # str.format() instead of an f-string: the module still carries
        # Python 2 compatibility shims and uses .format() everywhere else
        self.zmq_url = self._build_zmq_url()
        self.full_url = self.db.gpudb_full_url
        self.record_type = None

        # Setup the logger for this instance
        self._id = str(uuid.uuid4())

        # self._logger is kept protected since it is also accessed from the
        # _BaseTask derived classes
        self._logger = logging.getLogger(
            "gpudb_table_monitor.BaseTask_instance_" + self._id)

        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s %(levelname)-8s {%("
                                      "funcName)s:%(lineno)d} %(message)s",
                                      "%Y-%m-%d %H:%M:%S")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

        # Prevent logging statements from being duplicated
        self._logger.propagate = False
    # End __init__ _BaseTask

    def _build_zmq_url(self):
        """Return the ZMQ endpoint built from the db host and the
        configured monitor port.
        """
        return "tcp://{}:{}".format(self.db.host,
                                    self._options.monitor_port)
    # End _build_zmq_url _BaseTask

    def setup(self):
        """Fetch the table's type id and schema, build the record type,
        create the table monitor and subscribe to its topic.

        Raises:
            GPUdbException
                If the type id or the type schema could not be retrieved.
        """
        self.type_id, self.type_schema = self._get_type_and_schema_for_table()

        if (self.type_schema is None) or (self.type_id is None):
            raise GPUdbException(
                "Failed to retrieve type_schema and type_id for table {}".format(
                    self.table_name))

        self.record_type = RecordType.from_type_schema(
            label="",
            type_schema=self.type_schema,
            properties={})

        # Only subscribe when the monitor was actually created
        if self._create_table_monitor(table_event=self.event_type):
            self._connect_to_topic(self.zmq_url, self.topic_id)
    # End setup _BaseTask

    def _create_table_monitor(self, table_event):
        """Create the server-side table monitor for ``self.table_name`` and
        cache the returned topic id and type schema on the instance.

        Args:
            table_event (_TableEvent)
                INSERT, UPDATE or DELETE; any other value is reported as
                an error.

        Returns:
            bool: True if the monitor was created, False if a
            GPUdbException occurred (it is logged, not re-raised).
        """
        events = GPUdbTableMonitor.Client._TableEvent
        event_names = (
            (events.DELETE, 'delete'),
            (events.INSERT, 'insert'),
            (events.UPDATE, 'update'),
        )

        try:
            event_name = next(
                (name for event, name in event_names
                 if table_event == event),
                None)
            if event_name is None:
                raise GPUdbException("Invalid 'table_event' value .. cannot "
                                     "create table monitor")

            retval = self.db.create_table_monitor(
                self.table_name, options={'event': event_name})

            # Retain the topic ID, used in verifying the queued messages'
            # source and removing the table monitor at the end
            self.topic_id = retval["topic_id"]

            # Retain the type schema for decoding queued messages
            self.type_schema = retval["type_schema"]

            return True
        except GPUdbException as gpe:
            self._logger.error(gpe.message)
            return False
    # End _create_table_monitor _BaseTask

    def _remove_table_monitor(self):
        """ Remove the established table monitor, by topic ID
        """
        self.db.clear_table_monitor(self.topic_id)
    # End _remove_table_monitor _BaseTask

    def _connect_to_topic(self, table_monitor_queue_url, topic_id):
        """ Create a connection to the message queue published to by the
        table monitor, filtering messages for the specific topic ID returned
        by the table monitor creation call

        Args:
            table_monitor_queue_url (str)
                ZMQ endpoint to connect the SUB socket to.

            topic_id (str)
                Topic to subscribe to.
        """
        # Connect to queue using specified table monitor URL and topic ID
        self._logger.debug("Starting...")
        self.socket.connect(table_monitor_queue_url)

        # Map every UTF-8 byte of the topic id to the character with the
        # same code point before subscribing
        topic_id = "".join(chr(x) for x in bytearray(topic_id, 'utf-8'))
        self.socket.setsockopt_string(zmq.SUBSCRIBE, topic_id)
        self._logger.debug(" Started!")
    # End _connect_to_topic _BaseTask

    def _disconnect_from_topic(self):
        """ This method closes the socket and terminates the ZMQ context
        """
        self._logger.debug(" Stopping...")
        self.socket.close()
        self.context.term()
        self._logger.debug(" Stopped!")
    # End _disconnect_from_topic _BaseTask

    def run(self):
        """ Process queued messages until this task is stopped.

        Each iteration polls the socket through ``_fetch_message``. On a
        ZMQ error that was not caused by stopping the task, the state
        check is run; the loop ends when that check does not report a
        re-created table monitor.

        The socket is closed and the table monitor removed when the loop
        ends, including when an unexpected exception escapes
        ``_fetch_message`` (the exception still propagates afterwards).
        """
        try:
            while not self.kill:
                try:
                    self._fetch_message()
                except zmq.ZMQError as zmqe:
                    if not self.kill:
                        self._logger.error(
                            "ZMQ connection error : %s" % zmqe.strerror)
                        # Try to re-create the table monitor, resorting
                        # to HA
                        if not self._check_state_on_inactivity_timeout_expiry():
                            self.kill = True
            # End of while loop
        finally:
            # Clean up when this table monitor is stopped; the nested
            # finally keeps the monitor removal from being skipped when
            # closing the socket fails
            try:
                self._disconnect_from_topic()
            finally:
                self._remove_table_monitor()
    # End run _BaseTask

    def stop(self):
        """ Flag the message loop to end, close the socket and terminate
        the ZMQ context.
        """
        self.kill = True
        self.socket.close()
        self.context.term()
    # End stop _BaseTask

    def _check_state_on_inactivity_timeout_expiry(self):
        """Check the table and the connection while no message is being
        processed.

        If the table no longer exists, the table-dropped path of
        ``_quit_on_exception`` is taken. If the table exists and the
        db's full URL differs from the cached one (HA switchover), the
        table monitor is re-created and the socket subscribed to the new
        topic.

        A GPUdbException is logged; any other exception ends the task
        through ``_quit_on_exception``.

        Returns:
            bool: True only if a new table monitor was created and
            subscribed to, False otherwise.
        """
        self._logger.debug("In _check_state_on_inactivity_timeout_expiry ...")
        table_monitor_created = False

        try:
            # Check whether the table is still valid
            table_exists = self.db.has_table(self.table_name, options={})[
                'table_exists']

            if not table_exists:
                self._quit_on_exception(
                    event_type=GPUdbTableMonitor.Client._TableEvent.TABLE_DROPPED,
                    message="Table %s does not exist anymore ..."
                            % self.table_name)
            elif self.full_url != self.db.gpudb_full_url:
                # HA taken over
                table_monitor_created = self._recreate_monitor_after_switchover()
        except GPUdbException as gpe:
            self._logger.error("GpuDb error : %s" % gpe.message)
        except Exception as e:
            self._quit_on_exception(event_type=None, message=str(e))

        return table_monitor_created
    # End _check_state_on_inactivity_timeout_expiry _BaseTask

    def _recreate_monitor_after_switchover(self):
        """Re-create the table monitor after the db's full URL changed and
        subscribe to the new topic.

        The cached URL, type id and type schema are only replaced when the
        monitor was created, so a failed attempt is detected again (and
        retried) by the next state check.

        Returns:
            bool: True if the monitor was re-created and subscribed to.
        """
        new_full_url = self.db.gpudb_full_url
        self._logger.warning(
            "{} :: HA Switchover happened : Current_full_url = {} and "
            "new_gpudb_full_url = {}".format(
                self.id, self.full_url, new_full_url))

        new_type_id, new_type_schema = self._get_type_and_schema_for_table()
        self._logger.debug(
            "Old type_id = {} : New type_id = {}".format(
                self.type_id, new_type_id))

        if not self._create_table_monitor(table_event=self.event_type):
            # The failure has already been logged by _create_table_monitor
            return False

        # Cache the new state
        self.full_url = new_full_url
        self.type_id = new_type_id
        self.type_schema = new_type_schema

        # Rebuild the endpoint from the db handle; this yields the same
        # value as before unless db.host changed with the switchover
        self.zmq_url = self._build_zmq_url()

        # Connect to the new topic_id
        self._connect_to_topic(
            table_monitor_queue_url=self.zmq_url,
            topic_id=self.topic_id)
        return True
    # End _recreate_monitor_after_switchover _BaseTask

    def _quit_on_exception(self, event_type=None, message=None):
        """ Terminate the task after a failure that is not recovered from.

        The message (if any) is logged, the table-dropped or table-altered
        callback matching ``event_type`` (if one was supplied) is invoked
        with the message, and finally ``stop()`` is called.

        Parameters:
            event_type (_TableEvent)
                TABLE_DROPPED or TABLE_ALTERED selects the callback to
                notify; any other value notifies nobody.

            message (str)
                The exact exception message that could be logged for
                further troubleshooting
        """
        if message is not None:
            self._logger.error(message)

        events = GPUdbTableMonitor.Client._TableEvent

        if ((event_type == events.TABLE_DROPPED)
                and (self._table_dropped_callback is not None)):
            self._table_dropped_callback.event_callback(message)

        if ((event_type == events.TABLE_ALTERED)
                and (self._table_altered_callback is not None)):
            self._table_altered_callback.event_callback(message)

        # Connection to GPUDb failed or some other GPUDb failure, might as
        # well quit
        self.stop()
    # End _quit_on_exception _BaseTask

    def _get_type_and_schema_for_table(self):
        """ Retrieve the latest type id and type schema of
        ``self.table_name`` through ``show_table``.

        Returns:
            tuple: ``(type_id, type_schema)``, or ``(None, None)`` if a
            GPUdbException occurred (it is logged, not re-raised).
        """
        try:
            show_table_response = self.db.show_table(
                table_name=self.table_name, options={})

            # The first entries are taken as the latest id and schema
            latest_type_schema = show_table_response['type_schemas'][0]
            latest_type_id = show_table_response['type_ids'][0]

            return latest_type_id, latest_type_schema
        except GPUdbException as gpe:
            self._logger.error(gpe.message)
            return None, None
    # End _get_type_and_schema_for_table _BaseTask

    def execute(self):
        """ This method does the job of executing the task. It calls in
        sequence _connect, start and _disconnect.

        This is a template method where _connect and _disconnect are
        implemented by the derived classes; start is the thread's start.

        NOTE(review): the derived classes in this module implement
        _disconnect as ``self.stop()``, so this sequence asks the thread
        to stop right after starting it -- confirm that this is intended.
        """
        self._connect()
        self.start()
        self._disconnect()
    # End execute _BaseTask

    def _connect(self):
        """ Implemented by the derived classes _InsertWatcherTask,
        _UpdateWatcherTask and _DeleteWatcherTask.
        """
        raise NotImplementedError(
            "Method '_connect' of '_BaseTask' must be overridden in the derived classes")
    # End _connect _BaseTask

    def _fetch_message(self):
        """ Poll the socket for up to ``inactivity_timeout`` and process
        what arrived.

        A received multipart message is passed to ``_process_message``;
        when the poll returns nothing, the state check is run instead.
        """
        ret = self.socket.poll(self._options.inactivity_timeout)

        if ret != 0:
            self._logger.debug("Received message .. ")
            messages = self.socket.recv_multipart()
            self._process_message(messages)
        else:
            # ret==0, meaning nothing received from socket.
            # Process all the other cases here since there is no
            # message to be processed.
            self._check_state_on_inactivity_timeout_expiry()
    # End _fetch_message _BaseTask

    def _process_message(self, messages):
        """ Process the frames of one multipart message and call the
        relevant callbacks.

        This method has to be overridden in the derived classes, since
        insert and delete/update notifications are handled differently.

        Parameters:
            messages (list)
                Frames received from a single socket poll.
        """
        raise NotImplementedError(
            "Method '_process_message' of '_BaseTask' must be overridden in the derived classes")
    # End _process_message _BaseTask

    def _disconnect(self):
        """Implemented by the derived classes _InsertWatcherTask,
        _UpdateWatcherTask and _DeleteWatcherTask.
        """
        raise NotImplementedError(
            "Method '_disconnect' of '_BaseTask' must be overridden in the derived classes")
    # End _disconnect _BaseTask

    @property
    def logging_level(self):
        """The level currently set on this task's logger."""
        return self._logger.level

    @logging_level.setter
    def logging_level(self, value):
        """
        This property sets the log level for this class and its derivatives.

        Parameters:
            value (logging.level)
                A level accepted by ``Logger.setLevel``, e.g.
                logging.INFO or logging.DEBUG.

        Raises:
            GPUdbException
                If the logger rejects the value.
        """
        try:
            self._logger.setLevel(value)
        except Exception as ex:
            raise GPUdbException("Invalid log level: '{}'".format(str(ex)))
# End class _BaseTask
class _InsertWatcherTask(_BaseTask):
    """ This is the class which handles only inserts and subsequent processing
    of the messages received as a result of notifications from server on
    insertions of new records into the table.
    """

    def __init__(self, db, table_name,
                 options=None, callbacks=None):
        """
        Constructor of the _InsertWatcherTask class

        Parameters:
            db (GPUdb)
                Handle to GPUdb instance

            table_name (str)
                Name of the table to create the monitor for

            options (:class:`GPUdbTableMonitor.Options`)
                Options to configure Client

            callbacks (list of :class:`GPUdbTableMonitor.Callback`)
                List of Callback objects passed by user to be called on various
                events relevant to Insert operation
                Order of callbacks in the list:

                - 0 - insert_raw callback
                - 1 - insert_decoded callback
                - 2 - table_dropped callback
                - 3 - table_altered callback

                ``None`` or a list that is too short leaves the missing
                callbacks unset.
        """
        table_event = GPUdbTableMonitor.Client._TableEvent.INSERT

        self._callbacks = callbacks
        self.__cb_insert_raw = self._pick_callback(callbacks, 0)
        self.__cb_insert_decoded = self._pick_callback(callbacks, 1)

        super(_InsertWatcherTask, self).__init__(
            db,
            table_name,
            table_event=table_event,
            table_dropped_callback=self._pick_callback(callbacks, 2),
            table_altered_callback=self._pick_callback(callbacks, 3),
            options=options,
            id='INSERT_' + table_name)

    @staticmethod
    def _pick_callback(callbacks, index):
        """Return ``callbacks[index]``, or None when ``callbacks`` is None
        or has no such position.
        """
        if callbacks is None or len(callbacks) <= index:
            return None
        return callbacks[index]

    def _connect(self):
        """ Overrides the base class method
        wrapping the call to setup method.
        """
        self.setup()

    def _try_decoding_on_table_altered(self, message_data):
        """Retry decoding a message once with the table's current type
        schema, for the case that the table has been altered.

        The record type is rebuilt from the freshly retrieved schema; the
        cached type id and schema are only replaced when decoding
        succeeds. Every failure is logged and reported by returning None.

        Parameters:
            message_data
                The raw message data (binary)

        Returns:
            dict: The decoded record, or None if decoding failed again.
        """
        record = None
        try:
            # retry with refreshed type details id and schema
            new_type_id, new_type_schema = self._get_type_and_schema_for_table()

            self.record_type = RecordType.from_type_schema(
                label="",
                type_schema=new_type_schema,
                properties={})

            record = dict(GPUdbRecord.decode_binary_data(self.record_type,
                                                         message_data)[0])

            # Update the instance variables on success
            self.type_id, self.type_schema = new_type_id, new_type_schema
        except Exception as e:
            self._logger.error("Exception received "
                               "while decoding : "
                               "%s" % str(e))
            # Both values are passed as logging arguments; applying '%'
            # to message_data alone raised a TypeError of its own
            self._logger.error(
                "Failed to decode message %s with "
                "updated schema %s", message_data,
                self.type_schema)

        return record
    # End _try_decoding_on_table_altered

    def _process_message(self, messages):
        """ Process only messages assuming that they are inserts.

        The first frame is the topic id; every following frame is passed
        to the raw callback (if any) and, decoded, to the decoded callback
        (if any). Processing stops at the first message that causes the
        task to abort.

        Parameters:
            messages (list)
                Multi-part messages received from a single socket poll.

        Raises:
            GPUdbException
                If a callback raises, or if the decode failure mode is
                unknown.
        """
        # bytes.decode() is available on Python 2 and 3 alike
        topic_id_recvd = messages[0].decode('utf-8')
        self._logger.info("Topic_id_received = " + topic_id_recvd)

        raw_cb = self.__cb_insert_raw
        decoded_cb = self.__cb_insert_decoded
        notify_raw = (raw_cb is not None
                      and raw_cb.event_callback is not None)
        notify_decoded = (decoded_cb is not None
                          and decoded_cb.event_callback is not None)

        # Process all messages, skipping the (first) topic frame
        for message_data in messages[1:]:
            if notify_raw:
                try:
                    raw_cb.event_callback(message_data)
                except Exception as e:
                    self._logger.error(e)
                    raise GPUdbException(str(e))

            if not notify_decoded:
                continue

            # Only the decoding is guarded here, so that a failing
            # callback is not mistaken for a decoding failure
            try:
                record = dict(GPUdbRecord.decode_binary_data(
                    self.record_type, message_data)[0])
            except Exception as e:
                # Possibly due to a different schema resulting from a
                # table alteration; attempt recovery once
                self._logger.error(
                    "Exception received while decoding {}".format(str(e)))
                record = self._try_decoding_on_table_altered(message_data)

            if record is None:
                if not self._handle_decode_failure(message_data):
                    # The task has been stopped; leave the remaining
                    # frames alone
                    return
                continue

            self._notify_insert_decoded(record)
    # End _process_message _InsertWatcherTask(_BaseTask)

    def _notify_insert_decoded(self, record):
        """Pass a decoded record to the insert-decoded callback, logging
        and re-raising any callback failure as a GPUdbException.
        """
        try:
            self.__cb_insert_decoded.event_callback(record)
        except Exception as cbe:
            self._logger.error(cbe)
            raise GPUdbException("Exception in calling event_callback "
                                 "for insert decoded : " + str(cbe))

    def _handle_decode_failure(self, message_data):
        """Apply the decoded callback's failure mode to a message that
        could not be decoded.

        Returns:
            bool: True to go on with the next message (SKIP), False when
            the task has been terminated (ABORT).

        Raises:
            GPUdbException
                If the failure mode is neither ABORT nor SKIP.
        """
        failure_modes = GPUdbTableMonitor.Callback.InsertDecodedOptions.DecodeFailureMode
        decoded_cb = self.__cb_insert_decoded
        error_mode = decoded_cb.event_options.error_mode

        if error_mode == failure_modes.ABORT:
            failure = "Failed to decode message {} with schema {}".format(
                message_data, self.type_schema)
            self._logger.error(failure)

            if decoded_cb.error_callback is not None:
                decoded_cb.error_callback(failure)

            self._quit_on_exception(
                GPUdbTableMonitor.Client._TableEvent.TABLE_ALTERED,
                "Table altered, terminating ...")
            return False

        if error_mode == failure_modes.SKIP:
            # Emit a warning log message and skip over to subsequent
            # records.
            self._logger.warning(
                "Failed to decode message {} with schema {}, skipping to "
                "next records.".format(message_data, self.type_schema))
            return True

        self._logger.error(
            "Unknown 'DecodeFailureMode' found .. cannot handle")
        raise GPUdbException(
            "Unknown 'DecodeFailureMode' found .. cannot handle")

    def _disconnect(self):
        """ Overrides the base class method by stopping the task.

        NOTE(review): _BaseTask.execute calls this immediately after
        starting the thread -- confirm that stopping here is intended.
        """
        self.stop()
class _UpdateWatcherTask(_BaseTask):
    """ This is the class which handles only updates and subsequent processing
    of the messages received as a result of notifications from server on
    updates to the records of a table.
    """

    def __init__(self, db, table_name,
                 options=None,
                 callbacks=None,
                 ):
        """
        Constructor of the class _UpdateWatcherTask which inherits from
        _BaseTask

        Parameters:
            db (GPUdb)
                Handle to GPUdb instance

            table_name (str)
                Name of the table to create the monitor for

            options (:class:`GPUdbTableMonitor.Options`)
                Options to configure Client

            callbacks (list of :class:`GPUdbTableMonitor.Callback`)
                List of Callback objects passed by user to be called on
                various events relevant to Update operation
                Order of callbacks in the list:

                - 0 - updated callback
                - 1 - table_dropped callback
                - 2 - table_altered callback

                Anything but a non-empty list is treated as "no
                callbacks"; a list that is too short leaves the missing
                callbacks unset.
        """
        table_event = GPUdbTableMonitor.Client._TableEvent.UPDATE

        if isinstance(callbacks, list) and len(callbacks) > 0:
            self._callbacks = callbacks
        else:
            self._callbacks = None

        self.__cb_update = self._pick_callback(self._callbacks, 0)

        super(_UpdateWatcherTask, self).__init__(
            db,
            table_name=table_name,
            table_event=table_event,
            table_dropped_callback=self._pick_callback(self._callbacks, 1),
            table_altered_callback=self._pick_callback(self._callbacks, 2),
            options=options,
            id='UPDATE_' + table_name)

    @staticmethod
    def _pick_callback(callbacks, index):
        """Return ``callbacks[index]``, or None when ``callbacks`` is None
        or has no such position.
        """
        if callbacks is None or len(callbacks) <= index:
            return None
        return callbacks[index]

    def _connect(self):
        """ Overrides the base class method
        wrapping the call to setup method.
        """
        self.setup()

    def _process_message(self, messages):
        """ Decode an update notification and pass its ``count`` value to
        the update callback.

        The first frame is the topic id and the second frame the payload,
        which is decoded with the type schema cached on the task. Any
        failure while decoding or notifying is logged, not raised.

        Parameters:
            messages (list)
                Multi-part message received from a single socket poll.
        """
        # bytes.decode() is available on Python 2 and 3 alike
        topic_id_recvd = messages[0].decode('utf-8')

        update_cb = self.__cb_update
        if update_cb is None or update_cb.event_callback is None:
            return

        if len(messages) < 2:
            # Without this guard the error handler below would fail on
            # messages[1] as well
            self._logger.error(
                "Update notification for topic {} carries no payload "
                "frame".format(topic_id_recvd))
            return

        payload = messages[1]
        try:
            returned_obj = dict(
                GPUdbRecord.decode_binary_data(self.type_schema, payload)[0])
            update_cb.event_callback(returned_obj["count"])
            self._logger.debug("Topic Id = {} , record = {} "
                               .format(topic_id_recvd,
                                       returned_obj["count"]))
        except Exception as e:
            # Possibly an issue with decoding the data, e.g. due to a
            # different schema resulting from a table alteration.
            self._logger.error(
                "Exception received while decoding {}".format(str(e)))
            self._logger.error(
                "Failed to decode message {} with schema {}".format(
                    payload, self.type_schema))
    # End _process_message _UpdateWatcherTask(_BaseTask)

    def _disconnect(self):
        """ Overrides the base class method by stopping the task.

        NOTE(review): _BaseTask.execute calls this immediately after
        starting the thread -- confirm that stopping here is intended.
        """
        self.stop()
class _DeleteWatcherTask(_BaseTask):
    """ This is the class which handles only deletes and subsequent processing
    of the messages received as a result of notifications from server on
    deletions of records of a table.
    """

    def __init__(self, db, table_name,
                 options=None, callbacks=None):
        """
        Constructor of the _DeleteWatcherTask class

        Parameters:
            db (GPUdb)
                Handle to GPUdb instance

            table_name (str)
                Name of the table to create the monitor for

            options (:class:`GPUdbTableMonitor.Options`)
                Options to configure Client

            callbacks (List of :class:`GPUdbTableMonitor.Callback`)
                List of Callback objects passed by user to be called on various
                events relevant to the Delete operation
                Order of callbacks in the list:

                - 0 - deleted callback
                - 1 - table_dropped callback
                - 2 - table_altered callback

                Anything but a non-empty list is treated as "no
                callbacks"; a list that is too short leaves the missing
                callbacks unset.
        """
        table_event = GPUdbTableMonitor.Client._TableEvent.DELETE

        if isinstance(callbacks, list) and len(callbacks) > 0:
            self._callbacks = callbacks
        else:
            self._callbacks = None

        self.__cb_delete = self._pick_callback(self._callbacks, 0)

        super(_DeleteWatcherTask, self).__init__(
            db,
            table_name=table_name,
            table_event=table_event,
            table_dropped_callback=self._pick_callback(self._callbacks, 1),
            table_altered_callback=self._pick_callback(self._callbacks, 2),
            options=options,
            id='DELETE_' + table_name)

    @staticmethod
    def _pick_callback(callbacks, index):
        """Return ``callbacks[index]``, or None when ``callbacks`` is None
        or has no such position.
        """
        if callbacks is None or len(callbacks) <= index:
            return None
        return callbacks[index]

    def _connect(self):
        """ Overrides the base class method
        wrapping the call to setup method.
        """
        self.setup()

    def _process_message(self, messages):
        """ Decode a delete notification and pass its ``count`` value to
        the delete callback.

        The first frame is the topic id and the second frame the payload,
        which is decoded with the type schema cached on the task. Any
        failure while decoding or notifying is logged, not raised.

        Parameters:
            messages (list)
                Multi-part message received from a single socket poll.
        """
        self._logger.debug("Messages = %s", messages)

        # bytes.decode() is available on Python 2 and 3 alike
        topic_id_recvd = messages[0].decode('utf-8')

        delete_cb = self.__cb_delete
        if delete_cb is None or delete_cb.event_callback is None:
            return

        if len(messages) < 2:
            # Without this guard the error handler below would fail on
            # messages[1] as well
            self._logger.error(
                "Delete notification for topic {} carries no payload "
                "frame".format(topic_id_recvd))
            return

        payload = messages[1]
        try:
            retobj = dict(
                GPUdbRecord.decode_binary_data(self.type_schema, payload)[0])
            delete_cb.event_callback(retobj["count"])
            self._logger.debug("Topic Id = {} , record = {} "
                               .format(topic_id_recvd,
                                       retobj["count"]))
        except Exception as e:
            self._logger.error(
                "Exception received while decoding {}".format(str(e)))
            self._logger.error(
                "Failed to decode message {} with schema {}".format(
                    payload, self.type_schema))
    # End _process_message _DeleteWatcherTask(_BaseTask)

    def _disconnect(self):
        """ Overrides the base class method by stopping the task.

        NOTE(review): _BaseTask.execute calls this immediately after
        starting the thread -- confirm that stopping here is intended.
        """
        self.stop()