Example UDF (Non-CUDA) - H2O Generalized Linear Model (GLM)

The following is a complete example, using the Python UDF API, of a non-CUDA UDF that uses H2O to build a generalized linear model (GLM) relating different types of loan data to whether a loan is bad or not. A GLM is a regression model fit under an assumed distribution for the response; a binomial distribution is used here because the response is binary. This example (and others) can be found in the Python UDF API repository; this repository comes with Kinetica by default (located in /opt/gpudb/udf/api/python/) or can be downloaded/cloned from GitHub.

Prerequisites

The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.

Important

This example cannot run on Mac OS X.

The following items are also necessary:

  • Python 3

  • Miniconda

    Note

    Visit the Conda website to download the Miniconda installer for Python 3.

  • KiFS

    Important

    Review Kinetica File System (KiFS) for more information on configuring and using KiFS.

  • H2O

Install H2O

H2O must be installed on all cluster nodes for this UDF to run successfully.

  1. Install all H2O dependencies on all cluster nodes:

    /opt/gpudb/bin/gpudb_pip install requests
    /opt/gpudb/bin/gpudb_pip install tabulate
    /opt/gpudb/bin/gpudb_pip install "colorama>=0.3.8"
    /opt/gpudb/bin/gpudb_pip install future
    
  2. Install the H2O Python module on all cluster nodes:

    /opt/gpudb/bin/gpudb_pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o
    

UDF API Download

This example requires local access to the Python UDF API repository. In the desired directory, run the following, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v6.2.0:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git

Relevant Scripts

There are four files associated with the H2O GLM UDF example, all of which can be found in the Python UDF API repo.

  • A database setup script (test_environment.py) that is called from the initialization script
  • An initialization script (setup_table.py) that creates input tables and inserts data into them
  • A script (register_execute_train.py) to register and execute the UDF
  • A UDF (h2o_glm_train.py) to train a GLM on sample loan data and store the model in KiFS

Prepare Environment

This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.

  1. From the directory where you cloned the API, change into the root folder of the Python UDF API repository:

    cd kinetica-udf-api-python/
    
  2. Create the Conda environment, replacing <environment name> with the desired name:

    conda env create --name <environment name> --file conda_env_py3.yml
    

    Note

    It may take a few minutes to create the environment.

  3. Verify the environment was created properly:

    conda info --envs
    
  4. Activate the new environment:

    source activate <environment name>
    
  5. Install PyGDF:

    conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
    
  6. Install the Kinetica Python API from PyPI, replacing n with the desired version number:

    pip install gpudb==6.2.0.n
    
  7. Add the Python UDF API repo's root directory to the PYTHONPATH:

    export PYTHONPATH=$(pwd):$PYTHONPATH
    
  8. Edit the util/test_environment.py script for the correct host, port, user, and password for your Kinetica instance:

    HOST = 'http://localhost'
    PORT = '9191'
    USER = 'testuser'
    PASSWORD = 'Testuser123!'
    

UDF Deployment

  1. Change directory into the UDF H2O GLM directory:

    cd examples/UDF_h2o_glm
    
  2. Run the UDF initialization script:

    python setup_table.py
    
  3. Update the host and port in register_execute_train.py as necessary:

    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
    
  4. Run the execute script for the training UDF:

    python register_execute_train.py
    
  5. Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the head node and/or in KiFS (in the GLM_model directory).
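
One way to do the KiFS check in step 5 is to list what was written under the model directory. The sketch below assumes KiFS is mounted at /opt/gpudb/kifs/mount (the path the training UDF saves to) and that it is run on a node where that mount is visible. list_model_files is a hypothetical helper and is not part of the example repository.

```python
import os

# Assumed location: the training UDF saves the model under this KiFS mount path.
MODEL_DIR = '/opt/gpudb/kifs/mount/GLM_model'


def list_model_files(model_dir=MODEL_DIR):
    """Return (relative_path, size_in_bytes) for every file under model_dir."""
    found = []
    for root, _dirs, names in os.walk(model_dir):
        for name in names:
            full_path = os.path.join(root, name)
            rel_path = os.path.relpath(full_path, model_dir)
            found.append((rel_path, os.path.getsize(full_path)))
    return sorted(found)


if __name__ == '__main__':
    files = list_model_files()
    if not files:
        print('No model files found under {}'.format(MODEL_DIR))
    for rel_path, size in files:
        print('{}  ({} bytes)'.format(rel_path, size))
```

An empty result means either the UDF has not finished or the mount path differs on this host; the gpudb-proc.log file is the place to look in that case.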

Execution Detail

This example uses a distributed UDF and H2O to create a GLM from sample loan data. Training runs against the example_loan_train_data table, which is both the training table and the input table for the h2o_glm_train proc. The H2O GLM models the relationship between the different types of loan data and whether a loan is bad or not.

The training table contains 11 columns, each containing values from the loan_sample dataset (located in kinetica-udf-api-python/util/):

  • loan_amnt -- a float column representing the loan amount
  • int_rate -- a float column representing the loan's interest rate
  • emp_length -- a float column representing the borrower's employment length
  • annual_inc -- a float column representing the borrower's annual income
  • dti -- a float column representing the borrower's debt-to-income ratio
  • delinq_2yrs -- a float column representing delinquent payments against the loan for the last two years
  • revol_util -- a float column representing the borrower's revolving line utilization rate
  • total_acc -- a float column representing the borrower's total number of accounts
  • bad_loan -- a float column representing if the loan was bad (1) or not (0)
  • longest_credit_length -- a float column representing the borrower's longest credit line length
  • record_id -- a float column to identify the loan

The UDF executed, h2o_glm_train, uses the training table to train an H2O GLM and then saves the model in KiFS.

Database Setup

The setup script, setup_table.py, which creates all the necessary tables for the UDFs, imports the test_environment.py script to access its methods:

from util import test_environment as te

The script calls one method from test_environment.py:

te.prepare_loan_data()

The methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL (host and port):

HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)

The prepare_loan_data() method creates the types and tables for the training (example_loan_train_data), testing (example_loan_test_data), and results (example_loan_inference_result) data. Any of these tables that already exist are removed first:

Important

Though this method creates several tables, only the training table is used in the UDF. The model in this example UDF is only trained against the dataset and not tested, so only one table is necessary.

LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"loan_amnt","type":"float"},
            {"name":"int_rate","type":"float"},
            {"name":"emp_length","type":"float"},
            {"name":"annual_inc","type":"float"},
            {"name":"dti","type":"float"},
            {"name":"delinq_2yrs","type":"float"},
            {"name":"revol_util","type":"float"},
            {"name":"total_acc","type":"float"},
            {"name":"bad_loan","type":"float"},
            {"name":"longest_credit_length","type":"float"},
            {"name":"record_id","type":"float"}
        ]
    }"""
LOAN_INFERENCE_TYPE = """
{
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"record_id","type":"float"},
            {"name":"bad_loan_actual","type":"float"},
            {"name":"bad_loan_predicted","type":"float"}
        ]
    }"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
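
Because a type definition is just a JSON string, it can be sanity-checked locally before it is passed to create_type. The sketch below is not part of test_environment.py: column_names and non_float_columns are hypothetical helpers, and the definition is copied from the listing above.

```python
import json

# Copied from the setup listing above.
LOAN_DATA_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"loan_amnt","type":"float"},
            {"name":"int_rate","type":"float"},
            {"name":"emp_length","type":"float"},
            {"name":"annual_inc","type":"float"},
            {"name":"dti","type":"float"},
            {"name":"delinq_2yrs","type":"float"},
            {"name":"revol_util","type":"float"},
            {"name":"total_acc","type":"float"},
            {"name":"bad_loan","type":"float"},
            {"name":"longest_credit_length","type":"float"},
            {"name":"record_id","type":"float"}
        ]
    }"""


def column_names(type_definition):
    """Return the column names of a JSON type definition, in declared order."""
    schema = json.loads(type_definition)
    return [field['name'] for field in schema['fields']]


def non_float_columns(type_definition):
    """Return the names of any columns not declared as float."""
    schema = json.loads(type_definition)
    return [f['name'] for f in schema['fields'] if f['type'] != 'float']


columns = column_names(LOAN_DATA_TYPE)
print('{} columns: {}'.format(len(columns), columns))
print('non-float columns: {}'.format(non_float_columns(LOAN_DATA_TYPE)))
```

The declared order matters here, since the insert loop fills each record's values in the same column order.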

Next, the method reads the loan_sample data with pandas and takes its values as an array of records. Two lists are initialized to hold the encoded records for the training table and the testing table, along with a simple counter:

pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0

For each record in the dataset, its ordered values are assigned to the columns shared by the training and testing tables. Every tenth record (10% of the dataset) is added to the testing table's list; the other 90% are added to the training table's list. After every 1000 records processed, both lists are inserted into the database and cleared, and the process repeats:

for record in records:
    i += 1
    datum = collections.OrderedDict()
    datum['loan_amnt'] = float(record[0])
    datum['int_rate'] = float(record[1])
    datum['emp_length'] = float(record[2])
    datum['annual_inc'] = float(record[3])
    datum['dti'] = float(record[4])
    datum['delinq_2yrs'] = float(record[5])
    datum['revol_util'] = float(record[6])
    datum['total_acc'] = float(record[7])
    datum['bad_loan'] = float(record[8])
    datum['longest_credit_length'] = float(record[9])
    datum['record_id'] = float(i)
    if i % 10 == 0:
        encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    else:
        encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    if i % 1000 == 0:
        response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert train response: {}".format(response))
        response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert test response: {}".format(response))
        encoded_obj_list_train = []
        encoded_obj_list_test = []

If records remain in the lists at the end of the loop (because the last batch was not full), they are flushed to the database:

response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
print('\nAll data inserted.')
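
The split-and-batch control flow described above can be tried without a database. The sketch below replaces insert_records with a hypothetical fake_insert that only records batch sizes, and uses integers in place of encoded records. It illustrates the logic only and is not the setup script itself.

```python
BATCH_SIZE = 1000  # same flush interval as the setup script


def split_and_batch(records, insert, batch_size=BATCH_SIZE):
    """Route every 10th record to 'test' and the rest to 'train', flushing in batches."""
    train, test = [], []
    count = 0
    for count, record in enumerate(records, start=1):
        if count % 10 == 0:
            test.append(record)
        else:
            train.append(record)
        if count % batch_size == 0:
            insert('train', train)
            insert('test', test)
            train, test = [], []
    # Flush whatever is left over after the last full batch.
    if train:
        insert('train', train)
    if test:
        insert('test', test)
    return count


inserted = []  # one (table, number_of_records) entry per insert call


def fake_insert(table, batch):
    # Hypothetical stand-in for DB_HANDLE.insert_records.
    inserted.append((table, len(batch)))


total = split_and_batch(range(2500), fake_insert)
print(total, inserted)
```

With 2500 records, two full flushes happen (at records 1000 and 2000) and the final flush carries the remaining 500 records, each list going to its own table.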

Model Training UDF (h2o_glm_train.py)

First, several packages are imported to access the Kinetica Python UDF API, H2O GLM estimator, and H2O:

from kinetica_proc import ProcData
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
import h2o

Next, a connection to H2O is initialized using all CPUs available on the host:

h2o.init(nthreads=-1)

The script instantiates the ProcData class, converts the data from the input table (example_loan_train_data) to an H2O dataframe, and displays the dataframe's shape:

proc_data = ProcData()
h20_df = proc_data.to_h2odf()

print('h2o df shape: {}'.format(h20_df.shape))

The data is split into three sets: 70% of the data is part of the train set, 15% of the data is part of the valid set, and the remaining 15% is part of the test set:

splits = h20_df.split_frame(ratios=[0.7, 0.15], seed=1)
train = splits[0]
valid = splits[1]
test = splits[2]
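
Note that H2O documents split_frame as a probabilistic split, so the resulting frames are only approximately 70/15/15 in size. The pure-Python sketch below illustrates that idea with a hypothetical probabilistic_split helper. It is not H2O's implementation and will not reproduce H2O's row assignment for a given seed.

```python
import random


def probabilistic_split(rows, ratios, seed=1):
    """Assign each row to a split by comparing a uniform draw to cumulative ratios.

    Returns len(ratios) + 1 lists; the last one receives the remainder.
    """
    if sum(ratios) >= 1.0:
        raise ValueError('ratios must sum to less than 1')
    rng = random.Random(seed)
    # Cumulative thresholds, e.g. [0.7, 0.15] becomes [0.7, 0.85].
    thresholds = []
    running = 0.0
    for ratio in ratios:
        running += ratio
        thresholds.append(running)
    splits = [[] for _ in range(len(ratios) + 1)]
    for row in rows:
        draw = rng.random()
        for index, threshold in enumerate(thresholds):
            if draw < threshold:
                splits[index].append(row)
                break
        else:
            # The draw exceeded every threshold, so the row goes to the last split.
            splits[-1].append(row)
    return splits


train, valid, test = probabilistic_split(range(10000), ratios=[0.7, 0.15])
print(len(train), len(valid), len(test))
```

Every row lands in exactly one split, but the split sizes vary slightly from the nominal ratios, which is worth keeping in mind when comparing row counts against the input table.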

The response column y is bad_loan. The predictor columns x are all remaining columns except the interest rate (int_rate -- it's correlated with the outcome) and the record ID (record_id -- it has no value as a predictor). The predictor columns are displayed:

y = 'bad_loan'
x = list(h20_df.columns)
x.remove(y)  # remove the response
x.remove('int_rate')  # remove the interest rate column because it's correlated with the outcome
x.remove('record_id')
print('Predictor columns: {}'.format(x))

A GLM estimator is created with the binomial family (appropriate for a binary response), then trained on the predictors and response set up previously:

glm_fit1 = H2OGeneralizedLinearEstimator(family='binomial', model_id='glm_fit1')
glm_fit1.train(x=x, y=y, training_frame=train)
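
With family='binomial', a GLM models the probability of the positive class through a logit link, i.e., logistic regression. The toy sketch below fits such a model on a single predictor with plain batch gradient descent. H2O's solver, regularization, and standardization differ, so this only illustrates what a binomial GLM estimates. The data and all names in it are made up for illustration.

```python
import math


def sigmoid(z):
    """Inverse of the logit link: maps a linear score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(xs, ys, learning_rate=0.1, epochs=2000):
    """Fit p(y=1 | x) = sigmoid(intercept + slope * x) by batch gradient descent."""
    intercept, slope = 0.0, 0.0
    n = float(len(xs))
    for _ in range(epochs):
        grad_intercept = 0.0
        grad_slope = 0.0
        for x, y in zip(xs, ys):
            error = sigmoid(intercept + slope * x) - y
            grad_intercept += error
            grad_slope += error * x
        intercept -= learning_rate * grad_intercept / n
        slope -= learning_rate * grad_slope / n
    return intercept, slope


# Made-up data: a debt-to-income style feature scaled to 0..1 and a 0/1 outcome.
dti_scaled = [0.05, 0.10, 0.20, 0.30, 0.40, 0.60, 0.70, 0.80, 0.90, 0.95]
bad_loan = [0, 0, 0, 1, 0, 1, 0, 1, 1, 1]

intercept, slope = fit_logistic(dti_scaled, bad_loan)
print('intercept={:.3f} slope={:.3f}'.format(intercept, slope))
print('P(bad | dti=0.9) = {:.3f}'.format(sigmoid(intercept + slope * 0.9)))
```

A positive slope here means the predicted probability of a bad loan rises with the feature; the H2O model exposes the analogous coefficients for each predictor column.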

The model is then stored in KiFS:

tmp_model1_path = h2o.save_model(glm_fit1, '/opt/gpudb/kifs/mount/GLM_model', force=True)

The H2O instance is shut down and complete() is called to tell Kinetica the proc code is finished:

h2o.cluster().shutdown()
proc_data.complete()

Training UDF Registration (register_execute_train.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL, including the host and port of the database server. Ensure the host address and port are correct for your setup:

if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))

To upload the h2o_glm_train.py and kinetica_proc.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_paths = ["h2o_glm_train.py", "../../kinetica_proc.py"]  # put the main python script in the first place
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()

After the files are placed in a data map, the distributed H2o_Train proc is created in Kinetica and the files are associated with it:

proc_name = 'H2o_Train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and args to be executed. In this case, the assembled command line would be:

python h2o_glm_train.py

Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:

print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)
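
The positional arguments in the two calls above are easier to read when written as keywords. The sketch below shows the same calls that way. The keyword names are my understanding of the create_proc and execute_proc parameters in the Kinetica Python API and should be checked against the installed gpudb version. RecordingHandle is a hypothetical stand-in for gpudb.GPUdb so that the sketch runs without a database.

```python
PROC_NAME = 'H2o_Train'


def register_and_execute(db_handle, files, main_script, input_table):
    """Create the proc, then run it against one input table."""
    create_response = db_handle.create_proc(
        proc_name=PROC_NAME,
        execution_mode='distributed',
        files=files,                  # {file name: file contents as bytes}
        command='python',
        args=[main_script],           # assembled as: python <main_script>
        options={},
    )
    execute_response = db_handle.execute_proc(
        proc_name=PROC_NAME,
        params={},                    # no string parameters
        bin_params={},                # no binary parameters
        input_table_names=[input_table],
        input_column_names={},        # empty: pass all columns of each input table
        output_table_names=[],        # the UDF writes to KiFS, not to a table
        options={},
    )
    return create_response, execute_response


class RecordingHandle(object):
    """Hypothetical stand-in for gpudb.GPUdb that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def create_proc(self, **kwargs):
        self.calls.append(('create_proc', kwargs))
        return {'status_info': {'status': 'OK'}}

    def execute_proc(self, **kwargs):
        self.calls.append(('execute_proc', kwargs))
        return {'status_info': {'status': 'OK'}}


handle = RecordingHandle()
register_and_execute(handle, {'h2o_glm_train.py': b'# script body'},
                     'h2o_glm_train.py', 'example_loan_train_data')
for name, kwargs in handle.calls:
    print(name, sorted(kwargs))
```

Against a real database, a gpudb.GPUdb instance would be passed in place of RecordingHandle, and the files map would be built as shown earlier in this section.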