The following is a complete example, using the Python UDF API, of a non-CUDA
UDF that demonstrates how to build a generalized linear model (GLM) using
H2O that detects
correlation between different types of loan data and whether a loan is bad or not. A
GLM performs regression analysis based on a given distribution. This example
(and others) can be found in the Python UDF API
repository; this repository comes with Kinetica by default (located in
/opt/gpudb/udf/api/python/
) or can be downloaded/cloned from
GitHub.
The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.
Important
This example cannot run on Mac OS X.
The following items are also necessary:
Python 3
Miniconda
Note
Visit the Conda website to download the Miniconda installer for Python 3.
KiFS
Important
Review Kinetica File System (KiFS) for more information on configuring and using KiFS.
H2O
H2O must be installed on all cluster nodes for this UDF to run successfully.
Install all H2O dependencies on all cluster nodes:
/opt/gpudb/bin/gpudb_pip install requests
/opt/gpudb/bin/gpudb_pip install tabulate
/opt/gpudb/bin/gpudb_pip install "colorama>=0.3.8"
/opt/gpudb/bin/gpudb_pip install future
Install the H2O Python module on all cluster nodes:
/opt/gpudb/bin/gpudb_pip install -f http://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o
This example requires local access to the Python UDF API repository. In the
desired directory, run the following, but be sure to replace
<kinetica-version>
with the name of the installed Kinetica version, e.g.,
v6.2.0
:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
There are four files associated with the H2O GLM UDF example, all of which can be found in the Python UDF API repo:
A test environment script (test_environment.py) that is called from the initialization script
An initialization script (setup_table.py) that creates input tables and inserts data into them
A registration/execution script (register_execute_train.py) to register and execute the UDF
A UDF (h2o_glm_train.py) to train a GLM on sample loan data and store the model in KiFS
This example runs inside a Conda environment. The environment can be
automatically configured using the conda_env_py3.yml
file found in the
Python UDF API repository.
In the same directory where you cloned the API, change into the root folder of the Python UDF API repository:
cd kinetica-udf-api-python/
Create the Conda environment, replacing <environment name>
with the
desired name:
conda env create --name <environment name> --file conda_env_py3.yml
Note
It may take a few minutes to create the environment.
Verify the environment was created properly:
conda info --envs
Activate the new environment:
source activate <environment name>
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
Install the Kinetica Python API from PyPI, where n is the desired patch version of Kinetica:
pip install gpudb==6.2.0.n
Add the Python UDF API repo's root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
Edit the util/test_environment.py
script for the correct host, port,
user, and password for your Kinetica instance:
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
Change directory into the UDF H2O GLM directory:
cd examples/UDF_h2o_glm
Run the UDF initialization script:
python setup_table.py
Update the host and port in register_execute_train.py
as necessary:
main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
Run the execute script for the training UDF:
python register_execute_train.py
Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log
file on the
head node and/or in KiFS (in the GLM_model
directory).
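If you would rather confirm the saved artifacts programmatically than read the log, a minimal sanity check (an illustrative addition, not part of the example scripts) is to list the KiFS mount directory that h2o.save_model() writes to on the head node:
import os
# Illustrative check only: this path matches the KiFS mount directory used by
# h2o_glm_train.py; adjust it if your KiFS mount point differs
model_dir = '/opt/gpudb/kifs/mount/GLM_model'
if os.path.isdir(model_dir):
    print('Saved model artifacts: {}'.format(os.listdir(model_dir)))
else:
    print('No GLM_model directory found -- check gpudb-proc.log for errors.')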
This example details using a distributed UDF and H2O to create a GLM from sample
loan data. The training occurs against the example_loan_train_data table,
which serves as both the training table and the input table for the h2o_glm_train
proc. The H2O GLM determines correlation between the different types of loan data
and whether a loan is bad or not.
The training table contains 11 columns, each containing values from the
loan_sample dataset (located in kinetica-udf-api-python/util/):
loan_amnt -- a float column representing the loan amount
int_rate -- a float column representing the loan's interest rate
emp_length -- a float column representing the borrower's employment length
annual_inc -- a float column representing the borrower's annual income
dti -- a float column representing the borrower's debt-to-income ratio
delinq_2yrs -- a float column representing delinquent payments against the loan for the last two years
revol_util -- a float column representing the borrower's revolving line utilization rate
total_acc -- a float column representing the borrower's total number of accounts
bad_loan -- a float column representing if the loan was bad (1) or not (0)
longest_credit_length -- a float column representing the borrower's longest credit line length
record_id -- a float column to identify the loan
The UDF executed, h2o_glm_train, uses the training table to train an
H2O GLM and then saves the model in KiFS.
The setup script, setup_table.py
, which creates all the necessary tables for
the UDFs, imports the test_environment.py
script to access its methods:
from util import test_environment as te
The script calls one method from test_environment.py
:
te.prepare_loan_data()
The methods in test_environment.py
require a connection to Kinetica.
This is done by instantiating an object of the GPUdb
class with a
provided connection URL (host and port):
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)
The prepare_loan_data()
method creates the types and tables for the
testing (example_loan_test_data
), training, and results
(example_loan_inference_result
) tables, but the tables are removed first
if they already exist:
Important
Though this method creates several tables, only the training table is used in the UDF. The model in this example UDF is only trained against the dataset and not tested, so only one table is necessary.
LOAN_TRAIN_DATA_TABLE_NAME = "example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = "example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = "example_loan_inference_result"
LOAN_DATA_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"loan_amnt","type":"float"},
{"name":"int_rate","type":"float"},
{"name":"emp_length","type":"float"},
{"name":"annual_inc","type":"float"},
{"name":"dti","type":"float"},
{"name":"delinq_2yrs","type":"float"},
{"name":"revol_util","type":"float"},
{"name":"total_acc","type":"float"},
{"name":"bad_loan","type":"float"},
{"name":"longest_credit_length","type":"float"},
{"name":"record_id","type":"float"}
]
}"""
LOAN_INFERENCE_TYPE = """
{
"type": "record",
"name": "loan_type",
"fields": [
{"name":"record_id","type":"float"},
{"name":"bad_loan_actual","type":"float"},
{"name":"bad_loan_predicted","type":"float"}
]
}"""
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
Then the method converts the loan_sample
data into a pandas data frame. Two
lists are initialized, which will be used to insert records into the training
table and the testing table. A simple counter is initialized as well:
pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0
For each record in the dataset, its ordered values are assigned to the columns in the training and testing tables. From the dataset, 10% of the records are added to the testing table's list; the other 90% are added to the training table's list. Once a batch size of 1,000 records has been reached, the records are inserted into the database. The lists are then cleared and the process repeats:
for record in records:
i += 1
datum = collections.OrderedDict()
datum['loan_amnt'] = float(record[0])
datum['int_rate'] = float(record[1])
datum['emp_length'] = float(record[2])
datum['annual_inc'] = float(record[3])
datum['dti'] = float(record[4])
datum['delinq_2yrs'] = float(record[5])
datum['revol_util'] = float(record[6])
datum['total_acc'] = float(record[7])
datum['bad_loan'] = float(record[8])
datum['longest_credit_length'] = float(record[9])
datum['record_id'] = float(i)
if i % 10 == 0:
encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
else:
encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
if i % 1000 == 0:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert train response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
list_encoding='binary', options={})
if response['status_info']['status'] == "ERROR":
print("Insert test response: {}".format(response))
encoded_obj_list_train = []
encoded_obj_list_test = []
If there are not enough records in the lists at the end of the loop to make a full batch, the remaining records are flushed to the database:
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
list_encoding='binary')
if response['status_info']['status'] == "ERROR":
print("Insert response: {}".format(response))
print('\nAll data inserted.')
First, several packages are imported to access the Kinetica Python UDF API, H2O GLM estimator, and H2O:
from kinetica_proc import ProcData
from h2o.estimators.glm import H2OGeneralizedLinearEstimator
import h2o
Next, a connection to H2O is initialized using all CPUs available on the host:
h2o.init(nthreads=-1)
The file gets a handle to the ProcData()
class, converts the data
from the input table (example_loan_train_data
) to an H2O dataframe, and
displays the dataframe's shape
:
proc_data = ProcData()
h20_df = proc_data.to_h2odf()
print('h2o df shape: {}'.format(h20_df.shape))
The data is split into three sets: 70% of the data is part of the train
set, 15% of the data is part of the valid
set, and the remaining 15% is
part of the test
set:
splits = h20_df.split_frame(ratios=[0.7, 0.15], seed=1)
train = splits[0]
valid = splits[1]
test = splits[2]
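To confirm the approximate 70/15/15 split before training, you could optionally print the row counts of each frame (this check is not part of the original UDF):
# Optional check: print the number of rows in each split
print('train rows: {}, valid rows: {}, test rows: {}'.format(train.nrow, valid.nrow, test.nrow))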
Let y
be the response column. Let x
be the predictor columns, but remove
the response (y
), interest rate (int_rate
-- it's correlated with the
outcome), and record ID (record_id
-- it has no value as a predictor). The
predictor columns are displayed:
y = 'bad_loan'
x = list(h20_df.columns)
x.remove(y) # remove the response
x.remove('int_rate') # remove the interest rate column because it's correlated with the outcome
x.remove('record_id')
print('Predictor columns: {}'.format(x))
A GLM is fit using the binomial family for classification, then the GLM is trained on the predictors and response set up previously:
glm_fit1 = H2OGeneralizedLinearEstimator(family='binomial', model_id='glm_fit1')
glm_fit1.train(x=x, y=y, training_frame=train)
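The example saves the model without evaluating it, but if you want a quick read on the fit, you could score the trained GLM against the validation split before saving it -- a brief optional sketch using H2O's model_performance():
# Optional evaluation (not part of the example UDF): score the trained GLM
# against the validation frame and print the AUC for the binomial model
glm_perf1 = glm_fit1.model_performance(valid)
print('Validation AUC: {}'.format(glm_perf1.auc()))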
The model is then stored in KiFS:
tmp_model1_path = h2o.save_model(glm_fit1, '/opt/gpudb/kifs/mount/GLM_model', force=True)
The H2O instance is shut down and complete()
is called to tell Kinetica
the proc code is finished:
h2o.cluster().shutdown()
proc_data.complete()
To interact with Kinetica, an object of the GPUdb
class is instantiated
while providing the connection URL, including the host and port of the database
server. Ensure the host address and port are correct for your setup:
if __name__ == "__main__":
main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
To upload the h2o_glm_train.py
and kinetica_proc.py
files to Kinetica,
they will first need to be read in as bytes and added to a file data map:
file_paths = ["h2o_glm_train.py", "../../kinetica_proc.py"] # put the main python script in the first place
files = {}
for script_path in file_paths:
script_name = os.path.basename(script_path)
with open(script_path, 'rb') as f:
files[script_name] = f.read()
After the files are placed in a data map, the distributed H2o_Train proc is
proc is
created in Kinetica and the files are associated with it:
proc_name = 'H2o_Train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
Note
The proc requires the proper command
and args
to be executed. In
this case, the assembled command line would be:
python h2o_glm_train.py
Finally, after the proc is created, it is executed. The training table created in the Database Setup section is passed in here:
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)
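The execute_proc() response includes a run ID for the proc execution; if you want to poll the run from the same script, one possible approach (a hedged sketch, not part of register_execute_train.py) is to check it with show_proc_status():
# Optional: poll the status of the distributed proc run; the response maps the
# run ID to per-rank status information (e.g., running, complete, or error)
run_id = response['run_id']
status = db_handle.show_proc_status(run_id=run_id)
print(status)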