The following is a complete example, using the Python UDF API, of a non-CUDA
UDF that demonstrates how to build multiple models based on distributed data
and combine them into an ensemble. This example (and
others) can be found in the Python UDF API
repository; this repository comes with Kinetica by default (located in
/opt/gpudb/udf/api/python/
) or can be downloaded/cloned from
GitHub.
The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.
Important
This example cannot run on Mac OSX
The following items are also necessary:
Note
Visit the Conda website to download the Miniconda installer for Python 3.
This example requires local access to the Python UDF API repository. In the
desired directory, run the following but be sure to replace
<kinetica-version>
with the name of the installed Kinetica version, e.g,
v7.0
:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
There are six files associated with the distributed model UDF example, all of which can be found in the Python UDF API repo.
test_environment.py
) that is called from the
initialization scriptsetup_db.py
) that creates input tables and
inserts data into them and creates an output ensemble tableregister_execute_train.py
script to register and execute the
model training UDFregister_execute_test.py
script to register and execute the
prediction model combination UDFdt_train
UDF that trains and stores a decision tree modeldt_test
UDF that combines model predictions into an ensembleThis example runs inside a Conda environment. The environment can be
automatically configured using the conda_env_py3.yml
file found in the
Python UDF API repository.
In the same directory you cloned the API, change directory into the root folder of the Python UDF API repository:
cd kinetica-udf-api-python/
Create the Conda environment, replacing <environment name>
with the
desired name:
conda env create --name <environment name> --file conda_env_py3.yml
Note
It may take a few minutes to create the environment.
Verify the environment was created properly:
conda info --envs
Activate the new environment:
source activate <environment name>
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
Install the Kinetica Python API:
pip install gpudb~=7.0.0
Add the Python UDF API repo's root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
Edit the util/test_environment.py
script for the correct host, port,
user, and password for your Kinetica instance:
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
Change directory into the UDF distributed model example directory:
cd examples/UDF_distributed_model
Run the UDF initialization script:
python setup_db.py
Update the host and port in register_execute_train.py
and
register_execute_test.py
as necessary:
main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
Run the execute script for the training UDF:
python register_execute_train.py
Run the execute script for the model combination UDF:
python register_execute_test.py
Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log
file on
the head node and/or the example_loan_inference_result
table via GAdmin.
This example details two distributed UDFs to train and test model prediction accuracy. The training and testing occurs using the following tables:
example_loan_train_data
-- the training table and input table for the
dt_train
proc. The algorithm to predict if a loan is bad or not is
trained against this table.ensemble_models
-- the intermediary model storage table that stores all
the models after they've been trained against the training tableexample_loan_test_data
-- the testing table and input table for the
dt_test
proc. The trained algorithm is tested against this table
and the accuracy of the algorithm's predictions are calculated.example_loan_inference_result
-- the results table and output table for
the dt_test
proc. The actual bad loan boolean and predicted bad loan
boolean are compared per loan in this table.The training table contains 11 columns, each containing values from the
loan_sample
dataset (located in kinetica-udf-api-python/util/
):
loan_amnt
-- a float column representing the loan amountint_rate
-- a float column representing the loan's interest rateemp_length
-- a float column representing the borrower's employment
lengthannual_inc
-- a float column representing the borrower's annual incomedti
-- a float column representing the borrower's debt-to-income ratiodelinq_2yrs
-- a float column representing delinquent payments against
the loan for the last two yearsrevol_util
-- a float column representing the borrower's revolving line
utilization ratetotal_acc
-- a float column representing the borrower's total amount
of accountsbad_loan
-- a float column representing if the loan was bad (1
) or
not (0
)longest_credit_length
-- a float column representing the borrower's
longest credit line lengthrecord_id
-- a float column to identify the loanThe testing table has the same columns as the training table.
The results table contains three columns:
record_id
-- a float column to identify the loanbad_loan_actual
-- a float column that is the bad_loan
value from
the test tablebad_loan_predicted
-- a float column that is the bad_loan
value from
the modelsThe first UDF executed, dt_train
, uses the training table to train
decision tree models, evaluate the models' accuracy against the
training table, and then store the models in the model storage table in
Kinetica.
The model storage table contains six columns:
model
-- a bytes column that stores the modelname
-- an unrestricted string column that is the name of the modelrank
-- an int column that represents the processing node container that
holds the processing nodes for the databasetom
-- an int column that represents the processing node that contains
the many shards of data inside the databasenum_input_data
-- an int column that is the number of records used as
input data for the modeldate_created
-- a datetime column that is the date and time the model
was createdThe second UDF executed, dt_test
, combines all the models
in the model storage table and evaluates the combined ensemble accuracy
and predictions against the testing table. The average
accuracy of the individual models and the ensemble are calculated
(and output to the system log and
/opt/gpudb/core/logs/gpudb-proc.log
on the head node). The predictions vs.
actual values against each loan are then stored in the results table.
The setup script, setup_db.py
, which creates all the necessary tables for
the UDFs, imports the test_environment.py
script to access its methods:
from util import test_environment as te
The script calls two methods from test_environment.py
:
te.prepare_loan_data()
te.create_ensemble_model_table()
The two methods in test_environment.py
require a connection to Kinetica.
This is done by instantiating an object of the GPUdb
class with a provided
connection URL (host and port):
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)
The prepare_loan_data()
method creates the types and tables for the
testing, training, and results tables, but the tables are removed first
if they already exist:
LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"loan_amnt","type":"float"},
{"name":"int_rate","type":"float"},
{"name":"emp_length","type":"float"},
{"name":"annual_inc","type":"float"},
{"name":"dti","type":"float"},
{"name":"delinq_2yrs","type":"float"},
{"name":"revol_util","type":"float"},
{"name":"total_acc","type":"float"},
{"name":"bad_loan","type":"float"},
{"name":"longest_credit_length","type":"float"},
{"name":"record_id","type":"float"}
]
}"""
LOAN_INFERENCE_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"record_id","type":"float"},
{"name":"bad_loan_actual","type":"float"},
{"name":"bad_loan_predicted","type":"float"}
]
}"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
Then the method converts the loan_sample
data into a pandas data frame. Two
lists are initialized, which will be used to insert records into the training
table and the testing table. A simple counter is initialized as well:
pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0
For each record in the dataset, its ordered values are assigned to the columns in the training and testing tables. From the dataset, 10% of it is added to the testing table's list; the other 90% is added to the training table's list. Once the batch size of 1000 records has been reached, the records are inserted in the database. The lists are cleared and the process repeats:
for record in records:
i += 1
datum = collections.OrderedDict()
datum['loan_amnt'] = float(record[0])
datum['int_rate'] = float(record[1])
datum['emp_length'] = float(record[2])
datum['annual_inc'] = float(record[3])
datum['dti'] = float(record[4])
datum['delinq_2yrs'] = float(record[5])
datum['revol_util'] = float(record[6])
datum['total_acc'] = float(record[7])
datum['bad_loan'] = float(record[8])
datum['longest_credit_length'] = float(record[9])
datum['record_id'] = float(i)
if i % 10 == 0:
encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
else:
encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
if i % 1000 == 0:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert train response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert test response: {}".format(response))
encoded_obj_list_train = []
encoded_obj_list_test = []
If there's not enough records in the lists at the end of the loop to make a full batch size, the records are flushed to the database:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
print('\nAll data inserted.')
The create_ensemble_model_table()
method creates the type and table for the
model storage table, but the table is removed first if it already exists:
ENSEMBLE_MODEL_TABLE_NAME = "ensemble_models"
ENSEMBLE_MODEL_TYPE = MODEL_TYPE = """
{
"type": "record",
"name": "ensemble_mode_type",
"fields": [
{"name":"model","type":"bytes"},
{"name":"name","type":"string"},
{"name":"rank","type":"int"},
{"name":"tom","type":"int"},
{"name":"num_input_data","type":"int"},
{"name":"date_time_created","type":"string"}
]
}"""
ENSEMBLE_MODEL_TYPE_PROPERTIES = {"date_time_created": ["datetime"]}
if DB_HANDLE.has_table(table_name=ENSEMBLE_MODEL_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(ENSEMBLE_MODEL_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=ENSEMBLE_MODEL_TYPE, label=ENSEMBLE_MODEL_TABLE_NAME,
properties=ENSEMBLE_MODEL_TYPE_PROPERTIES)
response = DB_HANDLE.create_table(table_name=ENSEMBLE_MODEL_TABLE_NAME, type_id=response['type_id'])
print("Create model_table response status: {}".format(response['status_info']['status']))
First, several packages are imported to access the Kinetica Python UDF API,
decision tree modeling, object serialization, test_environment.py
methods,
and an accuracy score calculation:
from kinetica_proc import ProcData
from sklearn import tree
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
Next, the file gets a handle to the ProcData()
class:
proc_data = ProcData()
To output the number of values found on each processing node and processing node container, the rank and tom number in the request info map pointing to the current instance of the UDF are mapped and displayed:
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF train r{}_t{}: instantiated.'.format(rank_number, tom_number))
The data from the input table (example_loan_train_data
)--excluding the
records with NaN or null values--is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let y
be the label
with the bad_loan
column values; let X
be the attributes with
rest of the column values:
training_data = proc_data.to_df().dropna() # only use non-NAN rows
num_input_data = training_data.shape[0]
X = training_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
'longest_credit_length']]
y = training_data[['bad_loan']]
The decision tree is created:
print('UDF train r{}_t{}: learning model on {} data points.'.format(rank_number, tom_number, num_input_data))
tree_classifier = tree.DecisionTreeClassifier(class_weight=None, criterion='entropy', max_depth=7, max_features=None,
max_leaf_nodes=None, min_samples_leaf=5, min_samples_split=2,
min_weight_fraction_leaf=0.0, presort=False, random_state=100,
splitter='best')
As long as there are more than 10 values stored in the attributes group, the
model is created by training the tree algorithm on X
and y
:
if X.shape[0] > 10:
model = tree_classifier.fit(X, y)
The accuracy of the new algorithm is calculated by comparing y
(actual
bad_loan
values) to the model's predictions based on the values
in X
. The accuracy of the algorithm is then output:
acc = accuracy_score(y, model.predict(X))
print('UDF train r{}_t{}: training accuracy: {}'.format(rank_number, tom_number, acc))
The models are converted to byte arrays and stored as records in the model storage table:
print('UDF train r{}_t{}: storing model.'.format(rank_number, tom_number))
model_byte_array = pickle.dumps(model)
te.store_ensemble_model(model_byte_array, 'sklearn_dt', rank_number, tom_number, num_input_data, acc)
def store_ensemble_model(model_binary, name, rank, tom, num_input_data, train_acc):
encoded_object_list = []
datum = collections.OrderedDict()
datum['model'] = model_binary
datum['name'] = str(name)
datum['rank'] = int(rank)
datum['tom'] = int(tom)
datum['num_input_data'] = int(num_input_data)
datum['date_time_created'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
encoded_object_list.append(DB_HANDLE.encode_datum(ENSEMBLE_MODEL_TYPE, datum))
response = DB_HANDLE.insert_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, data=encoded_object_list,
list_encoding='binary')
print(response)
Call complete()
to tell Kinetica the proc code is finished:
proc_data.complete()
To interact with Kinetica, an object of the GPUdb
class is instantiated
while providing the connection URL, including the host and port of the database
server. Ensure the host address and port are correct for your setup:
if __name__ == "__main__":
main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
To upload the dt_train.py
, kinetica_proc.py
, and test_environment.py
files to Kinetica, they will first need to be read in as bytes and added to a
file data map:
file_paths = ["dt_train.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
script_name = os.path.basename(script_path)
with open(script_path, 'rb') as f:
files[script_name] = f.read()
After the files are placed in a data map, the distributed distributed_train
proc is created in Kinetica and the files are associated with it:
proc_name = 'distributed_train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
Note
The proc requires the proper command
and args
to be executed. In
this case, the assembled command line would be:
python dt_train.py
Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)
First, several packages are imported to access the Kinetica Python UDF API,
object serialization, test_environment.py
methods, an accuracy score
calculation, and numpy
math methods:
from kinetica_proc import ProcData
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
import numpy as np
Next, the file gets a handle to the ProcData()
class:
proc_data = ProcData()
To output the number of values found on each processing node and processing node container, the rank and tom number in the request info map pointing to the current instance of the UDF are mapped and displayed:
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF test r{}_t{}: instantiated.'.format(rank_number, tom_number))
The data from the input table (example_loan_test_data
)--excluding the
records with NaN or null values--is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let y_actual
be the label with the bad_loan
column values; let X
be the attributes
with the rest of the column values. Let record_ids
represent the loan IDs:
test_data = proc_data.to_df().dropna()
num_test_data = test_data.shape[0]
X = test_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
'longest_credit_length']]
y_actual = test_data[['bad_loan']]
record_ids = test_data[['record_id']]
Get a handle to the output table from the proc
(example_loan_inference_result
). Its size is expanded to match the number of
records from the dataframe; this will allocated enough memory to copy all
input records to the output table. Handles are created for the columns that
will be written to: record_id
, bad_loan_actual
, and
bad_loan_predicted
:
out_table = proc_data.output_data[0]
out_table.size = num_test_data
record_id_column, y_actual_column, y_predicted_column = out_table[0], out_table[1], out_table[2]
If the number of input records is greater than 0, the models from the
model storage table are retrieved, an array is created to hold a number of
0.
values equal to the number of input records, and an empty list is
initialized:
if num_test_data > 0:
models_from_db = te.load_ensemble_models('sklearn_dt')
sum_predictions = np.full(num_test_data, 0.)
accuracies = list()
The load_ensemble_models
method from test_environment.py
gets the
models, decodes the records, and places them in a list:
load_ensemble_models(name, limit=100):
"""Load serialized ensemble model objects that share the same name from table."""
response = DB_HANDLE.get_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, offset=0, limit=limit,
options={"expression": "(name='" + str(name) + "')"})
res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
result_dict_list = []
for current_result in res_decoded:
current_dict = dict()
current_dict['model'] = current_result['model']
current_dict['num_input_data'] = current_result['num_input_data']
current_dict['rank'] = current_result['rank']
current_dict['tom'] = current_result['tom']
current_dict['date_time_created'] = current_result['date_time_created']
result_dict_list.append(current_dict)
return result_dict_list
For each model, the object is deserialized and loaded into the UDF. The models'
predictions are stored and the accuracy
of the prediction against the testing table's bad_loan
values is calculated. The calculated accuracy is added to the accuracies
list, and the prediction value is added to the summed predictions array:
for current_model_dict in models_from_db:
model = pickle.loads(current_model_dict['model'])
y_test_predict = model.predict(X.values)
test_accuracy = accuracy_score(y_actual, y_test_predict)
accuracies.append(test_accuracy)
sum_predictions += y_test_predict
The prediction value from the combined list is calculated by dividing the sum
of each model's prediction by the number of models. The models' combined
accuracy is also calculated and displayed. The record IDs, actual bad_loan
value, and predicted bad_loan
value are added as values to the matching
output table columns.
combined_predictions = np.around(sum_predictions / len(models_from_db))
combined_accuracy = accuracy_score(y_actual, combined_predictions)
print('Average accuracies of {} models: {}'.format(len(accuracies), (sum(accuracies) / float(len(accuracies)))))
print('Ensemble accuracy: {}'.format(combined_accuracy))
record_id_column[:] = record_ids.values
y_actual_column[:] = y_actual.values
y_predicted_column[:] = combined_predictions
If no data was found on the tom, finish processing:
else:
print('UDF test r{}_t{}: no test data on tom.\n'.format(rank_number, tom_number))
Call complete()
to tell Kinetica the proc code is finished:
proc_data.complete()
To interact with Kinetica, an object of the GPUdb
class is instantiated
while providing the connection URL, including the host and port of the database
server. Ensure the host address and port are correct for your setup:
if __name__ == "__main__":
main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
To upload the dt_test.py
, kinetica_proc.py
, and test_environment.py
files to Kinetica, they will first need to be read in as bytes and added to a
file data map:
file_paths = ["dt_test.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
script_name = os.path.basename(script_path)
with open(script_path, 'rb') as f:
files[script_name] = f.read()
After the files are placed in a data map, the distributed distributed_test
proc is created in Kinetica and the files are associated with it:
proc_name = 'distributed_test'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
Note
The proc requires the proper command
and args
to be executed. In
this case, the assembled command line would be:
python dt_test.py
Finally, after the proc is created, it is executed. The testing and results table created in the Database Setup section are passed in here:
TEST_INPUT_TABLE = te.LOAN_TEST_DATA_TABLE_NAME
TEST_OUTPUT_TABLE = te.LOAN_INFERENCE_TABLE_NAME
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [TEST_INPUT_TABLE], {}, [TEST_OUTPUT_TABLE], {})
print(response)