Example UDF (Non-CUDA) - Distributed Model

The following is a complete example, using the Python UDF API, of a non-CUDA UDF that demonstrates how to build multiple models based on distributed data and combine them into an ensemble. This example (and others) can be found in the Python UDF API repository; this repository comes with Kinetica by default (located in /opt/gpudb/udf/api/python/) or can be downloaded/cloned from GitHub.

Prerequisites

The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.

Important

This example cannot be run on Mac OS X.

The following items are also necessary:

  • Python 3
  • Miniconda

Note

Visit the Conda website to download the Miniconda installer for Python 3.

UDF API Download

This example requires local access to the Python UDF API repository. In the desired directory, run the following, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v6.2.0:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git

Relevant Scripts

There are six files associated with the distributed model UDF example, all of which can be found in the Python UDF API repo.

  • A database setup script (test_environment.py) that is called from the initialization script
  • An initialization script (setup_db.py) that creates input tables and inserts data into them and creates an output ensemble table
  • Two scripts to register and execute the UDFs:
    • a register_execute_train.py script to register and execute the model training UDF
    • a register_execute_test.py script to register and execute the prediction model combination UDF
  • Two UDFs to train and test the model:
    • a dt_train UDF that trains and stores a decision tree model
    • a dt_test UDF that combines model predictions into an ensemble

Prepare Environment

This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.

  1. From the directory where the API was cloned, change directory into the root folder of the Python UDF API repository:

    cd kinetica-udf-api-python/
    
  2. Create the Conda environment, replacing <environment name> with the desired name:

    conda env create --name <environment name> --file conda_env_py3.yml
    

    Note

    It may take a few minutes to create the environment.

  3. Verify the environment was created properly:

    conda info --envs
    
  4. Activate the new environment:

    source activate <environment name>
    
  5. Install PyGDF:

    conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
    
  6. Install the Kinetica Python API from PyPI, where n is the desired version of Kinetica:

    pip install gpudb==6.2.0.n
    
  7. Add the Python UDF API repo's root directory to the PYTHONPATH:

    export PYTHONPATH=$(pwd):$PYTHONPATH
    
  8. Edit the util/test_environment.py script for the correct host, port, user, and password for your Kinetica instance:

    HOST = 'http://localhost'
    PORT = '9191'
    USER = 'testuser'
    PASSWORD = 'Testuser123!'
    

UDF Deployment

  1. Change directory into the UDF distributed model example directory:

    cd examples/UDF_distributed_model
    
  2. Run the UDF initialization script:

    python setup_db.py
    
  3. Update the host and port in register_execute_train.py and register_execute_test.py as necessary:

    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
    
  4. Run the execute script for the training UDF:

    python register_execute_train.py
    
  5. Run the execute script for the model combination UDF:

    python register_execute_test.py
    
  6. Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the head node and/or the example_loan_inference_result table via GAdmin.

Execution Detail

This example details two distributed UDFs that train models and test their prediction accuracy. The training and testing use the following tables:

  • example_loan_train_data -- the training table and input table for the dt_train proc. The algorithm to predict if a loan is bad or not is trained against this table.
  • ensemble_models -- the intermediary model storage table that stores all the models after they've been trained against the training table
  • example_loan_test_data -- the testing table and input table for the dt_test proc. The trained algorithm is tested against this table and the accuracy of the algorithm's predictions is calculated.
  • example_loan_inference_result -- the results table and output table for the dt_test proc. The actual bad loan boolean and predicted bad loan boolean are compared per loan in this table.

The training table contains 11 columns, each containing values from the loan_sample dataset (located in kinetica-udf-api-python/util/):

  • loan_amnt -- a float column representing the loan amount
  • int_rate -- a float column representing the loan's interest rate
  • emp_length -- a float column representing the borrower's employment length
  • annual_inc -- a float column representing the borrower's annual income
  • dti -- a float column representing the borrower's debt-to-income ratio
  • delinq_2yrs -- a float column representing delinquent payments against the loan for the last two years
  • revol_util -- a float column representing the borrower's revolving line utilization rate
  • total_acc -- a float column representing the borrower's total number of accounts
  • bad_loan -- a float column representing if the loan was bad (1) or not (0)
  • longest_credit_length -- a float column representing the borrower's longest credit line length
  • record_id -- a float column to identify the loan

The testing table has the same columns as the training table.

The results table contains three columns:

  • record_id -- a float column to identify the loan
  • bad_loan_actual -- a float column that is the bad_loan value from the test table
  • bad_loan_predicted -- a float column that is the bad_loan value from the models

The first UDF executed, dt_train, uses the training table to train decision tree models, evaluate the models' accuracy against the training table, and then store the models in the model storage table in Kinetica.

The model storage table contains six columns:

  • model -- a bytes column that stores the model
  • name -- an unrestricted string column that is the name of the model
  • rank -- an int column that represents the processing node container that holds the processing nodes for the database
  • tom -- an int column that represents the processing node that contains the many shards of data inside the database
  • num_input_data -- an int column that is the number of records used as input data for the model
  • date_time_created -- a datetime column that is the date and time the model was created

The second UDF executed, dt_test, combines all the models in the model storage table and evaluates the combined ensemble accuracy and predictions against the testing table. The average accuracy of the individual models and the ensemble are calculated (and output to /opt/gpudb/core/logs/gpudb.log and /opt/gpudb/core/logs/gpudb-proc.log on the head node). The predictions vs. actual values against each loan are then stored in the results table.
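
The two-stage flow described above can be illustrated without Kinetica. The following is a minimal sketch, not the example's implementation: the three shards, the single int_rate feature, and the threshold "model" are made up for illustration, and train_threshold_model, predict, and ensemble_predict are hypothetical names standing in for the per-tom data and the scikit-learn decision trees.

```python
from statistics import mean

# Hypothetical stand-in for the data held by three toms:
# each row is (int_rate, bad_loan).
shards = [
    [(5.0, 0), (7.0, 0), (15.0, 1), (17.0, 1)],
    [(6.0, 0), (8.0, 0), (18.0, 1), (20.0, 1)],
    [(4.0, 0), (10.0, 0), (12.0, 1), (14.0, 1)],
]


def train_threshold_model(rows):
    """Toy model: the midpoint between the mean rate of good and bad loans."""
    good_mean = mean(rate for rate, label in rows if label == 0)
    bad_mean = mean(rate for rate, label in rows if label == 1)
    return (good_mean + bad_mean) / 2


def predict(threshold, rate):
    """Flag a loan as bad (1) when its rate exceeds the model's threshold."""
    return 1 if rate > threshold else 0


def ensemble_predict(models, rate):
    """Average the individual predictions and round to the nearest class."""
    return round(mean(predict(m, rate) for m in models))


# Stage 1: one model per shard (the role dt_train plays on each tom).
models = [train_threshold_model(rows) for rows in shards]

# Stage 2: every stored model votes on each test record (the role of dt_test).
test_rates = [9.0, 12.0, 16.0]
ensemble = [ensemble_predict(models, rate) for rate in test_rates]
print(models, ensemble)
```

Each model only ever sees its own shard, so the thresholds differ; the ensemble smooths over those differences by letting all models vote on every test record.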

Database Setup

The setup script, setup_db.py, which creates all the necessary tables for the UDFs, imports the test_environment.py script to access its methods:

from util import test_environment as te

The script calls two methods from test_environment.py:

te.prepare_loan_data()
te.create_ensemble_model_table()

The two methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL (host and port):

HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)

The prepare_loan_data() method creates the types and tables for the testing, training, and results tables, but the tables are removed first if they already exist:

LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"loan_amnt","type":"float"},
            {"name":"int_rate","type":"float"},
            {"name":"emp_length","type":"float"},
            {"name":"annual_inc","type":"float"},
            {"name":"dti","type":"float"},
            {"name":"delinq_2yrs","type":"float"},
            {"name":"revol_util","type":"float"},
            {"name":"total_acc","type":"float"},
            {"name":"bad_loan","type":"float"},
            {"name":"longest_credit_length","type":"float"},
            {"name":"record_id","type":"float"}
        ]
    }"""
LOAN_INFERENCE_TYPE = """
{
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"record_id","type":"float"},
            {"name":"bad_loan_actual","type":"float"},
            {"name":"bad_loan_predicted","type":"float"}
        ]
    }"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))

Then the method converts the loan_sample data into a pandas data frame. Two lists are initialized, which will be used to insert records into the training table and the testing table. A simple counter is initialized as well:

pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0

For each record in the dataset, its ordered values are assigned to the columns in the training and testing tables. Every tenth record (10% of the dataset) is added to the testing table's list; the other 90% are added to the training table's list. After every 1000 records processed, both lists are inserted into the database and cleared, and the process repeats:

for record in records:
    i += 1
    datum = collections.OrderedDict()
    datum['loan_amnt'] = float(record[0])
    datum['int_rate'] = float(record[1])
    datum['emp_length'] = float(record[2])
    datum['annual_inc'] = float(record[3])
    datum['dti'] = float(record[4])
    datum['delinq_2yrs'] = float(record[5])
    datum['revol_util'] = float(record[6])
    datum['total_acc'] = float(record[7])
    datum['bad_loan'] = float(record[8])
    datum['longest_credit_length'] = float(record[9])
    datum['record_id'] = float(i)
    if i % 10 == 0:
        encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    else:
        encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    if i % 1000 == 0:
        response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert train response: {}".format(response))
        response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert test response: {}".format(response))
        encoded_obj_list_train = []
        encoded_obj_list_test = []

If records remain in the lists at the end of the loop because the last batch was not full, they are flushed to the database:

response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
print('\nAll data inserted.')
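
The split-and-batch counting can be checked in isolation. This sketch assumes a hypothetical insert callback in place of DB_HANDLE.insert_records and uses plain integers as records; it is not the example's code, only the same counting scheme. Unlike the original, it skips the final flush when a list is empty.

```python
def split_and_batch(num_records, insert, batch_size=1000, test_every=10):
    """Route every `test_every`-th record to the test list and flush both
    lists whenever `batch_size` records have been seen."""
    train, test = [], []
    for i in range(1, num_records + 1):
        (test if i % test_every == 0 else train).append(i)
        if i % batch_size == 0:
            insert("train", train)
            insert("test", test)
            train, test = [], []
    # Final flush: whatever is left over after the last full batch.
    if train:
        insert("train", train)
    if test:
        insert("test", test)


# Hypothetical stand-in for the database: count rows per table.
inserted = {"train": 0, "test": 0}
calls = []


def fake_insert(table, rows):
    inserted[table] += len(rows)
    calls.append((table, len(rows)))


split_and_batch(2500, fake_insert)
print(inserted, calls)
```

With 2500 records, the last 500 never reach a batch boundary, so without the final flush they would be lost.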

The create_ensemble_model_table() method creates the type and table for the model storage table, but the table is removed first if it already exists:

ENSEMBLE_MODEL_TABLE_NAME = "ensemble_models"
ENSEMBLE_MODEL_TYPE = MODEL_TYPE = """
    {
        "type": "record",
        "name": "ensemble_mode_type",
        "fields": [
            {"name":"model","type":"bytes"},
            {"name":"name","type":"string"},
            {"name":"rank","type":"int"},
            {"name":"tom","type":"int"},
            {"name":"num_input_data","type":"int"},
            {"name":"date_time_created","type":"string"}
        ]
    }"""
ENSEMBLE_MODEL_TYPE_PROPERTIES = {"date_time_created": ["datetime"]}
if DB_HANDLE.has_table(table_name=ENSEMBLE_MODEL_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(ENSEMBLE_MODEL_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=ENSEMBLE_MODEL_TYPE, label=ENSEMBLE_MODEL_TABLE_NAME,
                                 properties=ENSEMBLE_MODEL_TYPE_PROPERTIES)
response = DB_HANDLE.create_table(table_name=ENSEMBLE_MODEL_TABLE_NAME, type_id=response['type_id'])
print("Create model_table response status: {}".format(response['status_info']['status']))

Model Training UDF (dt_train.py)

First, several packages are imported to access the Kinetica Python UDF API, decision tree modeling, object serialization, test_environment.py methods, and an accuracy score calculation:

from kinetica_proc import ProcData
from sklearn import tree
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score

Next, the file gets a handle to the ProcData() class:

proc_data = ProcData()

To identify which processing node container (rank) and processing node (tom) each UDF instance is running on, the rank and tom numbers are read from the request info map and displayed:

rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF train r{}_t{}: instantiated.'.format(rank_number, tom_number))

The data from the input table (example_loan_train_data)--excluding the records with NaN or null values--is converted to a pandas dataframe. The number of input records is derived from the dataframe. Let y be the label, holding the bad_loan column values; let X be the attributes, holding the remaining columns except record_id:

training_data = proc_data.to_df().dropna()  # only use non-NAN rows
num_input_data = training_data.shape[0]
X = training_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
                   'longest_credit_length']]
y = training_data[['bad_loan']]

The decision tree is created:

print('UDF train r{}_t{}: learning model on {} data points.'.format(rank_number, tom_number, num_input_data))
tree_classifier = tree.DecisionTreeClassifier(class_weight=None, criterion='entropy', max_depth=7, max_features=None,
                                              max_leaf_nodes=None, min_samples_leaf=5, min_samples_split=2,
                                              min_weight_fraction_leaf=0.0, presort=False, random_state=100,
                                              splitter='best')

As long as X contains more than 10 records, the model is created by training the tree algorithm on X and y:

if X.shape[0] > 10:
    model = tree_classifier.fit(X, y)

The accuracy of the new model is calculated by comparing y (the actual bad_loan values) to the model's predictions for the values in X. The accuracy is then output:

acc = accuracy_score(y, model.predict(X))
print('UDF train r{}_t{}: training accuracy: {}'.format(rank_number, tom_number, acc))
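
For reference, the accuracy reported here is the fraction of records whose predicted label equals the actual label. A minimal pure-Python sketch of that calculation follows; the accuracy helper is a hypothetical stand-in for sklearn.metrics.accuracy_score, and the label values are made up to make the arithmetic concrete.

```python
def accuracy(y_true, y_pred):
    """Fraction of positions where the prediction matches the label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    matches = sum(1 for actual, predicted in zip(y_true, y_pred)
                  if actual == predicted)
    return matches / len(y_true)


# Made-up labels: three of the five predictions agree with the actual values.
y_true = [0.0, 1.0, 0.0, 0.0, 1.0]
y_pred = [0.0, 1.0, 1.0, 0.0, 0.0]
train_accuracy = accuracy(y_true, y_pred)
print(train_accuracy)
```

Because the model is scored here on the same records it was trained on, this figure tends to be optimistic; the testing UDF measures accuracy on records the models have not seen.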

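Each model is serialized to a byte array with pickle and stored as a record in the model storage table by calling the store_ensemble_model method from test_environment.py. The call is shown first, followed by the method's definition; note that the training accuracy is passed in but is not written to the table:

print('UDF train r{}_t{}: storing model.'.format(rank_number, tom_number))
model_byte_array = pickle.dumps(model)
te.store_ensemble_model(model_byte_array, 'sklearn_dt', rank_number, tom_number, num_input_data, acc)

def store_ensemble_model(model_binary, name, rank, tom, num_input_data, train_acc):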
    encoded_object_list = []
    datum = collections.OrderedDict()
    datum['model'] = model_binary
    datum['name'] = str(name)
    datum['rank'] = int(rank)
    datum['tom'] = int(tom)
    datum['num_input_data'] = int(num_input_data)

    datum['date_time_created'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    encoded_object_list.append(DB_HANDLE.encode_datum(ENSEMBLE_MODEL_TYPE, datum))
    response = DB_HANDLE.insert_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, data=encoded_object_list,
                                        list_encoding='binary')
    print(response)
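
The record built by store_ensemble_model can be reproduced without a database connection. In this sketch a plain dict stands in for the trained classifier and a fixed datetime replaces datetime.utcnow() so the result is repeatable; build_model_record is a hypothetical helper, not part of test_environment.py.

```python
import collections
import pickle
from datetime import datetime


def build_model_record(model, name, rank, tom, num_input_data, created):
    """Assemble the fields of one model storage row, in column order."""
    datum = collections.OrderedDict()
    datum['model'] = pickle.dumps(model)  # serialized model as bytes
    datum['name'] = str(name)
    datum['rank'] = int(rank)
    datum['tom'] = int(tom)
    datum['num_input_data'] = int(num_input_data)
    # %f yields microseconds (6 digits); dropping 3 leaves milliseconds.
    datum['date_time_created'] = created.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    return datum


# Stand-in "model": any picklable object works for the illustration.
toy_model = {'feature': 'int_rate', 'threshold': 11.0}
record = build_model_record(toy_model, 'sklearn_dt', '1', '0', 900,
                            datetime(2019, 1, 2, 3, 4, 5, 678901))
restored = pickle.loads(record['model'])
print(record['date_time_created'], restored)
```

pickle.loads can execute arbitrary code while deserializing, so it should only be applied to model bytes from a trusted table.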

Call complete() to tell Kinetica the proc code is finished:

proc_data.complete()

Training UDF Registration (register_execute_train.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL, including the host and port of the database server. Ensure the host address and port are correct for your setup:

if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))

To upload the dt_train.py, kinetica_proc.py, and test_environment.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_paths = ["dt_train.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
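
The file map is an ordinary dict keyed by base file name. The sketch below reproduces the loop against temporary files so it can run anywhere; the temporary directory layout and the placeholder file contents are made up for illustration, and read_files is a hypothetical helper name.

```python
import os
import tempfile


def read_files(file_paths):
    """Return {basename: file contents as bytes} for each path."""
    files = {}
    for script_path in file_paths:
        with open(script_path, 'rb') as f:
            files[os.path.basename(script_path)] = f.read()
    return files


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical layout: a UDF script plus a helper in a subdirectory.
    os.makedirs(os.path.join(tmp, 'util'))
    paths = [os.path.join(tmp, 'dt_train.py'),
             os.path.join(tmp, 'util', 'test_environment.py')]
    for path in paths:
        with open(path, 'wb') as f:
            f.write(b'# placeholder\n')
    files = read_files(paths)

print(sorted(files))
```

Keying by base name flattens the directory structure, so two files with the same name in different directories would overwrite each other in the map.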

After the files are placed in a data map, a distributed proc named distributed_train is created in Kinetica and the files are associated with it:

proc_name = 'distributed_train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and args to be executed. In this case, the assembled command line would be:

python dt_train.py
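
How the command and args values passed to create_proc combine into that command line can be shown with the standard library. This is an illustration only; it makes no claim about how Kinetica assembles or launches the command internally.

```python
import shlex

file_paths = ["dt_train.py", "../../kinetica_proc.py", "../../util/test_environment.py"]

command = 'python'        # the command value given to create_proc
args = [file_paths[0]]    # the args value: only the UDF script itself

# Join the command and its arguments into a single shell-style string.
command_line = shlex.join([command] + args)
print(command_line)
```

Only the first entry of file_paths is passed as an argument; the other uploaded files are supporting modules that the UDF script imports.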

Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:

print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)

Model Testing UDF (dt_test.py)

First, several packages are imported to access the Kinetica Python UDF API, object serialization, test_environment.py methods, an accuracy score calculation, and numpy math methods:

from kinetica_proc import ProcData
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
import numpy as np

Next, the file gets a handle to the ProcData() class:

proc_data = ProcData()

To identify which processing node container (rank) and processing node (tom) each UDF instance is running on, the rank and tom numbers are read from the request info map and displayed:

rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF test r{}_t{}: instantiated.'.format(rank_number, tom_number))

The data from the input table (example_loan_test_data)--excluding the records with NaN or null values--is converted to a pandas dataframe. The number of input records is derived from the dataframe. Let y_actual be the label, holding the bad_loan column values; let X be the attributes, holding the remaining columns except record_id. Let record_ids represent the loan IDs:

test_data = proc_data.to_df().dropna()
num_test_data = test_data.shape[0]
X = test_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
               'longest_credit_length']]
y_actual = test_data[['bad_loan']]
record_ids = test_data[['record_id']]

Get a handle to the output table from the proc (example_loan_inference_result). Its size is expanded to match the number of records from the dataframe; this allocates enough memory to copy all input records to the output table. Handles are created for the columns that will be written to: record_id, bad_loan_actual, and bad_loan_predicted:

out_table = proc_data.output_data[0]
out_table.size = num_test_data
record_id_column, y_actual_column, y_predicted_column = out_table[0], out_table[1], out_table[2]

If the number of input records is greater than 0, the models are retrieved from the model storage table, an array of zeros with one element per input record is created to accumulate the predictions, and an empty list is initialized to hold each model's accuracy:

if num_test_data > 0:
    models_from_db = te.load_ensemble_models('sklearn_dt')
    sum_predictions = np.full(num_test_data, 0.)
    accuracies = list()

The load_ensemble_models method from test_environment.py gets the models, decodes the records, and places them in a list:

def load_ensemble_models(name, limit=100):
    """Load serialized ensemble model objects that share the same name from table."""
    response = DB_HANDLE.get_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, offset=0, limit=limit,
                                     options={"expression": "(name='" + str(name) + "')"})
    res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
    result_dict_list = []
    for current_result in res_decoded:
        current_dict = dict()
        current_dict['model'] = current_result['model']
        current_dict['num_input_data'] = current_result['num_input_data']
        current_dict['rank'] = current_result['rank']
        current_dict['tom'] = current_result['tom']
        current_dict['date_time_created'] = current_result['date_time_created']
        result_dict_list.append(current_dict)
    return result_dict_list

For each model, the object is deserialized and loaded into the UDF. The models' predictions are stored and the accuracy of the prediction against the testing table's bad_loan values is calculated. The calculated accuracy is added to the accuracies list, and the prediction value is added to the summed predictions array:

for current_model_dict in models_from_db:
    model = pickle.loads(current_model_dict['model'])
    y_test_predict = model.predict(X.values)
    test_accuracy = accuracy_score(y_actual, y_test_predict)
    accuracies.append(test_accuracy)
    sum_predictions += y_test_predict

The combined prediction for each record is calculated by dividing the sum of the models' predictions by the number of models and rounding the result to the nearest class value (0 or 1). The ensemble's accuracy is also calculated and displayed, along with the average accuracy of the individual models. The record IDs, actual bad_loan values, and predicted bad_loan values are then written to the matching output table columns:

combined_predictions = np.around(sum_predictions / len(models_from_db))
combined_accuracy = accuracy_score(y_actual, combined_predictions)
print('Average accuracies of {} models: {}'.format(len(accuracies), (sum(accuracies) / float(len(accuracies)))))
print('Ensemble accuracy: {}'.format(combined_accuracy))
record_id_column[:] = record_ids.values
y_actual_column[:] = y_actual.values
y_predicted_column[:] = combined_predictions
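
Averaging 0/1 predictions and rounding amounts to a majority vote. A minimal pure-Python sketch of that arithmetic follows; combine_predictions is a hypothetical helper and the prediction lists are made up. Python's built-in round is used in place of np.around; both round exact halves to the nearest even value.

```python
def combine_predictions(per_model_predictions):
    """Average the 0/1 predictions of all models per record, then round."""
    n_models = len(per_model_predictions)
    # zip(*...) groups the predictions by record instead of by model.
    sums = [sum(votes) for votes in zip(*per_model_predictions)]
    return [float(round(total / n_models)) for total in sums]


# One inner list per model, one element per test record.
three_models = [[1.0, 0.0, 1.0],
                [1.0, 0.0, 0.0],
                [0.0, 0.0, 1.0]]
two_models = [[1.0, 0.0],
              [0.0, 0.0]]

majority = combine_predictions(three_models)
tie = combine_predictions(two_models)
print(majority, tie)
```

With an even number of models, an exact tie averages to 0.5 and rounds down to 0, so a tied loan is reported as not bad.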

If no data was found on the tom, finish processing:

else:
    print('UDF test r{}_t{}: no test data on tom.\n'.format(rank_number, tom_number))

Call complete() to tell Kinetica the proc code is finished:

proc_data.complete()

Testing UDF Registration (register_execute_test.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL, including the host and port of the database server. Ensure the host address and port are correct for your setup:

if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))

To upload the dt_test.py, kinetica_proc.py, and test_environment.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_paths = ["dt_test.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()

After the files are placed in a data map, a distributed proc named distributed_test is created in Kinetica and the files are associated with it:

proc_name = 'distributed_test'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and args to be executed. In this case, the assembled command line would be:

python dt_test.py

Finally, after the proc is created, it is executed. The testing and results tables created in the Database Setup section are passed in here:

TEST_INPUT_TABLE = te.LOAN_TEST_DATA_TABLE_NAME
TEST_OUTPUT_TABLE = te.LOAN_INFERENCE_TABLE_NAME
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [TEST_INPUT_TABLE], {}, [TEST_OUTPUT_TABLE], {})
print(response)