Example UDF (Non-CUDA) - H2O Random Forest Model

The following is a complete example, using the Python UDF API, of a non-CUDA UDF that demonstrates how to build a model using the random forest method via H2O. This model will learn to detect whether a loan is bad or not based on related loan data. The random forest method trains multiple models on small subsets of a large dataset and then combines the models' inference output. This example (and others) can be found in the Python UDF API repository; this repository comes with Kinetica by default (located in /opt/gpudb/udf/api/python/) or can be downloaded/cloned from GitHub.
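The idea of training several models on small random subsets and combining their output can be sketched in plain Python. This is only a toy illustration, not H2O's implementation: one-feature threshold rules stand in for decision trees, and train_stump, train_forest, predict, and the made-up data are all hypothetical names and values introduced here.

```python
import random
from collections import Counter

def train_stump(rows):
    """Fit a one-feature rule: predict 1 when x >= threshold."""
    best = None
    for threshold in sorted({x for x, _ in rows}):
        # Count how many sampled rows this threshold would misclassify.
        errors = sum((x >= threshold) != bool(label) for x, label in rows)
        if best is None or errors < best[0]:
            best = (errors, threshold)
    return best[1]

def train_forest(rows, n_models=25, sample_size=20, seed=1):
    """Train each model on its own random sample (drawn with replacement)."""
    rng = random.Random(seed)
    return [
        train_stump([rng.choice(rows) for _ in range(sample_size)])
        for _ in range(n_models)
    ]

def predict(forest, x):
    """Combine the individual predictions by majority vote."""
    votes = Counter(int(x >= threshold) for threshold in forest)
    return votes.most_common(1)[0][0]

# Made-up data: values of 20 and above are labelled as bad loans (1).
rows = [(float(v), int(v >= 20)) for v in range(40)]
forest = train_forest(rows)
print(predict(forest, 35.0), predict(forest, 5.0))
```

Each model sees only part of the data, so individual thresholds differ slightly; the vote smooths out those differences.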

Prerequisites

The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.

Important

This example cannot run on Mac OS X.

The following items are also necessary:

  • Python 3

  • Miniconda

    Note

    Visit the Conda website to download the Miniconda installer for Python 3.

  • KiFS

    Important

    Review Kinetica File System (KiFS) for more information on configuring and using KiFS.

  • H2O

Install H2O

H2O must be installed on all cluster nodes for this UDF to run successfully.

  1. Install all H2O dependencies on all cluster nodes:

    /opt/gpudb/bin/gpudb_pip install requests
    /opt/gpudb/bin/gpudb_pip install tabulate
    /opt/gpudb/bin/gpudb_pip install "colorama>=0.3.8"
    /opt/gpudb/bin/gpudb_pip install future
    
  2. Install the H2O Python module on all cluster nodes:

    /opt/gpudb/bin/gpudb_pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o
    

UDF API Download

This example requires local access to the Python UDF API repository. In the desired directory, run the following, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.0:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git

Relevant Scripts

There are four files associated with the H2O random forest UDF example, all of which can be found in the Python UDF API repo.

  • A database setup script (test_environment.py) that is called from the initialization script
  • An initialization script (setup_table.py) that creates input tables and inserts data into them
  • A script (register_execute_train_test.py) to register and execute the UDF
  • A UDF (h2o_rf_train_test.py) to train a model using the random forest method on sample loan data, store the model in KiFS, and then test the model

Prepare Environment

This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.

  1. In the directory where you cloned the API, change into the root folder of the Python UDF API repository:

    cd kinetica-udf-api-python/
    
  2. Create the Conda environment, replacing <environment name> with the desired name:

    conda env create --name <environment name> --file conda_env_py3.yml
    

    Note

    It may take a few minutes to create the environment.

  3. Verify the environment was created properly:

    conda info --envs
    
  4. Activate the new environment:

    source activate <environment name>
    
  5. Install PyGDF:

    conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
    
  6. Install the Kinetica Python API:

    pip install gpudb~=7.0.0
    
  7. Add the Python UDF API repo's root directory to the PYTHONPATH:

    export PYTHONPATH=$(pwd):$PYTHONPATH
    
  8. Edit the util/test_environment.py script for the correct host, port, user, and password for your Kinetica instance:

    HOST = 'http://localhost'
    PORT = '9191'
    USER = 'testuser'
    PASSWORD = 'Testuser123!'
    

UDF Deployment

  1. Change directory into the UDF H2O random forest directory:

    cd examples/UDF_h2o_rf
    
  2. Run the UDF initialization script:

    python setup_table.py
    
  3. Update the host and port in register_execute_train_test.py as necessary:

    HOST_IP = "127.0.0.1"
    
    if __name__ == "__main__":
        main(gpudb.GPUdb(encoding='BINARY', host=HOST_IP, port='9191'))
    
  4. Run the execute script for the training UDF:

    python register_execute_train_test.py
    
  5. Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the head node.

Execution Detail

This example details using a distributed UDF and H2O to create a random forest model from sample loan data and then test the model against a smaller subset of the loan data. Training and testing both occur against the data in the example_loan_train_data table, which is the training table and the input table for the h2o_rf_train_test proc. The model determines the correlation between the different types of loan data and whether a loan is bad or not.

The training table contains 11 columns, each containing values from the loan_sample dataset (located in kinetica-udf-api-python/util/):

  • loan_amnt -- a float column representing the loan amount
  • int_rate -- a float column representing the loan's interest rate
  • emp_length -- a float column representing the borrower's employment length
  • annual_inc -- a float column representing the borrower's annual income
  • dti -- a float column representing the borrower's debt-to-income ratio
  • delinq_2yrs -- a float column representing delinquent payments against the loan for the last two years
  • revol_util -- a float column representing the borrower's revolving line utilization rate
  • total_acc -- a float column representing the borrower's total number of accounts
  • bad_loan -- a float column representing if the loan was bad (1) or not (0)
  • longest_credit_length -- a float column representing the borrower's longest credit line length
  • record_id -- a float column to identify the loan

The UDF executed, h2o_rf_train_test, splits the data in the training table into three sets: train, valid, and test. The model is trained against the train set, saved in KiFS, and then tested against the test set.

Database Setup

The setup script, setup_table.py, which creates all the necessary tables for the UDFs, imports the test_environment.py script to access its methods:

from util import test_environment as te

The script calls one method from test_environment.py:

te.prepare_loan_data()

The methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL (host and port):

HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)

The prepare_loan_data() method creates the types and tables for the training (example_loan_train_data), testing (example_loan_test_data), and results (example_loan_inference_result) data. Any of these tables that already exist are removed first:

Important

Though this method creates several tables, only the training table is used in the UDF. The model in this example UDF is trained and tested against this dataset, so only one table is necessary.

LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"loan_amnt","type":"float"},
            {"name":"int_rate","type":"float"},
            {"name":"emp_length","type":"float"},
            {"name":"annual_inc","type":"float"},
            {"name":"dti","type":"float"},
            {"name":"delinq_2yrs","type":"float"},
            {"name":"revol_util","type":"float"},
            {"name":"total_acc","type":"float"},
            {"name":"bad_loan","type":"float"},
            {"name":"longest_credit_length","type":"float"},
            {"name":"record_id","type":"float"}
        ]
    }"""
LOAN_INFERENCE_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"record_id","type":"float"},
            {"name":"bad_loan_actual","type":"float"},
            {"name":"bad_loan_predicted","type":"float"}
        ]
    }"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
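Because the type definitions are plain JSON strings, they can be checked locally before they are sent to the database. The following is a minimal sketch that lists the columns of the inference type; column_types is a hypothetical helper written for this illustration and is not part of the Kinetica API.

```python
import json

# Copied from the inference type definition above.
LOAN_INFERENCE_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"record_id","type":"float"},
            {"name":"bad_loan_actual","type":"float"},
            {"name":"bad_loan_predicted","type":"float"}
        ]
    }"""

def column_types(type_definition):
    """Return {column name: column type} for a JSON record type definition."""
    schema = json.loads(type_definition)
    return {field["name"]: field["type"] for field in schema["fields"]}

columns = column_types(LOAN_INFERENCE_TYPE)
print(columns)
```

A malformed definition raises json.JSONDecodeError here, which is easier to diagnose than a failed create_type call.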

Then the method reads the loan_sample data with pandas and takes its values as an array of records. Two lists are initialized, which will be used to insert records into the training table and the testing table. A simple counter is initialized as well:

pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0

For each record in the dataset, its ordered values are assigned to the columns of the training and testing tables. Every tenth record (10% of the dataset) is added to the testing table's list; the other 90% is added to the training table's list. After every 1,000 records processed, both lists are inserted into the database and then cleared, and the process repeats:

for record in records:
    i += 1
    datum = collections.OrderedDict()
    datum['loan_amnt'] = float(record[0])
    datum['int_rate'] = float(record[1])
    datum['emp_length'] = float(record[2])
    datum['annual_inc'] = float(record[3])
    datum['dti'] = float(record[4])
    datum['delinq_2yrs'] = float(record[5])
    datum['revol_util'] = float(record[6])
    datum['total_acc'] = float(record[7])
    datum['bad_loan'] = float(record[8])
    datum['longest_credit_length'] = float(record[9])
    datum['record_id'] = float(i)
    if i % 10 == 0:
        encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    else:
        encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    if i % 1000 == 0:
        response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert train response: {}".format(response))
        response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert test response: {}".format(response))
        encoded_obj_list_train = []
        encoded_obj_list_test = []

If the lists still hold records at the end of the loop (fewer than a full batch), those records are flushed to the database:

response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
print('\nAll data inserted.')
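The partition-and-batch pattern used by the loop and the final flush can be isolated from the database calls. The sketch below assumes nothing about Kinetica: partition_and_batch is a hypothetical generator, and plain integers stand in for encoded records. Unlike the script, it skips the final flush when both lists are empty.

```python
def partition_and_batch(records, batch_size=1000, test_every=10):
    """Yield (train_batch, test_batch) pairs.

    Every `test_every`-th record goes to the test list and the rest go to
    the train list. Both lists are emitted after every `batch_size` records
    processed, and once more at the end if any records are left over.
    """
    train, test = [], []
    for i, record in enumerate(records, start=1):
        (test if i % test_every == 0 else train).append(record)
        if i % batch_size == 0:
            yield train, test
            train, test = [], []
    if train or test:
        yield train, test

batches = list(partition_and_batch(range(1, 2501)))
for train_batch, test_batch in batches:
    print(len(train_batch), len(test_batch))
```

With 2,500 records, two full batches are emitted inside the loop and the remaining 500 records are emitted by the final flush.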

Model Training UDF (h2o_rf_train_test.py)

First, several packages are imported to access the Kinetica Python UDF API, H2O, and the H2O random forest estimator:

from kinetica_proc import ProcData
import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator

Next, a connection to H2O is initialized using all CPUs available on the host:

h2o.init(nthreads=-1)

The script instantiates the ProcData class, converts the data from the input table (example_loan_train_data) to an H2O dataframe, and displays the dataframe's shape:

proc_data = ProcData()
h20_df = proc_data.to_h2odf()
print('h2o df shape: {}'.format(h20_df.shape))

The data is split into three sets: 70% of the data is part of the train set, 15% of the data is part of the valid set, and the remaining 15% is part of the test set:

splits = h20_df.split_frame(ratios=[0.7, 0.15], seed=1)
train = splits[0]
valid = splits[1]
test = splits[2]
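To see how two ratios produce three sets, the following sketch assigns each row to a split by drawing a random number, with the last split receiving whatever remains. It assumes the split is probabilistic, so the resulting sizes are approximate rather than exact; split_rows is a hypothetical stand-in written in plain Python, not H2O's implementation.

```python
import random

def split_rows(rows, ratios, seed=1):
    """Assign each row to a split using a uniform random draw.

    `ratios` lists the fractions for all but the last split; the last
    split receives the remainder (1 - sum(ratios)).
    """
    rng = random.Random(seed)
    splits = [[] for _ in range(len(ratios) + 1)]
    for row in rows:
        draw = rng.random()
        cumulative = 0.0
        for index, ratio in enumerate(ratios):
            cumulative += ratio
            if draw < cumulative:
                splits[index].append(row)
                break
        else:
            # The draw fell past every listed ratio: use the last split.
            splits[-1].append(row)
    return splits

train, valid, test = split_rows(list(range(10000)), ratios=[0.7, 0.15])
print(len(train), len(valid), len(test))
```

Fixing the seed, as the UDF does with seed=1, makes the assignment repeatable between runs.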

Let y be the response column and x be the predictor columns: every column except the response (y), the interest rate (int_rate, which is correlated with the outcome), and the record ID (record_id, which has no value as a predictor). The predictor columns are displayed:

y = 'bad_loan'
x = list(h20_df.columns)
x.remove(y)  # remove the response
x.remove('int_rate')  # remove the interest rate column because it's correlated with the outcome
x.remove('record_id')
print('Predictor columns: {}'.format(x))
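For reference, the same selection can be reproduced without H2O to see which predictors remain. This sketch assumes the dataframe's columns match the 11 columns of the training table in the order listed earlier; the actual column order reported by H2O may differ.

```python
# Column names of the training table, as listed in the table description.
columns = [
    'loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti',
    'delinq_2yrs', 'revol_util', 'total_acc', 'bad_loan',
    'longest_credit_length', 'record_id',
]

y = 'bad_loan'
excluded = {y, 'int_rate', 'record_id'}

# Keep every column that is neither the response nor an excluded column.
x = [name for name in columns if name not in excluded]
print('Predictor columns: {}'.format(x))
```

Eight predictor columns remain after the three exclusions.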

A random forest estimator is created and then trained on the predictors and response set up previously:

rf_fit1 = H2ORandomForestEstimator(model_id='rf_fit', seed=1)
rf_fit1.train(x=x, y=y, training_frame=train)

The KiFS mount point is read from the request info map and displayed. The model is stored in KiFS and its path is saved and displayed:

kifs_path = proc_data.request_info['kifs_mount_point']
print('KiFS mount point: {}'.format(kifs_path))
model1_path = h2o.save_model(rf_fit1, kifs_path+'/RF_model', force=True)
print('Saved model file to: {}'.format(model1_path))

The model is loaded from KiFS:

model1 = h2o.load_model(model1_path)

The model is tested against the test set and its performance is displayed. Performance is measured using several different metrics:

  • MSE (Mean Squared Error)
  • RMSE (Root Mean Squared Error)
  • MAE (Mean Absolute Error)
  • RMSLE (Root Mean Squared Logarithmic Error)
  • Mean Residual Deviance

performance = model1.model_performance(test)
print('Performance of model 1: {}'.format(performance))
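The first four metrics follow standard definitions and can be computed by hand on a small sample. The sketch below uses illustrative values, not results from the loan dataset, and regression_metrics is a hypothetical helper rather than an H2O function. Mean residual deviance is not computed separately because, assuming a squared-error (Gaussian) loss, it coincides with the MSE.

```python
import math

def regression_metrics(actual, predicted):
    """Compute MSE, RMSE, MAE, and RMSLE for paired actual/predicted values."""
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    # RMSLE compares log(1 + value), so it requires values greater than -1.
    squared_log_errors = [
        (math.log1p(p) - math.log1p(a)) ** 2
        for a, p in zip(actual, predicted)
    ]
    rmsle = math.sqrt(sum(squared_log_errors) / n)
    return {"MSE": mse, "RMSE": math.sqrt(mse), "MAE": mae, "RMSLE": rmsle}

# Illustrative values only: 1.0 marks a bad loan, 0.0 a good one.
actual = [0.0, 1.0, 0.0, 1.0]
predicted = [0.1, 0.7, 0.4, 0.9]
metrics = regression_metrics(actual, predicted)
for name, value in metrics.items():
    print('{}: {:.4f}'.format(name, value))
```

Lower values indicate better agreement between the predicted and actual bad_loan values for all four metrics.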

Training UDF Registration (register_execute_train_test.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL, including the host and port of the database server. Ensure the host address and port are correct for your setup:

HOST_IP = "127.0.0.1"

if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host=HOST_IP, port='9191'))

To upload the h2o_rf_train_test.py and kinetica_proc.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_paths = ["h2o_rf_train_test.py", "../../kinetica_proc.py"]  # put the main python script in the first place
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
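The resulting map is keyed by each file's base name, so any directory portion of the path (such as ../../) is dropped. The sketch below shows the shape of the map using a throwaway file in place of the real UDF scripts; build_file_map and demo_udf.py are hypothetical names used only for this illustration.

```python
import os
import tempfile

def build_file_map(paths):
    """Map each file's base name to its raw bytes."""
    files = {}
    for path in paths:
        with open(path, 'rb') as f:
            files[os.path.basename(path)] = f.read()
    return files

# Demonstrate with a throwaway script inside a nested temporary directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, 'nested', 'demo_udf.py')
    os.makedirs(os.path.dirname(script_path))
    with open(script_path, 'wb') as f:
        f.write(b"print('hello from the UDF')\n")
    files = build_file_map([script_path])

print(files)
```

Because keys are base names, two files with the same name in different directories would overwrite each other in the map.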

After the files are placed in a data map, the distributed H2o_rf_train_test proc is created in Kinetica and the files are associated with it:

proc_name = 'H2o_rf_train_test'
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and args to be executed. In this case, the assembled command line would be:

python h2o_rf_train_test.py
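As a rough illustration of how a command and its argument list combine into that command line, the sketch below joins them with shell quoting. This only mirrors the values passed to create_proc above; how Kinetica actually launches the process is not shown in this example and is not assumed here.

```python
import shlex

command = 'python'
args = ['h2o_rf_train_test.py']  # the first entry of file_paths

# Quote each part so that names containing spaces would remain one argument.
command_line = ' '.join(shlex.quote(part) for part in [command] + args)
print(command_line)
```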

Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:

response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)