The following is a complete example, using the Python UDF API, of a non-CUDA
UDF that demonstrates how to build a model using the random forest method via
H2O. This model will learn to detect whether a loan is bad based on related
loan data. The random forest method trains multiple models on small subsets of
a large dataset and then combines the models' inference output. This example
(and others) can be found in the Python UDF API repository; this repository
comes with Kinetica by default (located in /opt/gpudb/udf/api/python/) or can
be downloaded/cloned from GitHub.
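To make the idea of "many models on small subsets, combined at inference time" concrete, here is a minimal standard-library sketch. It is a simplified illustration and not H2O's algorithm: each "model" is a hypothetical one-feature threshold classifier rather than a decision tree, and the toy data, function names, and parameters are all assumptions made for this example.

```python
import random
from collections import Counter


def train_stump(rows):
    """Fit a one-feature threshold classifier on (feature, label) rows."""
    best = None
    for threshold, _ in rows:
        # Predict 1 when the feature is at or above the threshold.
        correct = sum((x >= threshold) == bool(y) for x, y in rows)
        if best is None or correct > best[0]:
            best = (correct, threshold)
    return best[1]


def train_forest(rows, n_models=15, subset_size=20, seed=1):
    """Train several stumps, each on a small random subset of the rows."""
    rng = random.Random(seed)
    # Subsets are drawn with replacement, so models see different data.
    return [train_stump([rng.choice(rows) for _ in range(subset_size)])
            for _ in range(n_models)]


def predict(forest, x):
    """Combine the individual predictions by majority vote."""
    votes = Counter(int(x >= threshold) for threshold in forest)
    return votes.most_common(1)[0][0]


# Hypothetical toy data: one feature, label 1 when the feature is >= 50.
data = [(x, int(x >= 50)) for x in range(100)]
forest = train_forest(data)
print(predict(forest, 5), predict(forest, 95))
```

Each stump only sees 20 of the 100 rows, so individual thresholds vary; the vote across all 15 smooths out that variation.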
The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.
Important
This example cannot run on Mac OS X.
The following items are also necessary:
Python 3
Miniconda
Note
Visit the Conda website to download the Miniconda installer for Python 3.
KiFS
Important
Review Kinetica File System (KiFS) for more information on configuring and using KiFS.
H2O
H2O must be installed on all cluster nodes for this UDF to run successfully.
Install all H2O dependencies on all cluster nodes:
/opt/gpudb/bin/gpudb_pip install requests
/opt/gpudb/bin/gpudb_pip install tabulate
/opt/gpudb/bin/gpudb_pip install "colorama>=0.3.8"
/opt/gpudb/bin/gpudb_pip install future
Install the H2O Python module on all cluster nodes:
/opt/gpudb/bin/gpudb_pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o
This example requires local access to the Python UDF API repository. In the
desired directory, run the following, replacing <kinetica-version> with the
name of the installed Kinetica version, e.g., v7.0:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
There are four files associated with the H2O random forest UDF example, all of which can be found in the Python UDF API repo.
A utility script (test_environment.py) that is called from the initialization script
An initialization script (setup_table.py) that creates input tables and inserts data into them
A script (register_execute_train_test.py) to register and execute the UDF
A UDF (h2o_rf_train_test.py) to train a model using the random forest method on sample loan data, store the model in KiFS, and then test the model
This example runs inside a Conda environment. The environment can be
automatically configured using the conda_env_py3.yml file found in the
Python UDF API repository.
From the directory where you cloned the API, change into the root folder of the Python UDF API repository:
cd kinetica-udf-api-python/
Create the Conda environment, replacing <environment name> with the desired
name:
conda env create --name <environment name> --file conda_env_py3.yml
Note
It may take a few minutes to create the environment.
Verify the environment was created properly:
conda info --envs
Activate the new environment:
source activate <environment name>
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
Install the Kinetica Python API:
pip install gpudb~=7.0.0
Add the Python UDF API repo's root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
Edit the util/test_environment.py script with the correct host, port, user,
and password for your Kinetica instance:
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
Change directory into the UDF H2O random forest directory:
cd examples/UDF_h2o_rf
Run the UDF initialization script:
python setup_table.py
Update the host and port in register_execute_train_test.py as necessary:
HOST_IP = "127.0.0.1"
if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host=HOST_IP, port='9191'))
Run the execute script for the training UDF:
python register_execute_train_test.py
Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the
head node.
This example details using a distributed UDF and H2O to create a random
forest model from sample loan data and then test the model against a smaller
subset of that data. Both training and testing occur against the data in the
example_loan_train_data table, which is the training table and the input table
for the h2o_rf_train_test proc. The model determines the correlation between
the different types of loan data and whether a loan is bad.
The training table contains 11 columns, each containing values from the
loan_sample dataset (located in kinetica-udf-api-python/util/):
loan_amnt -- a float column representing the loan amount
int_rate -- a float column representing the loan's interest rate
emp_length -- a float column representing the borrower's employment length
annual_inc -- a float column representing the borrower's annual income
dti -- a float column representing the borrower's debt-to-income ratio
delinq_2yrs -- a float column representing delinquent payments against the loan for the last two years
revol_util -- a float column representing the borrower's revolving line utilization rate
total_acc -- a float column representing the borrower's total number of accounts
bad_loan -- a float column representing if the loan was bad (1) or not (0)
longest_credit_length -- a float column representing the borrower's longest credit line length
record_id -- a float column to identify the loan
The UDF executed, h2o_rf_train_test, splits the data in the training table
into three sets: train, valid, and test. The model is trained against the
train set, saved in KiFS, and then tested against the test set.
The setup script, setup_table.py, which creates all the necessary tables for
the UDFs, imports the test_environment.py script to access its methods:
from util import test_environment as te
The script calls one method from test_environment.py:
te.prepare_loan_data()
The methods in test_environment.py require a connection to Kinetica. This is
done by instantiating an object of the GPUdb class with a provided connection
URL (host and port):
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)
The prepare_loan_data() method creates the types and tables for the testing
(example_loan_test_data), training, and results
(example_loan_inference_result) data. Any of these tables that already exist
are removed first:
Important
Though this method creates several tables, only the training table is used in the UDF. The model in this example UDF is trained and tested against this dataset, so only one table is necessary.
LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"loan_amnt","type":"float"},
{"name":"int_rate","type":"float"},
{"name":"emp_length","type":"float"},
{"name":"annual_inc","type":"float"},
{"name":"dti","type":"float"},
{"name":"delinq_2yrs","type":"float"},
{"name":"revol_util","type":"float"},
{"name":"total_acc","type":"float"},
{"name":"bad_loan","type":"float"},
{"name":"longest_credit_length","type":"float"},
{"name":"record_id","type":"float"}
]
}"""
LOAN_INFERENCE_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"record_id","type":"float"},
{"name":"bad_loan_actual","type":"float"},
{"name":"bad_loan_predicted","type":"float"}
]
}"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
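The type definitions above are JSON record schemas, so they can be sanity-checked before being sent to the database. The following is a small sketch using only the standard library; the schema string is copied from the definition above, while the names `schema`, `columns`, and `non_float` are introduced here purely for illustration.

```python
import json

# Copied from the LOAN_DATA_TYPE definition shown above.
LOAN_DATA_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"loan_amnt","type":"float"},
{"name":"int_rate","type":"float"},
{"name":"emp_length","type":"float"},
{"name":"annual_inc","type":"float"},
{"name":"dti","type":"float"},
{"name":"delinq_2yrs","type":"float"},
{"name":"revol_util","type":"float"},
{"name":"total_acc","type":"float"},
{"name":"bad_loan","type":"float"},
{"name":"longest_credit_length","type":"float"},
{"name":"record_id","type":"float"}
]
}"""

# Parsing fails loudly if the definition is not valid JSON.
schema = json.loads(LOAN_DATA_TYPE)

# Column names in declaration order, plus any column that is not a float.
columns = [field["name"] for field in schema["fields"]]
non_float = [field["name"] for field in schema["fields"]
             if field["type"] != "float"]

print(len(columns), columns)
print("non-float columns:", non_float)
```

A check like this confirms that the schema declares the 11 float columns described for the training table.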
Then the method reads the loan_sample data with pandas and takes the
resulting records as an array. Two lists are initialized, which will be used
to insert records into the training table and the testing table. A simple
counter is initialized as well:
pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0
For each record in the dataset, its ordered values are assigned to the columns of the training and testing tables. Every tenth record (10% of the dataset) is added to the testing table's list; the other 90% are added to the training table's list. After every 1000 records processed, both lists are inserted into the database and cleared, and the process repeats:
for record in records:
    i += 1
    datum = collections.OrderedDict()
    datum['loan_amnt'] = float(record[0])
    datum['int_rate'] = float(record[1])
    datum['emp_length'] = float(record[2])
    datum['annual_inc'] = float(record[3])
    datum['dti'] = float(record[4])
    datum['delinq_2yrs'] = float(record[5])
    datum['revol_util'] = float(record[6])
    datum['total_acc'] = float(record[7])
    datum['bad_loan'] = float(record[8])
    datum['longest_credit_length'] = float(record[9])
    datum['record_id'] = float(i)
    if i % 10 == 0:
        encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    else:
        encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    if i % 1000 == 0:
        response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert train response: {}".format(response))
        response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert test response: {}".format(response))
        encoded_obj_list_train = []
        encoded_obj_list_test = []
If there aren't enough records in the lists at the end of the loop to make a full batch, the remaining records are flushed to the database:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
# Flush the remaining test records (the test list, not the train list)
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
print('\nAll data inserted.')
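The batching pattern used by the loader (route every tenth record to the test list, insert both lists every 1000 records, then flush the remainder) can be exercised without a database. In the sketch below, `fake_insert` is a hypothetical stand-in for `DB_HANDLE.insert_records` that only stores the records in a dictionary; the `load` function and the table labels are likewise assumptions made for illustration.

```python
def fake_insert(table, batch, sink):
    """Hypothetical stand-in for an insert call: just stores the batch."""
    sink.setdefault(table, []).extend(batch)


def load(records, batch_size=1000):
    """Route records to train/test lists and insert them in batches."""
    sink = {}
    train, test = [], []
    for i, record in enumerate(records, start=1):
        # Every tenth record goes to the test list, the rest to train.
        (test if i % 10 == 0 else train).append(record)
        if i % batch_size == 0:
            fake_insert("train", train, sink)
            fake_insert("test", test, sink)
            train, test = [], []
    # Flush whatever is left after the last full batch.
    if train:
        fake_insert("train", train, sink)
    if test:
        fake_insert("test", test, sink)
    return sink


result = load(list(range(2345)))
print(len(result["train"]), len(result["test"]))
```

Because 2345 is not a multiple of 1000, the last 345 records only reach the sink through the final flush, which is why each leftover list has to be flushed to its own table.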
In the UDF (h2o_rf_train_test.py), several packages are first imported to access the Kinetica Python UDF API, H2O, and the H2O random forest estimator:
from kinetica_proc import ProcData
import h2o
from h2o.estimators.random_forest import H2ORandomForestEstimator
Next, a connection to H2O is initialized using all CPUs available on the host:
h2o.init(nthreads=-1)
The UDF instantiates the ProcData() class, converts the data from the input
table (example_loan_train_data) to an H2O dataframe, and displays the
dataframe's shape:
proc_data = ProcData()
h20_df = proc_data.to_h2odf()
print('h2o df shape: {}'.format(h20_df.shape))
The data is split into three sets: 70% of the data is part of the train set,
15% of the data is part of the valid set, and the remaining 15% is part of the
test set:
splits = h20_df.split_frame(ratios=[0.7, 0.15], seed=1)
train = splits[0]
valid = splits[1]
test = splits[2]
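Passing two ratios yields three frames because the final split receives whatever fraction remains. To my knowledge, H2O describes split_frame as an approximate rather than exact split; the sketch below shows one way a ratio-based split can behave like that, by drawing a random number per row and comparing it to cumulative boundaries. It is an illustration only and not H2O's implementation; `split_rows` and its parameters are hypothetical names.

```python
import random


def split_rows(rows, ratios, seed=1):
    """Assign each row to one of len(ratios) + 1 splits at random."""
    rng = random.Random(seed)
    # Cumulative boundaries, e.g. roughly [0.7, 0.85] for ratios [0.7, 0.15].
    bounds = []
    total = 0.0
    for ratio in ratios:
        total += ratio
        bounds.append(total)
    splits = [[] for _ in range(len(ratios) + 1)]
    for row in rows:
        draw = rng.random()
        # The split index is the number of boundaries the draw has passed.
        index = sum(draw >= bound for bound in bounds)
        splits[index].append(row)
    return splits


train, valid, test = split_rows(list(range(10000)), [0.7, 0.15])
print(len(train), len(valid), len(test))
```

With this approach the split sizes land near 70%, 15%, and 15% but are not guaranteed to match those ratios exactly.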
Let y be the response column and x be the predictor columns. The predictors
are all columns except the response (y), the interest rate (int_rate, which is
correlated with the outcome), and the record ID (record_id, which has no value
as a predictor). The predictor columns are displayed:
y = 'bad_loan'
x = list(h20_df.columns)
x.remove(y) # remove the response
x.remove('int_rate') # remove the interest rate column because it's correlated with the outcome
x.remove('record_id')
print('Predictor columns: {}'.format(x))
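An equivalent way to express the same selection, shown here as a standalone sketch, is to filter the column list against a set of excluded names. The column list is copied from the training table description; the names `excluded` and `predictors` are introduced only for this example.

```python
# Column names of the training table, as described earlier.
columns = [
    'loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti',
    'delinq_2yrs', 'revol_util', 'total_acc', 'bad_loan',
    'longest_credit_length', 'record_id',
]

response = 'bad_loan'
# Columns that must not be used as predictors.
excluded = {response, 'int_rate', 'record_id'}

# Keep the original column order while dropping the excluded names.
predictors = [column for column in columns if column not in excluded]
print('Predictor columns: {}'.format(predictors))
```

Unlike repeated list.remove() calls, this form does not raise an error when an excluded column happens to be absent from the input.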
A random forest estimator is created and then trained on the predictors and response set up previously:
rf_fit1 = H2ORandomForestEstimator(model_id='rf_fit', seed=1)
rf_fit1.train(x=x, y=y, training_frame=train)
The KiFS mount point is read from the request info map and displayed. The model is stored in KiFS and its path is saved and displayed:
kifs_path = proc_data.request_info['kifs_mount_point']
print('KiFS mount point: {}'.format(kifs_path))
model1_path = h2o.save_model(rf_fit1, kifs_path+'/RF_model', force=True)
print('Saved model file to: {}'.format(model1_path))
The model is loaded from KiFS:
model1 = h2o.load_model(model1_path)
The model is tested against the test set and its performance is displayed.
Performance is measured using several different metrics:
MSE (Mean Squared Error)
RMSE (Root Mean Squared Error)
MAE (Mean Absolute Error)
RMSLE (Root Mean Squared Logarithmic Error)
performance = model1.model_performance(test)
print('Performance of model 1: {}'.format(performance))
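For reference, the four metrics named above can be computed by hand from actual and predicted values. The sketch below uses their standard textbook definitions with only the standard library; the sample values are made up, `regression_metrics` is a hypothetical helper, and H2O's own calculation may differ in details.

```python
import math


def regression_metrics(actual, predicted):
    """Compute MSE, RMSE, MAE, and RMSLE from two equal-length lists."""
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    # RMSLE compares log(1 + value), so values must be greater than -1.
    squared_log_errors = [(math.log1p(p) - math.log1p(a)) ** 2
                          for a, p in zip(actual, predicted)]
    rmsle = math.sqrt(sum(squared_log_errors) / n)
    return {"MSE": mse, "RMSE": math.sqrt(mse), "MAE": mae, "RMSLE": rmsle}


# Made-up values: actual bad_loan flags and predicted scores.
actual = [0.0, 1.0, 0.0, 1.0]
predicted = [0.1, 0.8, 0.3, 0.6]
metrics = regression_metrics(actual, predicted)
for name, value in metrics.items():
    print('{}: {:.4f}'.format(name, value))
```

Lower values indicate a better fit for all four metrics.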
In the execution script (register_execute_train_test.py), an object of the
GPUdb class is instantiated to interact with Kinetica, using a connection URL
that includes the host and port of the database server. Ensure the host
address and port are correct for your setup:
HOST_IP = "127.0.0.1"
if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host=HOST_IP, port='9191'))
To upload the h2o_rf_train_test.py and kinetica_proc.py files to Kinetica,
they will first need to be read in as bytes and added to a file data map:
file_paths = ["h2o_rf_train_test.py", "../../kinetica_proc.py"]  # put the main python script in the first place
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
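The file data map is keyed by base name, so the directory part of each path is dropped. The self-contained sketch below reproduces the same pattern against temporary files to show the resulting shape; `build_file_map` and the file names are hypothetical and exist only for this illustration.

```python
import os
import tempfile


def build_file_map(paths):
    """Read each file as bytes and key it by its base name."""
    files = {}
    for path in paths:
        with open(path, 'rb') as f:
            files[os.path.basename(path)] = f.read()
    return files


with tempfile.TemporaryDirectory() as tmp:
    # Create one file at the top level and one in a nested directory.
    nested = os.path.join(tmp, "lib")
    os.makedirs(nested)
    main_path = os.path.join(tmp, "main_script.py")
    helper_path = os.path.join(nested, "helper.py")
    with open(main_path, "wb") as f:
        f.write(b"print('hello')\n")
    with open(helper_path, "wb") as f:
        f.write(b"VALUE = 1\n")
    files = build_file_map([main_path, helper_path])

print(sorted(files))
```

Since only base names are kept, two files with the same name in different directories would overwrite each other in the map.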
After the files are placed in a data map, the distributed H2o_rf_train_test
proc is created in Kinetica and the files are associated with it:
proc_name = 'H2o_rf_train_test'
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
Note
The proc requires the proper command and args to be executed. In this case,
the assembled command line would be:
python h2o_rf_train_test.py
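As a rough illustration of how a command and an argument list combine into that command line, consider the sketch below. It only mirrors the 'python' command and the single-element argument list passed to create_proc above; how Kinetica assembles and launches the command internally is not shown in this document, so the joining logic here is an assumption.

```python
import shlex

# The command and args as passed to create_proc in the example above.
command = 'python'
args = ['h2o_rf_train_test.py']

# Quote each part so that names containing spaces would stay intact.
command_line = ' '.join(shlex.quote(part) for part in [command] + args)
print(command_line)
```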
Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)