The following is a complete example, using the Python UDF API, of a non-CUDA
UDF that demonstrates how to build multiple models based on distributed data
and combine them into an ensemble. This example (and
others) can be found in the Python UDF API
repository; this repository can be downloaded/cloned from
GitHub.
References
Prerequisites
The general prerequisites for using UDFs in Kinetica can be found under
UDF Prerequisites.
This example cannot run on Mac OSX
The following items are also necessary:
Visit the Conda website to download
the Miniconda installer for Python 3.
UDF API Download
This example requires local access to the Python UDF API repository. In the
desired directory, run the following but be sure to replace
<kinetica-version> with the name of the installed Kinetica version, e.g,
v7.2:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
Relevant Scripts
There are six files associated with the distributed model UDF example, all
of which can be found in the Python UDF API repo.
-
A database setup script (
test_environment.py) that is called from the
initialization script
-
An initialization script (
setup_db.py) that creates input tables and
inserts data into them and creates an output ensemble table
-
Two scripts to register and execute the UDFs:
- a
register_execute_train.py script to register and execute the
model training UDF
- a
register_execute_test.py script to register and execute the
prediction model combination UDF
-
Two UDFs to train and test the model:
- a
dt_train UDF that trains and stores a decision tree model
- a
dt_test UDF that combines model predictions into an ensemble
This example runs inside a Conda environment. The environment can be
automatically configured using the conda_env_py3.yml file found in the
Python UDF API repository.
-
In the same directory you cloned the API, change directory into the root
folder of the Python UDF API repository:
cd kinetica-udf-api-python/
-
Create the Conda environment, replacing
<environment name> with the
desired name:
conda env create --name <environment name> --file conda_env_py3.yml
It may take a few minutes to create the environment.
-
Verify the environment was created properly:
-
Activate the new environment:
conda activate <environment name>
-
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
-
Install the Kinetica Python API:
-
Add the Python UDF API repo’s root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
-
Edit the
util/test_environment.py script for the correct database url,
user, and password for your Kinetica instance:
URL = 'http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000'
USER = 'admin'
PASSWORD = 'admin123'
UDF Deployment
-
Change directory into the UDF distributed model example directory:
cd examples/UDF_distributed_model
-
Run the UDF initialization script:
-
Run the execute script for the training UDF:
python register_execute_train.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
-
Run the execute script for the model combination UDF:
python register_execute_test.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
-
Verify the results in GAdmin, either on the
logging page or in the
example_loan_inference_result output table on the
table view page.
Execution Detail
This example details two distributed UDFs to train and test model prediction
accuracy. The training and testing occurs using the following schema and tables:
example_udf_python — the schema containing all of the below
tables
example_loan_train_data — the training table and input table for the
dt_train proc. The algorithm to predict if a loan is bad or not is
trained against this table.
ensemble_models — the intermediary model storage table that stores all
the models after they’ve been trained against the training table
example_loan_test_data — the testing table and input table for the
dt_test proc. The trained algorithm is tested against this table
and the accuracy of the algorithm’s predictions are calculated.
example_loan_inference_result — the results table and output table for
the dt_test proc. The actual bad loan boolean and predicted bad loan
boolean are compared per loan in this table.
The training table contains 11 columns, each containing values from the
loan_sample dataset (located in kinetica-udf-api-python/util/):
loan_amnt — a float column representing the loan amount
int_rate — a float column representing the loan’s interest rate
emp_length — a float column representing the borrower’s employment
length
annual_inc — a float column representing the borrower’s annual income
dti — a float column representing the borrower’s debt-to-income ratio
delinq_2yrs — a float column representing delinquent payments against
the loan for the last two years
revol_util — a float column representing the borrower’s revolving line
utilization rate
total_acc — a float column representing the borrower’s total amount
of accounts
bad_loan — a float column representing if the loan was bad (1) or
not (0)
longest_credit_length — a float column representing the borrower’s
longest credit line length
record_id — a float column to identify the loan
The testing table has the same columns as the training table.
The results table contains three columns:
record_id — a float column to identify the loan
bad_loan_actual — a float column that is the bad_loan value from
the test table
bad_loan_predicted — a float column that is the bad_loan value from
the models
The first UDF executed, dt_train, uses the training table to train
decision tree models, evaluate the models’ accuracy against the
training table, and then store the models in the model storage table in
Kinetica.
The model storage table contains six columns:
model — a bytes column that stores the model
name — an unrestricted string column that is the name of the model
rank — an int column that represents the processing node container that
holds the processing nodes for the database
tom — an int column that represents the processing node that contains
the many shards of data inside the database
num_input_data — an int column that is the number of records used as
input data for the model
date_created — a datetime column that is the date and time the model
was created
The second UDF executed, dt_test, combines all the models
in the model storage table and evaluates the combined ensemble accuracy
and predictions against the testing table. The average accuracy of the
individual models and the ensemble are calculated. The predictions vs.
actual values against each loan are then stored in the results table.
Any output to the system log can be viewed in the GAdmin
logging page.
Database Setup
The setup script, setup_db.py, which creates all the necessary tables for
the UDFs, imports the test_environment.py script to access its methods:
from util import test_environment as te
The script calls three methods from test_environment.py:
te.create_schema()
te.prepare_loan_data()
te.create_ensemble_model_table()
The three methods in test_environment.py require a connection to Kinetica.
This is done by instantiating an object of the GPUdb class with a provided
connection URL. See Connecting via API for details on the URL format and
how to look it up.
URL = 'http://localhost:9191'
USER = 'admin'
PASSWORD = 'admin123'
DB_HANDLE = gpudb.GPUdb(host=[URL], username=USER, password=PASSWORD)
The create_schema() method creates the schema that will contain all tables
used in the example:
SCHEMA = 'example_udf_python'
DB_HANDLE.create_schema(SCHEMA, options={'no_error_if_exists': 'true'})
The prepare_loan_data() method creates the types and tables for the
testing, training, and results tables, but the tables are removed first
if they already exist:
LOAN_TRAIN_DATA_TABLE_NAME = SCHEMA + ".example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = SCHEMA + ".example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = SCHEMA + ".example_loan_inference_result"
LOAN_DATA_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"loan_amnt","type":"float"},
{"name":"int_rate","type":"float"},
{"name":"emp_length","type":"float"},
{"name":"annual_inc","type":"float"},
{"name":"dti","type":"float"},
{"name":"delinq_2yrs","type":"float"},
{"name":"revol_util","type":"float"},
{"name":"total_acc","type":"float"},
{"name":"bad_loan","type":"float"},
{"name":"longest_credit_length","type":"float"},
{"name":"record_id","type":"float"}
]
}"""
LOAN_INFERENCE_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"record_id","type":"float"},
{"name":"bad_loan_actual","type":"float"},
{"name":"bad_loan_predicted","type":"float"}
]
}"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
Then the method converts the loan_sample data into a pandas data frame. Two
lists are initialized, which will be used to insert records into the training
table and the testing table. A simple counter is initialized as well:
pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0
For each record in the dataset, its ordered values are assigned to the columns
in the training and testing tables. From the dataset, 10% of it is added to
the testing table’s list; the other 90% is added to the training table’s
list. Once the batch size of 1000 records has been reached, the records are
inserted in the database. The lists are cleared and the process repeats:
for record in records:
i += 1
datum = collections.OrderedDict()
datum['loan_amnt'] = float(record[0])
datum['int_rate'] = float(record[1])
datum['emp_length'] = float(record[2])
datum['annual_inc'] = float(record[3])
datum['dti'] = float(record[4])
datum['delinq_2yrs'] = float(record[5])
datum['revol_util'] = float(record[6])
datum['total_acc'] = float(record[7])
datum['bad_loan'] = float(record[8])
datum['longest_credit_length'] = float(record[9])
datum['record_id'] = float(i)
if i % 10 == 0:
encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
else:
encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
if i % 1000 == 0:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert train response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert test response: {}".format(response))
encoded_obj_list_train = []
encoded_obj_list_test = []
If there’s not enough records in the lists at the end of the loop to make a full
batch size, the records are flushed to the database:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
print('\nAll data inserted.')
The create_ensemble_model_table() method creates the type and table for the
model storage table, but the table is removed first if it already exists:
ENSEMBLE_MODEL_TABLE_NAME = SCHEMA + ".ensemble_models"
ENSEMBLE_MODEL_TYPE = MODEL_TYPE = """
{
"type": "record",
"name": "ensemble_mode_type",
"fields": [
{"name":"model","type":"bytes"},
{"name":"name","type":"string"},
{"name":"rank","type":"int"},
{"name":"tom","type":"int"},
{"name":"num_input_data","type":"int"},
{"name":"date_time_created","type":"string"}
]
}"""
ENSEMBLE_MODEL_TYPE_PROPERTIES = {"date_time_created": ["datetime"]}
if DB_HANDLE.has_table(table_name=ENSEMBLE_MODEL_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(ENSEMBLE_MODEL_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=ENSEMBLE_MODEL_TYPE, label=ENSEMBLE_MODEL_TABLE_NAME,
properties=ENSEMBLE_MODEL_TYPE_PROPERTIES)
response = DB_HANDLE.create_table(table_name=ENSEMBLE_MODEL_TABLE_NAME, type_id=response['type_id'])
print("Create model_table response status: {}".format(response['status_info']['status']))
Model Training UDF (dt_train.py)
First, several packages are imported to access the Kinetica Python UDF API,
decision tree modeling, object serialization, test_environment.py methods,
and an accuracy score calculation:
from kinetica_proc import ProcData
from sklearn import tree
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
Next, the file gets a handle to the ProcData() class:
To output the number of values found on each processing node and processing node
container, the rank and tom number in the request info map pointing to the
current instance of the UDF are mapped and displayed:
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF train r{}_t{}: instantiated.'.format(rank_number, tom_number))
The data from the input table (example_loan_train_data)—excluding the
records with NaN or null values—is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let y be the label
with the bad_loan column values; let X be the attributes with
rest of the column values:
training_data = proc_data.to_df().dropna() # only use non-NAN rows
num_input_data = training_data.shape[0]
X = training_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
'longest_credit_length']]
y = training_data[['bad_loan']]
The decision tree is created:
print('UDF train r{}_t{}: learning model on {} data points.'.format(rank_number, tom_number, num_input_data))
tree_classifier = tree.DecisionTreeClassifier(class_weight=None, criterion='entropy', max_depth=7, max_features=None,
max_leaf_nodes=None, min_samples_leaf=5, min_samples_split=2,
min_weight_fraction_leaf=0.0, presort=False, random_state=100,
splitter='best')
As long as there are more than 10 values stored in the attributes group, the
model is created by training the tree algorithm on X and y:
if X.shape[0] > 10:
model = tree_classifier.fit(X, y)
The accuracy of the new algorithm is calculated by comparing y (actual
bad_loan values) to the model’s predictions based on the values
in X. The accuracy of the algorithm is then output:
acc = accuracy_score(y, model.predict(X))
print('UDF train r{}_t{}: training accuracy: {}'.format(rank_number, tom_number, acc))
The models are converted to byte arrays and stored as records in the
model storage table:
print('UDF train r{}_t{}: storing model.'.format(rank_number, tom_number))
model_byte_array = pickle.dumps(model)
te.store_ensemble_model(model_byte_array, 'sklearn_dt', rank_number, tom_number, num_input_data, acc)
def store_ensemble_model(model_binary, name, rank, tom, num_input_data, train_acc):
"""Store serialized model object into table"""
encoded_object_list = []
datum = collections.OrderedDict()
datum['model'] = model_binary
datum['name'] = str(name)
datum['rank'] = int(rank)
datum['tom'] = int(tom)
datum['num_input_data'] = int(num_input_data)
datum['date_time_created'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
encoded_object_list.append(DB_HANDLE.encode_datum(ENSEMBLE_MODEL_TYPE, datum))
response = DB_HANDLE.insert_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, data=encoded_object_list,
list_encoding='binary')
print(response)
Call complete() to tell Kinetica the proc code is finished:
Training UDF Registration (register_execute_train.py)
To interact with Kinetica, an object of the GPUdb class is instantiated
while providing the connection URL of the database server.
main(gpudb.GPUdb(host=[args.url], username=args.username, password=args.password))
To upload the dt_train.py, kinetica_proc.py, and test_environment.py
files to Kinetica, they will first need to be read in as bytes and added to a
file data map:
file_paths = ["dt_train.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
script_name = os.path.basename(script_path)
with open(script_path, 'rb') as f:
files[script_name] = f.read()
After the files are placed in a data map, the distributed distributed_train
proc is created in Kinetica and the files are associated with it:
proc_name = 'distributed_train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
The proc requires the proper command and args to be executed. In
this case, the assembled command line would be:
Finally, after the proc is created, it is executed. The training table created
in the Database Setup section is passed in here:
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)
Model Testing UDF (dt_test.py)
First, several packages are imported to access the Kinetica Python UDF API,
object serialization, test_environment.py methods, an accuracy score
calculation, and numpy math methods:
from kinetica_proc import ProcData
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
import numpy as np
Next, the file gets a handle to the ProcData() class:
To output the number of values found on each processing node and processing node
container, the rank and tom number in the request info map pointing to the
current instance of the UDF are mapped and displayed:
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF test r{}_t{}: instantiated.'.format(rank_number, tom_number))
The data from the input table (example_loan_test_data)—excluding the
records with NaN or null values—is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let y_actual
be the label with the bad_loan column values; let X be the attributes
with the rest of the column values. Let record_ids represent the loan IDs:
test_data = proc_data.to_df().dropna()
num_test_data = test_data.shape[0]
X = test_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
'longest_credit_length']]
y_actual = test_data[['bad_loan']]
record_ids = test_data[['record_id']]
Get a handle to the output table from the proc
(example_loan_inference_result). Its size is expanded to match the number of
records from the dataframe; this will allocated enough memory to copy all
input records to the output table. Handles are created for the columns that
will be written to: record_id, bad_loan_actual, and
bad_loan_predicted:
out_table = proc_data.output_data[0]
out_table.size = num_test_data
record_id_column, y_actual_column, y_predicted_column = out_table[0], out_table[1], out_table[2]
If the number of input records is greater than 0, the models from the
model storage table are retrieved, an array is created to hold a number of
0. values equal to the number of input records, and an empty list is
initialized:
if num_test_data > 0:
models_from_db = te.load_ensemble_models('sklearn_dt')
sum_predictions = np.full(num_test_data, 0.)
accuracies = list()
The load_ensemble_models method from test_environment.py gets the
models, decodes the records, and places them in a list:
def load_ensemble_models(name, limit=100):
"""Load serialized ensemble model objects that share the same name from table."""
response = DB_HANDLE.get_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, offset=0, limit=limit,
options={"expression": "(name='" + str(name) + "')"})
res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
result_dict_list = []
for current_result in res_decoded:
current_dict = dict()
current_dict['model'] = current_result['model']
current_dict['num_input_data'] = current_result['num_input_data']
current_dict['rank'] = current_result['rank']
current_dict['tom'] = current_result['tom']
current_dict['date_time_created'] = current_result['date_time_created']
result_dict_list.append(current_dict)
return result_dict_list
For each model, the object is deserialized and loaded into the UDF. The models’
predictions are stored and the accuracy
of the prediction against the testing table’s bad_loan
values is calculated. The calculated accuracy is added to the accuracies
list, and the prediction value is added to the summed predictions array:
for current_model_dict in models_from_db:
model = pickle.loads(current_model_dict['model'])
y_test_predict = model.predict(X.values)
test_accuracy = accuracy_score(y_actual, y_test_predict)
accuracies.append(test_accuracy)
sum_predictions += y_test_predict
The prediction value from the combined list is calculated by dividing the sum
of each model’s prediction by the number of models. The models’ combined
accuracy is also calculated and displayed. The record IDs, actual bad_loan
value, and predicted bad_loan value are added as values to the matching
output table columns.
combined_predictions = np.around(sum_predictions / len(models_from_db))
combined_accuracy = accuracy_score(y_actual, combined_predictions)
print('Average accuracies of {} models: {}'.format(len(accuracies), (sum(accuracies) / float(len(accuracies)))))
print('Ensemble accuracy: {}'.format(combined_accuracy))
record_id_column[:] = record_ids.values
y_actual_column[:] = y_actual.values
y_predicted_column[:] = combined_predictions
If no data was found on the tom, finish processing:
else:
print('UDF test r{}_t{}: no test data on tom.\n'.format(rank_number, tom_number))
Call complete() to tell Kinetica the proc code is finished:
Testing UDF Registration (register_execute_test.py)
To interact with Kinetica, an object of the GPUdb class is instantiated
while providing the connection URL of the database server.
main(gpudb.GPUdb(host=[args.url], username=args.username, password=args.password))
To upload the dt_test.py, kinetica_proc.py, and test_environment.py
files to Kinetica, they will first need to be read in as bytes and added to a
file data map:
file_paths = ["dt_test.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
script_name = os.path.basename(script_path)
with open(script_path, 'rb') as f:
files[script_name] = f.read()
After the files are placed in a data map, the distributed distributed_test
proc is created in Kinetica and the files are associated with it:
proc_name = 'distributed_test'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
The proc requires the proper command and args to be executed. In
this case, the assembled command line would be:
Finally, after the proc is created, it is executed. The testing and
results table created in the Database Setup section are passed in here:
TEST_INPUT_TABLE = te.LOAN_TEST_DATA_TABLE_NAME
TEST_OUTPUT_TABLE = te.LOAN_INFERENCE_TABLE_NAME
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [TEST_INPUT_TABLE], {}, [TEST_OUTPUT_TABLE], {})
print(response)