The following is a complete example, using the Python UDF API, of a non-CUDA UDF that demonstrates how to build multiple models based on distributed data and combine them into an ensemble. This example (and others) can be found in the Python UDF API repository; this repository can be downloaded/cloned from GitHub.
References
- Python UDF Reference -- detailed description of the entire UDF API
- Running UDFs -- detailed description on running Python UDFs
- Example UDFs -- example UDFs written in Python
Prerequisites
The general prerequisites for using UDFs in Kinetica can be found under UDF Prerequisites.
Important
This example cannot run on macOS
The following items are also necessary:
- Python 3
- Miniconda
Note
Visit the Conda website to download the Miniconda installer for Python 3.
UDF API Download
This example requires local access to the Python UDF API repository. In the desired directory, run the following command, replacing <kinetica-version> with the installed Kinetica version, e.g., v7.1:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
Relevant Scripts
There are six files associated with the distributed model UDF example, all of which can be found in the Python UDF API repo.
- A database setup script (test_environment.py) that is called from the initialization script
- An initialization script (setup_db.py) that creates the input tables, inserts data into them, and creates an output ensemble table
- Two scripts to register and execute the UDFs:
- a register_execute_train.py script to register and execute the model training UDF
- a register_execute_test.py script to register and execute the model combination and prediction UDF
- Two UDFs to train and test the model:
- a dt_train UDF that trains and stores a decision tree model
- a dt_test UDF that combines model predictions into an ensemble
This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.
From the directory into which you cloned the API, change into the root folder of the Python UDF API repository:
cd kinetica-udf-api-python/
Create the Conda environment, replacing <environment name> with the desired name:
conda env create --name <environment name> --file conda_env_py3.yml
Note
It may take a few minutes to create the environment.
Verify the environment was created properly:
conda info --envs
Activate the new environment:
conda activate <environment name>
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
Install the Kinetica Python API:
pip install gpudb~=7.1.0
Add the Python UDF API repo's root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
Edit the util/test_environment.py script, setting the correct database URL, user, and password for your Kinetica instance:
URL = 'https://abcdefg.cloud.kinetica.com/hijklmn/gpudb-0;CombinePrepareAndExecute=1;RowsPerFetch=20000'
USER = 'kadmin'
PASSWORD = 'kadmin123'
UDF Deployment
Change directory into the UDF distributed model example directory:
cd examples/UDF_distributed_model
Run the UDF initialization script:
python setup_db.py
Run the execute script for the training UDF:
python register_execute_train.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
Run the execute script for the model combination UDF:
python register_execute_test.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
Verify the results in the example_loan_inference_result output table in Workbench.
Execution Detail
This example details two distributed UDFs that train models and test their prediction accuracy. The training and testing occur using the following schema and tables:
- example_udf_python -- the schema containing all of the below tables
- example_loan_train_data -- the training table and input table for the dt_train proc. The algorithm to predict if a loan is bad or not is trained against this table.
- ensemble_models -- the intermediary model storage table that stores all the models after they've been trained against the training table
- example_loan_test_data -- the testing table and input table for the dt_test proc. The trained algorithm is tested against this table and the accuracy of the algorithm's predictions is calculated.
- example_loan_inference_result -- the results table and output table for the dt_test proc. The actual bad loan boolean and predicted bad loan boolean are compared per loan in this table.
The training table contains 11 columns, each containing values from the loan_sample dataset (located in kinetica-udf-api-python/util/):
- loan_amnt -- a float column representing the loan amount
- int_rate -- a float column representing the loan's interest rate
- emp_length -- a float column representing the borrower's employment length
- annual_inc -- a float column representing the borrower's annual income
- dti -- a float column representing the borrower's debt-to-income ratio
- delinq_2yrs -- a float column representing the number of the borrower's delinquent payments over the last two years
- revol_util -- a float column representing the borrower's revolving line utilization rate
- total_acc -- a float column representing the borrower's total number of accounts
- bad_loan -- a float column representing if the loan was bad (1) or not (0)
- longest_credit_length -- a float column representing the borrower's longest credit line length
- record_id -- a float column to identify the loan
The testing table has the same columns as the training table.
The results table contains three columns:
- record_id -- a float column to identify the loan
- bad_loan_actual -- a float column that is the bad_loan value from the test table
- bad_loan_predicted -- a float column that is the bad_loan value from the models
The first UDF executed, dt_train, uses the training table to train decision tree models, evaluate the models' accuracy against the training table, and then store the models in the model storage table in Kinetica.
The model storage table contains six columns:
- model -- a bytes column that stores the model
- name -- an unrestricted string column that is the name of the model
- rank -- an int column that represents the processing node container that holds the processing nodes for the database
- tom -- an int column that represents the processing node that contains shards of data inside the database
- num_input_data -- an int column that is the number of records used as input data for the model
- date_created -- a datetime column that is the date and time the model was created
The second UDF executed, dt_test, combines all the models in the model storage table and evaluates the combined ensemble's accuracy and predictions against the testing table. The average accuracy of the individual models and the accuracy of the ensemble are calculated. The predicted vs. actual values for each loan are then stored in the results table.
Database Setup
The setup script, setup_db.py, which creates all the necessary tables for the UDFs, imports the test_environment.py script to access its methods:
The script calls three methods from test_environment.py:
The three methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL. See Connecting via API for details on the URL format and how to look it up.
The create_schema() method creates the schema that will contain all tables used in the example:
The prepare_loan_data() method creates the types and tables for the testing, training, and results tables, but the tables are removed first if they already exist:
Then the method converts the loan_sample data into a pandas dataframe. Two lists are initialized, which will be used to insert records into the training and testing tables. A simple counter is initialized as well:
For each record in the dataset, its ordered values are assigned to the columns in the training and testing tables. 10% of the records are added to the testing table's list; the other 90% are added to the training table's list. Once the batch size of 1000 records has been reached, the records are inserted into the database, the lists are cleared, and the process repeats:
If there are not enough records in the lists at the end of the loop to fill a full batch, the remaining records are flushed to the database:
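The split-and-batch loop above can be sketched in plain Python. This is an illustrative stand-in, not the repository's code: insert_records below is a hypothetical placeholder for the real GPUdb insert call, and the record contents are dummy values.

```python
BATCH_SIZE = 1000
inserted = {}  # table name -> total records "inserted"

def insert_records(table, records):
    # Placeholder for the real database insert call; here we just count
    inserted[table] = inserted.get(table, 0) + len(records)

train_batch, test_batch = [], []
records = [{"record_id": float(i)} for i in range(2500)]  # dummy dataset

for i, rec in enumerate(records):
    # Every 10th record (10%) goes to the testing table, the rest to training
    if i % 10 == 0:
        test_batch.append(rec)
    else:
        train_batch.append(rec)
    # Once a full batch has accumulated, insert it and clear the list
    if len(train_batch) == BATCH_SIZE:
        insert_records("example_loan_train_data", train_batch)
        train_batch.clear()
    if len(test_batch) == BATCH_SIZE:
        insert_records("example_loan_test_data", test_batch)
        test_batch.clear()

# Flush any remaining partial batches at the end of the loop
if train_batch:
    insert_records("example_loan_train_data", train_batch)
if test_batch:
    insert_records("example_loan_test_data", test_batch)

print(inserted)
```

With 2,500 dummy records, 250 land in the testing table and 2,250 in the training table.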
The create_ensemble_model_table() method creates the type and table for the model storage table, but the table is removed first if it already exists:
Model Training UDF (dt_train.py)
First, several packages are imported to access the Kinetica Python UDF API, decision tree modeling, object serialization, test_environment.py methods, and an accuracy score calculation:
Next, the file gets a handle to the ProcData() class:
To output the number of values found on each processing node and processing node container, the rank and tom values for the current instance of the UDF are retrieved from the request info map and displayed:
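The idea can be sketched with a plain dict standing in for the kinetica_proc request info map; the keys and values below are illustrative, not the API's guaranteed contents.

```python
# Hypothetical stand-in for proc_data.request_info provided by the
# kinetica_proc API; values arrive as strings
request_info = {"rank": "1", "tom": "0"}

# rank: the processing node container; tom: the processing node
rank = request_info["rank"]
tom = request_info["tom"]
print(f"UDF instance running on rank {rank}, tom {tom}")
```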
The data from the input table (example_loan_train_data)--excluding the records with NaN or null values--is converted to a pandas dataframe. The number of input records is derived from the dataframe. Let y be the label with the bad_loan column values; let X be the attributes with the rest of the column values:
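A minimal sketch of this preparation step, using a tiny inline sample (with column names from the loan tables above) in place of the real input-table data:

```python
import pandas as pd

# Tiny illustrative sample; the real UDF reads the input table instead
df = pd.DataFrame({
    "loan_amnt": [5000.0, 2400.0, None, 10000.0],
    "int_rate": [10.6, 11.0, 9.9, 13.5],
    "bad_loan": [0.0, 1.0, 0.0, 1.0],
})

# Exclude records with NaN/null values
df = df.dropna()
num_input = len(df)

# y: the bad_loan label; X: the remaining attribute columns
y = df["bad_loan"]
X = df.drop(columns=["bad_loan"])

print(num_input, list(X.columns))
```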
The decision tree is created:
As long as there are more than 10 values stored in the attributes group, the model is created by training the tree algorithm on X and y:
The accuracy of the new algorithm is calculated by comparing y (actual bad_loan values) to the model's predictions based on the values in X. The accuracy of the algorithm is then output:
The models are converted to byte arrays and stored as records in the model storage table:
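The serialization step can be sketched with the standard pickle module; a plain dict stands in for the trained decision tree here, and the record field names follow the model storage table columns described above (the model name is a hypothetical example).

```python
import pickle

# Stand-in for the trained decision tree model object
model = {"kind": "decision_tree", "trained_on": 2250}

# Serialize the model to a byte array for the `model` bytes column
model_bytes = pickle.dumps(model)

record = {
    "model": model_bytes,
    "name": "dt_rank1_tom0",   # hypothetical model name
    "rank": 1,
    "tom": 0,
    "num_input_data": 2250,
}

# Round-trip check: dt_test later deserializes the stored bytes
restored = pickle.loads(record["model"])
print(restored == model)
```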
Call complete() to tell Kinetica the proc code is finished:
Training UDF Registration (register_execute_train.py)
To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL of the database server.
To upload the dt_train.py, kinetica_proc.py, and test_environment.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:
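Building such a file data map can be sketched as follows; build_file_map is an illustrative helper, not part of the Kinetica API.

```python
def build_file_map(paths):
    """Read each file in as bytes, keyed by its path/name."""
    files = {}
    for path in paths:
        with open(path, "rb") as f:   # read as bytes, not text
            files[path] = f.read()
    return files

# In the register/execute script, the paths would be dt_train.py,
# kinetica_proc.py, and test_environment.py
```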
After the files are placed in a data map, the distributed_train proc is created in Kinetica in distributed mode, and the files are associated with it:
Note
The proc requires the proper command and args to be executed. In this case, the assembled command line would be:
python dt_train.py
Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:
Model Testing UDF (dt_test.py)
First, several packages are imported to access the Kinetica Python UDF API, object serialization, test_environment.py methods, an accuracy score calculation, and numpy math methods:
Next, the file gets a handle to the ProcData() class:
To output the number of values found on each processing node and processing node container, the rank and tom values for the current instance of the UDF are retrieved from the request info map and displayed:
The data from the input table (example_loan_test_data)--excluding the records with NaN or null values--is converted to a pandas dataframe. The number of input records is derived from the dataframe. Let y_actual be the label with the bad_loan column values; let X be the attributes with the rest of the column values. Let record_ids represent the loan IDs:
Get a handle to the output table from the proc (example_loan_inference_result). Its size is expanded to match the number of records from the dataframe; this allocates enough memory to copy all input records to the output table. Handles are created for the columns that will be written to: record_id, bad_loan_actual, and bad_loan_predicted:
If the number of input records is greater than 0, the models are retrieved from the model storage table, an array of zeros with length equal to the number of input records is created, and an empty list is initialized:
The load_ensemble_models method from test_environment.py gets the models, decodes the records, and places them in a list:
For each model, the object is deserialized and loaded into the UDF. The models' predictions are stored and the accuracy of the prediction against the testing table's bad_loan values is calculated. The calculated accuracy is added to the accuracies list, and the prediction value is added to the summed predictions array:
The prediction value from the combined list is calculated by dividing the sum of each model's prediction by the number of models. The models' combined accuracy is also calculated and displayed. The record IDs, actual bad_loan value, and predicted bad_loan value are added as values to the matching output table columns:
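The ensemble-combination arithmetic can be sketched in plain Python. The per-model predictions below are made-up values, and rounding the averaged vote to a 0/1 label is a sketch choice, not necessarily what the repository's code does.

```python
actual = [0, 1, 1, 0, 1]            # bad_loan values from the testing table

# Hypothetical 0/1 predictions from three stored models
model_predictions = [
    [0, 1, 0, 0, 1],
    [0, 1, 1, 0, 1],
    [1, 1, 1, 0, 0],
]

num_models = len(model_predictions)
summed = [0.0] * len(actual)

# Sum each model's prediction per record
for preds in model_predictions:
    for i, p in enumerate(preds):
        summed[i] += p

# Combined prediction: sum of predictions divided by the number of models
# (rounded here to yield a 0/1 label)
ensemble = [round(s / num_models) for s in summed]

# Combined accuracy of the ensemble against the actual labels
accuracy = sum(a == p for a, p in zip(actual, ensemble)) / len(actual)
print(ensemble, accuracy)
```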
If no data was found on the tom, finish processing:
Call complete() to tell Kinetica the proc code is finished:
Testing UDF Registration (register_execute_test.py)
To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL of the database server.
To upload the dt_test.py, kinetica_proc.py, and test_environment.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:
After the files are placed in a data map, the distributed_test proc is created in Kinetica in distributed mode, and the files are associated with it:
Note
The proc requires the proper command and args to be executed. In this case, the assembled command line would be:
python dt_test.py
Finally, after the proc is created, it is executed. The testing and results tables created in the Database Setup section are passed in here: