Example UDF (Non-CUDA) - Pandas

The following is a complete example, using the Python UDF API, of a non-CUDA UDF that demonstrates how to create pandas dataframes and insert them into tables in Kinetica. This example (and others) can be found in the Python UDF API repository; this repository comes with Kinetica by default (located in /opt/gpudb/udf/api/python/) or can be downloaded/cloned from GitHub.

Prerequisites

The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.

Important

This example cannot run on Mac OSX

The following items are also necessary:

  • Python 3
  • Miniconda

Note

Visit the Conda website to download the Miniconda installer for Python 3.

UDF API Download

This example requires local access to the Python UDF API repository. In the desired directory, run the following command, replacing <kinetica-version> with the installed Kinetica version, e.g., v6.2.0:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git

Relevant Scripts

There are four files associated with the pandas UDF example, all of which can be found in the Python UDF API repo.

  • A database setup script (test_environment.py) that is called from the initialization script
  • An initialization script (setup_db.py) that creates the output table
  • A script (register_execute_UDF.py) to register and execute the UDF
  • A UDF (df_to_output_UDF.py) that creates a pandas dataframe and inserts it into the output table

Prepare Environment

This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.

  1. From the directory into which you cloned the API, change into the root folder of the Python UDF API repository:

    cd kinetica-udf-api-python/
    
  2. Create the Conda environment, replacing <environment name> with the desired name:

    conda env create --name <environment name> --file conda_env_py3.yml
    

    Note

    It may take a few minutes to create the environment.

  3. Verify the environment was created properly:

    conda info --envs
    
  4. Activate the new environment:

    source activate <environment name>
    
  5. Install PyGDF:

    conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
    
  6. Install the Kinetica Python API from PyPI, where n is the desired patch version of the 6.2.0 release:

    pip install gpudb==6.2.0.n
    
  7. Add the Python UDF API repo's root directory to the PYTHONPATH:

    export PYTHONPATH=$(pwd):$PYTHONPATH
    
  8. Edit the util/test_environment.py script for the correct host, port, user, and password for your Kinetica instance:

    HOST = 'http://localhost'
    PORT = '9191'
    USER = 'testuser'
    PASSWORD = 'Testuser123!'
    

UDF Deployment

  1. Change directory into the UDF Pandas directory:

    cd examples/UDF_pandas
    
  2. Run the UDF initialization script:

    python setup_db.py
    
  3. Update the host and port in register_execute_UDF.py as necessary:

    if __name__ == "__main__":
        main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
    
  4. Run the execute script for the training UDF:

    python register_execute_UDF.py
    
  5. Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the head node and/or in the unittest_df_output output table in GAdmin.
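The table can also be checked programmatically. The helper below is a hypothetical sketch (the function name is not part of the example scripts); it assumes the GPUdb /show/table endpoint with the get_sizes option, which reports the record count:

```python
def count_output_records(db_handle, table_name='unittest_df_output'):
    # Hypothetical check: each UDF instance inserts 3 rows, so the total
    # record count should be 3 x (number of UDF instances). Uses the
    # /show/table endpoint with the get_sizes option to fetch the count.
    response = db_handle.show_table(table_name=table_name,
                                    options={'get_sizes': 'true'})
    return response['total_size']
```

Pass in the same GPUdb handle used by the setup scripts, e.g. `count_output_records(te.DB_HANDLE)`.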

Execution Detail

This example details using a distributed UDF to create and ingest a pandas dataframe into Kinetica. The df_to_output_UDF proc creates the dataframe and inserts it into the output table, unittest_df_output.

The dataframe has a shape of (3, 3) and will be inserted into the output table once per UDF instance, i.e., once for each processing node in each processing node container registered in Kinetica.
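For a concrete (hypothetical) sizing: on a cluster with 2 processing node containers of 4 processing nodes each, the 3-row dataframe is inserted 8 times:

```python
# Hypothetical cluster sizing -- neither number comes from the example;
# substitute the rank/tom counts of your own Kinetica installation.
rows_per_instance = 3   # the example dataframe has 3 rows
ranks = 2               # hypothetical number of processing node containers
toms_per_rank = 4       # hypothetical processing nodes per container

total_instances = ranks * toms_per_rank
total_rows = rows_per_instance * total_instances
print(total_rows)  # 24 rows land in the output table
```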

The output table contains 3 columns:

  • id -- an integer column
  • value_long -- a long column
  • value_float -- a float column

Database Setup

The setup script, setup_db.py, which creates the output table for the UDF, imports the test_environment.py script to access its methods:

from util import test_environment as te

The script calls one method from test_environment.py, passing in the values for the output table's name and schema:

te.create_test_output_table(te.TEST_OUTPUT_TABLE_NAME, te.TEST_DATA_TYPE_1)

The methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL (host and port):

HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)

The create_test_output_table() method creates the type and table for the output table, but the table is removed first if it already exists:

TEST_OUTPUT_TABLE_NAME = 'unittest_df_output'
TEST_DATA_TYPE_1 = [
    ['id', gpudb.GPUdbRecordColumn._ColumnType.INT],
    ['value_long', gpudb.GPUdbRecordColumn._ColumnType.LONG],
    ['value_float', gpudb.GPUdbRecordColumn._ColumnType.FLOAT]
]
def create_test_output_table(table_name, table_type):
    if DB_HANDLE.has_table(table_name=table_name)['table_exists']:
        DB_HANDLE.clear_table(table_name)
    test_data_table = gpudb.GPUdbTable(table_type, table_name, db=DB_HANDLE)

UDF (df_to_output_UDF.py)

First, packages are imported to access the Kinetica Python UDF API and pandas:

from kinetica_proc import ProcData
import pandas as pd

Next, the file gets a handle to the ProcData() class:

proc_data = ProcData()

To report which processing node and processing node container each instance is running on, the rank and tom number identifying the current instance of the UDF are retrieved from the request info map and displayed:

rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF pandas proc output test r{}_t{}: instantiated.'.format(rank_number, tom_number))

The dataframe is created:

data = {'id': pd.Series([1, 12, 123]), 'value_long': pd.Series([2, 23, 24]), 'value_float': pd.Series([0.2, 2.3, 2.34])}
df = pd.DataFrame(data)

Get a handle to the output table from the proc (unittest_df_output). Its size is expanded to match the number of records in the dataframe; this allocates enough memory to copy all records in the dataframe to the output table. Then the dataframe is assigned to the output table:

output_table = proc_data.output_data[0]
output_table.size = df.shape[0]
proc_data.from_df(df, output_table)
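As a rough illustration of what from_df() has to copy (the actual implementation writes into the proc's per-column output buffers), a pure-pandas sketch of the same dataframe and the values extracted from it:

```python
import pandas as pd

# The same dataframe the UDF builds: three columns, three rows.
data = {'id': pd.Series([1, 12, 123]),
        'value_long': pd.Series([2, 23, 24]),
        'value_float': pd.Series([0.2, 2.3, 2.34])}
df = pd.DataFrame(data)

# The output table must be sized to the dataframe's row count before
# any records can be copied into it:
row_count = df.shape[0]

# from_df() then copies the dataframe column by column; conceptually
# each column becomes a list of values destined for one table column:
columns = {name: df[name].tolist() for name in df.columns}
```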

UDF Registration (register_execute_UDF.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL, including the host and port of the database server. Ensure the host address and port are correct for your setup:

if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))

To upload the df_to_output_UDF.py and kinetica_proc.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

import os

file_paths = ["df_to_output_UDF.py", "../../kinetica_proc.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()

After the files are placed in a data map, the distributed Pandas_df_output proc is created in Kinetica and the files are associated with it:

proc_name = 'Pandas_df_output'
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and arguments in order to be executed. In this case, the assembled command line would be:

python df_to_output_UDF.py
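Note that create_proc will fail if a proc with the same name is already registered. A minimal guard, assuming the GPUdb API's has_proc()/delete_proc() endpoints (the helper name itself is hypothetical, not part of the example script):

```python
def drop_proc_if_exists(db_handle, proc_name):
    # Hypothetical helper: remove a previously registered proc so that
    # create_proc() can re-register one under the same name.
    if db_handle.has_proc(proc_name=proc_name)['proc_exists']:
        db_handle.delete_proc(proc_name)
```

Calling `drop_proc_if_exists(db_handle, 'Pandas_df_output')` before create_proc makes the registration script safe to re-run.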

Finally, after the proc is created, it is executed. The output table created in the Database Setup section is passed in here:

response = db_handle.execute_proc(proc_name, {}, {}, [], {}, [te.TEST_OUTPUT_TABLE_NAME], {})
print(response)
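Since execute_proc only launches the run, the script finishes before the UDF instances do. A sketch of a polling loop, assuming the run_id returned by execute_proc and the overall_statuses field of the /show/proc/status response (the helper name is hypothetical):

```python
import time

def wait_for_proc(db_handle, run_id, poll_seconds=2):
    # Hypothetical helper: poll /show/proc/status until the run leaves
    # the 'running' state, then return the final status (e.g.
    # 'complete' or 'error').
    while True:
        statuses = db_handle.show_proc_status(run_id=run_id)
        status = statuses['overall_statuses'][run_id]
        if status != 'running':
            return status
        time.sleep(poll_seconds)
```

Usage would look like `wait_for_proc(db_handle, response['run_id'])` after the execute_proc call above.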