> ## Documentation Index
> Fetch the complete documentation index at: https://docs.kinetica.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Example UDF (Non-CUDA) - Distributed Model

The following is a complete example, using the *Python UDF* API, of a non-CUDA
*UDF* that demonstrates how to build multiple models based on distributed data
and combine them into an ensemble. This example (and
[others](/content/udf/python/examples)) can be found in the *Python UDF* API
repository; this repository can be downloaded/cloned from
[GitHub](https://github.com/kineticadb/kinetica-udf-api-python).

## References

* [Python UDF Reference](/content/udf/python/writing) -- detailed
  description of the entire *UDF* API
* [Running UDFs](/content/udf/python/running) -- detailed description on
  running *Python* UDFs
* [Example UDFs](/content/udf/python/examples) -- example *UDFs* written in
  *Python*

## Prerequisites

The general prerequisites for using *UDFs* in *Kinetica* can be found under
[UDF Prerequisites](/content/udf#udf-prereqs).

<Note>
  This example cannot run on Mac OSX
</Note>

The following items are also necessary:

* *Python 3*
* *Miniconda*

<Info>
  Visit the *Conda* [website](https://conda.io/miniconda.html) to download
  the *Miniconda* installer for *Python 3*.
</Info>

## UDF API Download

This example requires local access to the *Python UDF* API repository. In the
desired directory, run the following but be sure to replace
`<kinetica-version>` with the name of the installed Kinetica version, e.g,
`v7.2`:

```
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
```

## Relevant Scripts

There are six files associated with the distributed model *UDF* example, all
of which can be found in the *Python UDF API* repo.

* A database setup script (`test_environment.py`) that is called from the
  initialization script

* An initialization script (`setup_db.py`) that creates input tables and
  inserts data into them and creates an output ensemble table

* Two scripts to register and execute the UDFs:

  * a `register_execute_train.py` script to register and execute the
    model training UDF
  * a `register_execute_test.py` script to register and execute the
    prediction model combination UDF

* Two UDFs to train and test the model:

  * a `dt_train` UDF that trains and stores a decision tree model
  * a `dt_test` UDF that combines model predictions into an ensemble

This example runs inside a *Conda* environment. The environment can be
automatically configured using the `conda_env_py3.yml` file found in the
*Python UDF* API repository.

1. In the same directory you cloned the API, change directory into the root
   folder of the *Python UDF* API repository:

   ```
   cd kinetica-udf-api-python/
   ```

2. Create the *Conda* environment, replacing `<environment name>` with the
   desired name:

   ```
   conda env create --name <environment name> --file conda_env_py3.yml
   ```

   <Info>
     It may take a few minutes to create the environment.
   </Info>

3. Verify the environment was created properly:

   ```
   conda info --envs
   ```

4. Activate the new environment:

   ```
   conda activate <environment name>
   ```

5. Install *PyGDF*:

   ```
   conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
   ```

6. Install the *Kinetica Python API*:

   ```
   pip install gpudb~=7.2.0
   ```

7. Add the *Python UDF* API repo's root directory to the PYTHONPATH:

   ```
   export PYTHONPATH=$(pwd):$PYTHONPATH
   ```

8. Edit the `util/test_environment.py` script for the correct database url,
   user, and password for your *Kinetica* instance:

   ```
   URL = 'http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000'
   USER = 'admin'
   PASSWORD = 'admin123'
   ```

## UDF Deployment

1. Change directory into the *UDF* distributed model example directory:

   ```
   cd examples/UDF_distributed_model
   ```
2. Run the *UDF* initialization script:

   ```
   python setup_db.py
   ```
3. Run the execute script for the training *UDF*:

   ```
   python register_execute_train.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
   ```
4. Run the execute script for the model combination *UDF*:

   ```
   python register_execute_test.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
   ```
5. Verify the results in [GAdmin](/content/admin/gadmin), either on the
   [logging page](/content/admin/gadmin/cluster#cluster-logging) or in the
   `example_loan_inference_result` output table on the
   [table view page](/content/admin/gadmin/data).

## Execution Detail

This example details two distributed *UDFs* to train and test model prediction
accuracy. The training and testing occurs using the following schema and tables:

* `example_udf_python` -- the *schema* containing all of the below
  tables
* `example_loan_train_data` -- the *training table* and input table for the
  `dt_train` proc. The algorithm to predict if a loan is bad or not is
  trained against this table.
* `ensemble_models` -- the intermediary *model storage table* that stores all
  the models after they've been trained against the *training table*
* `example_loan_test_data` -- the *testing table* and input table for the
  `dt_test` proc. The trained algorithm is tested against this table
  and the accuracy of the algorithm's predictions are calculated.
* `example_loan_inference_result` -- the *results table* and output table for
  the `dt_test` proc. The actual bad loan boolean and predicted bad loan
  boolean are compared per loan in this table.

The *training table* contains 11 columns, each containing values from the
`loan_sample` dataset (located in `kinetica-udf-api-python/util/`):

* `loan_amnt` -- a *float* column representing the loan amount
* `int_rate` -- a *float* column representing the loan's interest rate
* `emp_length`  -- a *float* column representing the borrower's employment
  length
* `annual_inc` -- a *float* column representing the borrower's annual income
* `dti` -- a *float* column representing the borrower's debt-to-income ratio
* `delinq_2yrs` -- a *float* column representing delinquent payments against
  the loan for the last two years
* `revol_util` -- a *float* column representing the borrower's revolving line
  utilization rate
* `total_acc` -- a *float* column representing the borrower's total amount
  of accounts
* `bad_loan` -- a *float* column representing if the loan was bad (`1`) or
  not (`0`)
* `longest_credit_length` -- a *float* column representing the borrower's
  longest credit line length
* `record_id` -- a *float* column to identify the loan

The *testing table* has the same columns as the *training table*.

The *results table* contains three columns:

* `record_id` -- a *float* column to identify the loan
* `bad_loan_actual` -- a *float* column that is the `bad_loan` value from
  the test table
* `bad_loan_predicted` -- a *float* column that is the `bad_loan` value from
  the models

The first *UDF* executed, `dt_train`, uses the *training table* to train
decision tree models, evaluate the models' accuracy against the
*training table*, and then store the models in the *model storage table* in
*Kinetica*.

The *model storage table* contains six columns:

* `model` -- a *bytes* column that stores the model
* `name` -- an unrestricted *string* column that is the name of the model
* `rank` -- an *int* column that represents the processing node container that
  holds the processing nodes for the database
* `tom` -- an *int* column that represents the processing node that contains
  the many shards of data inside the database
* `num_input_data` -- an *int* column that is the number of records used as
  input data for the model
* `date_created` -- a *datetime* column that is the date and time the model
  was created

The second *UDF* executed, `dt_test`, combines all the models
in the *model storage table* and evaluates the combined ensemble accuracy
and predictions against the *testing table*. The average accuracy of the
individual models and the ensemble are calculated. The predictions vs.
actual values against each loan are then stored in the *results table*.
Any output to the system log can be viewed in the *GAdmin*
[logging page](/content/admin/gadmin/cluster#cluster-logging).

### Database Setup

The setup script, `setup_db.py`, which creates all the necessary tables for
the *UDFs*, imports the `test_environment.py` script to access its methods:

```python theme={null}
from util import test_environment as te
```

The script calls three methods from `test_environment.py`:

```python theme={null}
te.create_schema()
te.prepare_loan_data()
te.create_ensemble_model_table()
```

The three methods in `test_environment.py` require a connection to *Kinetica*.
This is done by instantiating an object of the `GPUdb` class with a provided
connection URL.  See [Connecting via API](/content/api/concepts#api-conn) for details on the URL format and
how to look it up.

```python theme={null}
URL = 'http://localhost:9191'
USER = 'admin'
PASSWORD = 'admin123'
DB_HANDLE = gpudb.GPUdb(host=[URL], username=USER, password=PASSWORD)
```

The `create_schema()` method creates the schema that will contain all tables
used in the example:

```python theme={null}
SCHEMA = 'example_udf_python'
```

```python theme={null}
DB_HANDLE.create_schema(SCHEMA, options={'no_error_if_exists': 'true'})
```

The `prepare_loan_data()` method creates the types and tables for the
*testing*, *training*, and *results tables*, but the *tables* are removed first
if they already exist:

```python theme={null}
LOAN_TRAIN_DATA_TABLE_NAME = SCHEMA + ".example_loan_train_data"
LOAN_TEST_DATA_TABLE_NAME = SCHEMA + ".example_loan_test_data"
LOAN_INFERENCE_TABLE_NAME = SCHEMA + ".example_loan_inference_result"
LOAN_DATA_TYPE = """
    {
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"loan_amnt","type":"float"},
            {"name":"int_rate","type":"float"},
            {"name":"emp_length","type":"float"},
            {"name":"annual_inc","type":"float"},
            {"name":"dti","type":"float"},
            {"name":"delinq_2yrs","type":"float"},
            {"name":"revol_util","type":"float"},
            {"name":"total_acc","type":"float"},
            {"name":"bad_loan","type":"float"},
            {"name":"longest_credit_length","type":"float"},
            {"name":"record_id","type":"float"}
        ]
    }"""
LOAN_INFERENCE_TYPE = """
{
        "type": "record",
        "name": "loan_type",
        "fields": [
            {"name":"record_id","type":"float"},
            {"name":"bad_loan_actual","type":"float"},
            {"name":"bad_loan_predicted","type":"float"}
        ]
    }"""
```

```python theme={null}
if DB_HANDLE.has_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TRAIN_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_TEST_DATA_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_TEST_DATA_TABLE_NAME)
if DB_HANDLE.has_table(table_name=LOAN_INFERENCE_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(LOAN_INFERENCE_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=LOAN_DATA_TYPE, label=LOAN_TRAIN_DATA_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_TRAIN_DATA_TABLE_NAME, type_id=type_id)
print("Create loan train data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_table(table_name=LOAN_TEST_DATA_TABLE_NAME, type_id=type_id)
print("Create loan test data table response status: {}".format(response['status_info']['status']))
response = DB_HANDLE.create_type(type_definition=LOAN_INFERENCE_TYPE, label=LOAN_INFERENCE_TABLE_NAME)
type_id = response['type_id']
response = DB_HANDLE.create_table(table_name=LOAN_INFERENCE_TABLE_NAME, type_id=type_id)
print("Create loan inference table response status: {}".format(response['status_info']['status']))
```

Then the method converts the `loan_sample` data into a pandas data frame. Two
lists are initialized, which will be used to insert records into the *training*
*table* and the *testing table*. A simple counter is initialized as well:

```python theme={null}
pythonpath = os.environ['PYTHONPATH'].split(os.pathsep)[0]
records = pd.read_csv(pythonpath + '/util/loan_sample.csv.zip', sep=',', quotechar='"').values
print('Inserting loan data, this may take a few minutes.')
encoded_obj_list_train = []
encoded_obj_list_test = []
i = 0
```

For each record in the dataset, its ordered values are assigned to the columns
in the *training* and *testing tables*. From the dataset, 10% of it is added to
the *testing table's* list; the other 90% is added to the *training table's*
list. Once the batch size of 1000 records has been reached, the records are
inserted in the database. The lists are cleared and the process repeats:

```python theme={null}
for record in records:
    i += 1
    datum = collections.OrderedDict()
    datum['loan_amnt'] = float(record[0])
    datum['int_rate'] = float(record[1])
    datum['emp_length'] = float(record[2])
    datum['annual_inc'] = float(record[3])
    datum['dti'] = float(record[4])
    datum['delinq_2yrs'] = float(record[5])
    datum['revol_util'] = float(record[6])
    datum['total_acc'] = float(record[7])
    datum['bad_loan'] = float(record[8])
    datum['longest_credit_length'] = float(record[9])
    datum['record_id'] = float(i)
    if i % 10 == 0:
        encoded_obj_list_test.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    else:
        encoded_obj_list_train.append(DB_HANDLE.encode_datum(LOAN_DATA_TYPE, datum))
    if i % 1000 == 0:
        response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert train response: {}".format(response))
        response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_test,
                                            list_encoding='binary', options={})
        if response['status_info']['status'] == "ERROR":
            print("Insert test response: {}".format(response))
        encoded_obj_list_train = []
        encoded_obj_list_test = []
```

If there's not enough records in the lists at the end of the loop to make a full
batch size, the records are flushed to the database:

```python theme={null}
response = DB_HANDLE.insert_records(table_name=LOAN_TRAIN_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
response = DB_HANDLE.insert_records(table_name=LOAN_TEST_DATA_TABLE_NAME, data=encoded_obj_list_train,
                                    list_encoding='binary')
if response['status_info']['status'] == "ERROR":
    print("Insert response: {}".format(response))
print('\nAll data inserted.')
```

The `create_ensemble_model_table()` method creates the type and table for the
*model storage table*, but the *table* is removed first if it already exists:

```python theme={null}
ENSEMBLE_MODEL_TABLE_NAME = SCHEMA + ".ensemble_models"
ENSEMBLE_MODEL_TYPE = MODEL_TYPE = """
    {
        "type": "record",
        "name": "ensemble_mode_type",
        "fields": [
            {"name":"model","type":"bytes"},
            {"name":"name","type":"string"},
            {"name":"rank","type":"int"},
            {"name":"tom","type":"int"},
            {"name":"num_input_data","type":"int"},
            {"name":"date_time_created","type":"string"}
        ]
    }"""
ENSEMBLE_MODEL_TYPE_PROPERTIES = {"date_time_created": ["datetime"]}
```

```python theme={null}
if DB_HANDLE.has_table(table_name=ENSEMBLE_MODEL_TABLE_NAME)['table_exists']:
    DB_HANDLE.clear_table(ENSEMBLE_MODEL_TABLE_NAME)
response = DB_HANDLE.create_type(type_definition=ENSEMBLE_MODEL_TYPE, label=ENSEMBLE_MODEL_TABLE_NAME,
                                 properties=ENSEMBLE_MODEL_TYPE_PROPERTIES)
response = DB_HANDLE.create_table(table_name=ENSEMBLE_MODEL_TABLE_NAME, type_id=response['type_id'])
print("Create model_table response status: {}".format(response['status_info']['status']))
```

### Model Training UDF (dt\_train.py)

First, several packages are imported to access the *Kinetica Python UDF* API,
decision tree modeling, object serialization, `test_environment.py` methods,
and an accuracy score calculation:

```python theme={null}
from kinetica_proc import ProcData
from sklearn import tree
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
```

Next, the file gets a handle to the `ProcData()` class:

```python theme={null}
proc_data = ProcData()
```

To output the number of values found on each processing node and processing node
container, the rank and tom number in the request info map pointing to the
current instance of the UDF are mapped and displayed:

```python theme={null}
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF train r{}_t{}: instantiated.'.format(rank_number, tom_number))
```

The data from the input table (`example_loan_train_data`)--excluding the
records with *NaN* or *null* values--is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let `y` be the label
with the `bad_loan` column values; let `X` be the attributes with
rest of the column values:

```python theme={null}
training_data = proc_data.to_df().dropna()  # only use non-NAN rows
num_input_data = training_data.shape[0]
X = training_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
                   'longest_credit_length']]
y = training_data[['bad_loan']]
```

The decision tree is created:

```python theme={null}
print('UDF train r{}_t{}: learning model on {} data points.'.format(rank_number, tom_number, num_input_data))
tree_classifier = tree.DecisionTreeClassifier(class_weight=None, criterion='entropy', max_depth=7, max_features=None,
                                              max_leaf_nodes=None, min_samples_leaf=5, min_samples_split=2,
                                              min_weight_fraction_leaf=0.0, presort=False, random_state=100,
                                              splitter='best')
```

As long as there are more than 10 values stored in the attributes group, the
model is created by training the tree algorithm on `X` and `y`:

```python theme={null}
if X.shape[0] > 10:
    model = tree_classifier.fit(X, y)
```

The accuracy of the new algorithm is calculated by comparing `y` (actual
`bad_loan` values) to the model's predictions based on the values
in `X`. The accuracy of the algorithm is then output:

```python theme={null}
acc = accuracy_score(y, model.predict(X))
print('UDF train r{}_t{}: training accuracy: {}'.format(rank_number, tom_number, acc))
```

The models are converted to byte arrays and stored as records in the
*model storage table*:

```python theme={null}
print('UDF train r{}_t{}: storing model.'.format(rank_number, tom_number))
model_byte_array = pickle.dumps(model)
te.store_ensemble_model(model_byte_array, 'sklearn_dt', rank_number, tom_number, num_input_data, acc)
```

```python theme={null}
def store_ensemble_model(model_binary, name, rank, tom, num_input_data, train_acc):
    """Store serialized model object into table"""
    encoded_object_list = []
    datum = collections.OrderedDict()
    datum['model'] = model_binary
    datum['name'] = str(name)
    datum['rank'] = int(rank)
    datum['tom'] = int(tom)
    datum['num_input_data'] = int(num_input_data)

    datum['date_time_created'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    encoded_object_list.append(DB_HANDLE.encode_datum(ENSEMBLE_MODEL_TYPE, datum))
    response = DB_HANDLE.insert_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, data=encoded_object_list,
                                        list_encoding='binary')
    print(response)
```

Call `complete()` to tell *Kinetica* the proc code is finished:

```python theme={null}
proc_data.complete()
```

### Training UDF Registration (register\_execute\_train.py)

To interact with *Kinetica*, an object of the `GPUdb` class is instantiated
while providing the connection URL of the database server.

```python theme={null}
main(gpudb.GPUdb(host=[args.url], username=args.username, password=args.password))
```

To upload the `dt_train.py`, `kinetica_proc.py`, and `test_environment.py`
files to *Kinetica*, they will first need to be read in as bytes and added to a
file data map:

```python theme={null}
file_paths = ["dt_train.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
```

After the files are placed in a data map, the distributed `distributed_train`
proc is created in *Kinetica* and the files are associated with it:

```python theme={null}
proc_name = 'distributed_train'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
```

<Info>
  The proc requires the proper `command` and `args` to be executed. In
  this case, the assembled command line would be:

  ```
  python dt_train.py
  ```
</Info>

Finally, after the proc is created, it is executed. The *training table* created
in the [Database Setup](#database-setup) section is passed in here:

```python theme={null}
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [te.LOAN_TRAIN_DATA_TABLE_NAME], {}, [], {})
print(response)
```

### Model Testing UDF (dt\_test.py)

First, several packages are imported to access the *Kinetica Python UDF* API,
object serialization, `test_environment.py` methods, an accuracy score
calculation, and `numpy` math methods:

```python theme={null}
from kinetica_proc import ProcData
import pickle
import test_environment as te
from sklearn.metrics import accuracy_score
import numpy as np
```

Next, the file gets a handle to the `ProcData()` class:

```python theme={null}
proc_data = ProcData()
```

To output the number of values found on each processing node and processing node
container, the rank and tom number in the request info map pointing to the
current instance of the UDF are mapped and displayed:

```python theme={null}
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF test r{}_t{}: instantiated.'.format(rank_number, tom_number))
```

The data from the input table (`example_loan_test_data`)--excluding the
records with *NaN* or *null* values--is converted to a pandas dataframe. The
number of input records is derived from the dataframe. Let `y_actual`
be the label with the `bad_loan` column values; let `X` be the attributes
with the rest of the column values. Let `record_ids` represent the loan IDs:

```python theme={null}
test_data = proc_data.to_df().dropna()
num_test_data = test_data.shape[0]
X = test_data[['loan_amnt', 'int_rate', 'emp_length', 'annual_inc', 'dti', 'delinq_2yrs', 'revol_util', 'total_acc',
               'longest_credit_length']]
y_actual = test_data[['bad_loan']]
record_ids = test_data[['record_id']]
```

Get a handle to the output table from the proc
(`example_loan_inference_result`). Its size is expanded to match the number of
records from the dataframe; this will allocated enough memory to copy all
input records to the output table. Handles are created for the columns that
will be written to: `record_id`, `bad_loan_actual`, and
`bad_loan_predicted`:

```python theme={null}
out_table = proc_data.output_data[0]
out_table.size = num_test_data
record_id_column, y_actual_column, y_predicted_column = out_table[0], out_table[1], out_table[2]
```

If the number of input records is greater than 0, the models from the
*model storage table* are retrieved, an array is created to hold a number of
`0.` values equal to the number of input records, and an empty list is
initialized:

```python theme={null}
if num_test_data > 0:
    models_from_db = te.load_ensemble_models('sklearn_dt')
    sum_predictions = np.full(num_test_data, 0.)
    accuracies = list()
```

The `load_ensemble_models` method from `test_environment.py` gets the
models, decodes the records, and places them in a list:

```python theme={null}
def load_ensemble_models(name, limit=100):
    """Load serialized ensemble model objects that share the same name from table."""
    response = DB_HANDLE.get_records(table_name=ENSEMBLE_MODEL_TABLE_NAME, offset=0, limit=limit,
                                     options={"expression": "(name='" + str(name) + "')"})
    res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
    result_dict_list = []
    for current_result in res_decoded:
        current_dict = dict()
        current_dict['model'] = current_result['model']
        current_dict['num_input_data'] = current_result['num_input_data']
        current_dict['rank'] = current_result['rank']
        current_dict['tom'] = current_result['tom']
        current_dict['date_time_created'] = current_result['date_time_created']
        result_dict_list.append(current_dict)
    return result_dict_list
```

For each model, the object is deserialized and loaded into the UDF. The models'
predictions are stored and the accuracy
of the prediction against the *testing table's* `bad_loan`
values is calculated. The calculated accuracy is added to the `accuracies`
list, and the prediction value is added to the summed predictions array:

```python theme={null}
for current_model_dict in models_from_db:
    model = pickle.loads(current_model_dict['model'])
    y_test_predict = model.predict(X.values)
    test_accuracy = accuracy_score(y_actual, y_test_predict)
    accuracies.append(test_accuracy)
    sum_predictions += y_test_predict
```

The prediction value from the combined list is calculated by dividing the sum
of each model's prediction by the number of models. The models' combined
accuracy is also calculated and displayed. The record IDs, actual `bad_loan`
value, and predicted `bad_loan` value are added as values to the matching
output table columns.

```python theme={null}
combined_predictions = np.around(sum_predictions / len(models_from_db))
combined_accuracy = accuracy_score(y_actual, combined_predictions)
print('Average accuracies of {} models: {}'.format(len(accuracies), (sum(accuracies) / float(len(accuracies)))))
print('Ensemble accuracy: {}'.format(combined_accuracy))
record_id_column[:] = record_ids.values
y_actual_column[:] = y_actual.values
y_predicted_column[:] = combined_predictions
```

If no data was found on the tom, finish processing:

```python theme={null}
else:
    print('UDF test r{}_t{}: no test data on tom.\n'.format(rank_number, tom_number))
```

Call `complete()` to tell *Kinetica* the proc code is finished:

```python theme={null}
proc_data.complete()
```

### Testing UDF Registration (register\_execute\_test.py)

To interact with *Kinetica*, an object of the `GPUdb` class is instantiated
while providing the connection URL of the database server.

```python theme={null}
main(gpudb.GPUdb(host=[args.url], username=args.username, password=args.password))
```

To upload the `dt_test.py`, `kinetica_proc.py`, and `test_environment.py`
files to *Kinetica*, they will first need to be read in as bytes and added to a
file data map:

```python theme={null}
file_paths = ["dt_test.py", "../../kinetica_proc.py", "../../util/test_environment.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
```

After the files are placed in a data map, the distributed `distributed_test`
proc is created in *Kinetica* and the files are associated with it:

```python theme={null}
proc_name = 'distributed_test'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
```

<Info>
  The proc requires the proper `command` and `args` to be executed. In
  this case, the assembled command line would be:

  ```
  python dt_test.py
  ```
</Info>

Finally, after the proc is created, it is executed. The *testing* and
*results table* created in the [Database Setup](#database-setup) section are passed in here:

```python theme={null}
TEST_INPUT_TABLE = te.LOAN_TEST_DATA_TABLE_NAME
TEST_OUTPUT_TABLE = te.LOAN_INFERENCE_TABLE_NAME
```

```python theme={null}
print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [TEST_INPUT_TABLE], {}, [TEST_OUTPUT_TABLE], {})
print(response)
```
