Example UDF (Non-CUDA) - Pandas

The following is a complete example, using the Python UDF API, of a non-CUDA UDF that demonstrates how to create pandas dataframes and insert them into tables in Kinetica. This example (and others) can be found in the Python UDF API repository; this repository can be downloaded/cloned from GitHub.

Prerequisites

The general prerequisites for using UDFs in Kinetica can be found under UDF Prerequisites.

Important

This example cannot run on macOS.

The following items are also necessary:

  • Python 3
  • Miniconda

Note

Visit the Conda website to download the Miniconda installer for Python 3.

UDF API Download

This example requires local access to the Python UDF API repository. In the desired directory, run the following command, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.1:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git

Relevant Scripts

There are four files associated with the pandas UDF example, all of which can be found in the Python UDF API repo.

  • A database setup script (test_environment.py) that is called from the initialization script
  • An initialization script (setup_db.py) that creates the output table
  • A script (register_execute_UDF.py) to register and execute the UDF
  • A UDF (df_to_output_UDF.py) that creates a pandas dataframe and inserts it into the output table

This example runs inside a Conda environment. The environment can be automatically configured using the conda_env_py3.yml file found in the Python UDF API repository.

  1. From the directory where you cloned the API, change into the root folder of the Python UDF API repository:

    cd kinetica-udf-api-python/
    
  2. Create the Conda environment, replacing <environment name> with the desired name:

    conda env create --name <environment name> --file conda_env_py3.yml
    

    Note

    It may take a few minutes to create the environment.

  3. Verify the environment was created properly:

    conda info --envs
    
  4. Activate the new environment:

    conda activate <environment name>
    
  5. Install PyGDF:

    conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
    
  6. Install the Kinetica Python API:

    pip install gpudb~=7.1.0
    
  7. Add the Python UDF API repo's root directory to the PYTHONPATH:

    export PYTHONPATH=$(pwd):$PYTHONPATH
    
  8. Edit the util/test_environment.py script to set the correct database URL, user, and password for your Kinetica instance:

    URL = 'https://abcdefg.cloud.kinetica.com/hijklmn/gpudb-0;CombinePrepareAndExecute=1;RowsPerFetch=20000'
    USER = 'kadmin'
    PASSWORD = 'kadmin123'
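
Hard-coding credentials in util/test_environment.py works for a quick test. As an alternative, the following is a minimal sketch (not part of the repository) that reads the same three values from environment variables. The variable names KINETICA_URL, KINETICA_USER, and KINETICA_PASSWORD, the fallback values, and the helper load_connection_settings are all hypothetical.

```python
import os


def load_connection_settings(env=None):
    """Return (url, user, password), preferring environment variables.

    The variable names and fallback values are illustrative assumptions,
    not names used by the Kinetica repository.
    """
    env = os.environ if env is None else env
    url = env.get('KINETICA_URL', 'http://localhost:9191')
    user = env.get('KINETICA_USER', 'admin')
    password = env.get('KINETICA_PASSWORD', '')
    return url, user, password


# These names mirror the constants edited in step 8.
URL, USER, PASSWORD = load_connection_settings()
```

With this approach, the password stays out of the source file and can differ per shell session.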
    

UDF Deployment

  1. Change directory into the UDF Pandas directory:

    cd examples/UDF_pandas
    
  2. Run the UDF initialization script:

    python setup_db.py
    
  3. Run the script that registers and executes the UDF:

    python register_execute_UDF.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
    
  4. Verify the results in the unittest_df_output output table in Workbench.

Execution Detail

This example details using a distributed UDF to create and ingest a pandas dataframe into Kinetica. The df_to_output_UDF proc creates the dataframe and inserts it into the output table, unittest_df_output.

The dataframe has a shape of (3, 3) and is inserted into the output table n times, where n is the number of processing nodes available in each processing node container registered in Kinetica.
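
To make the arithmetic concrete, the sketch below computes the rows one would expect in the output table for a given n. The row values are taken from the example UDF; the helper name expected_output_rows and the instance count of 4 are hypothetical.

```python
# The three rows that each UDF instance writes (values from the example UDF).
ROWS_PER_INSTANCE = [
    (1, 2, 0.2),
    (12, 23, 2.3),
    (123, 24, 2.34),
]


def expected_output_rows(instance_count):
    """Rows expected in the output table after instance_count insertions."""
    return ROWS_PER_INSTANCE * instance_count


# Hypothetical deployment in which the dataframe is inserted 4 times.
rows = expected_output_rows(4)
print(len(rows))  # 12
```

Each insertion contributes the same three rows, so the table holds duplicates of every id rather than unique records.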

The output table contains 3 columns:

  • id -- an integer column
  • value_long -- a long column
  • value_float -- a float column

Database Setup

The setup script, setup_db.py, which creates the output table for the UDF, imports the test_environment.py script to access its methods:

from util import test_environment as te

The script calls two methods from test_environment.py: one creates the schema that contains the example tables, and the other creates the output table, taking the table's name and type as arguments:

te.create_schema()
te.create_test_output_table(te.TEST_OUTPUT_TABLE_NAME, te.TEST_DATA_TYPE_1)

The methods in test_environment.py require a connection to Kinetica. This is done by instantiating an object of the GPUdb class with a provided connection URL. See Connecting via API for details on the URL format and how to look it up.

URL = 'http://localhost:9191'
USER = 'admin'
PASSWORD = 'admin123'
DB_HANDLE = gpudb.GPUdb(host=[URL], username=USER, password=PASSWORD)

The create_schema() method creates the schema that will contain the table used in the example:

SCHEMA = 'example_udf_python'

DB_HANDLE.create_schema(SCHEMA, options={'no_error_if_exists': 'true'})

The create_test_output_table() method creates the type and table for the output table, but the table is removed first if it already exists:

TEST_DATA_TABLE_NAME_1 = SCHEMA + '.unittest_toy_data'
TEST_DATA_TYPE_1 = [
    ['id', gpudb.GPUdbRecordColumn._ColumnType.INT],
    ['value_long', gpudb.GPUdbRecordColumn._ColumnType.LONG],
    ['value_float', gpudb.GPUdbRecordColumn._ColumnType.FLOAT]
]

if DB_HANDLE.has_table(table_name=table_name)['table_exists']:
    DB_HANDLE.clear_table(table_name)
test_data_table = gpudb.GPUdbTable(table_type, table_name, db=DB_HANDLE)

UDF (df_to_output_UDF.py)

First, packages are imported to access the Kinetica Python UDF API and pandas:

from kinetica_proc import ProcData
import pandas as pd

Next, the file gets a handle to the ProcData() class:

proc_data = ProcData()

To identify which instance of the UDF produced a given line of output, the rank and TOM numbers are read from the request info map for the current UDF instance and printed:

rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF pandas proc output test r{}_t{}: instantiated.'.format(rank_number, tom_number))

The dataframe is created:

data = {'id': pd.Series([1, 12, 123]), 'value_long': pd.Series([2, 23, 24]), 'value_float': pd.Series([0.2, 2.3, 2.34])}
df = pd.DataFrame(data)
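
Before handing a dataframe to the UDF API, it can help to confirm locally that its columns line up with the output table. The following is a minimal sketch that rebuilds the same dataframe and checks its shape, column order, and general dtypes; the checks themselves are an illustration and are not part of the example UDF.

```python
import pandas as pd

# Same data as the example UDF.
data = {
    'id': pd.Series([1, 12, 123]),
    'value_long': pd.Series([2, 23, 24]),
    'value_float': pd.Series([0.2, 2.3, 2.34]),
}
df = pd.DataFrame(data)

# Column names and order should match the output table definition.
expected_columns = ['id', 'value_long', 'value_float']
assert list(df.columns) == expected_columns
assert df.shape == (3, 3)

# Integer columns for id/value_long, floating point for value_float.
assert pd.api.types.is_integer_dtype(df['id'])
assert pd.api.types.is_integer_dtype(df['value_long'])
assert pd.api.types.is_float_dtype(df['value_float'])

print(df.dtypes)
```

The exact integer and float widths pandas chooses can vary by platform, so the sketch only checks the dtype family rather than a specific width.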

Next, a handle to the output table (unittest_df_output) is obtained from the proc. Its size is set to the number of rows in the dataframe; this allocates enough memory to copy all records in the dataframe to the output table. Then the dataframe is assigned to the output table:

output_table = proc_data.output_data[0]
output_table.size = df.shape[0]
proc_data.from_df(df, output_table)

UDF Registration (register_execute_UDF.py)

To interact with Kinetica, an object of the GPUdb class is instantiated while providing the connection URL of the database server.

main(gpudb.GPUdb(host=[args.url], username=args.username, password=args.password))
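
The args object above carries the --url, --username, and --password values passed on the command line. The script's own parsing code is not shown here; the following is one way such arguments could be parsed with argparse. The function name parse_args, the defaults, and the help strings are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the three connection flags used by the example command line."""
    parser = argparse.ArgumentParser(
        description='Register and execute the pandas UDF example.')
    parser.add_argument('--url', default='http://localhost:9191',
                        help='Kinetica connection URL')
    parser.add_argument('--username', default='', help='Kinetica user name')
    parser.add_argument('--password', default='', help='Kinetica password')
    return parser.parse_args(argv)


# Passing an explicit list keeps the sketch independent of sys.argv.
args = parse_args(['--url', 'http://localhost:9191',
                   '--username', 'admin', '--password', 'admin123'])
print(args.url, args.username)
```

Calling parse_args() with no argument reads sys.argv instead, which is what a command-line script would normally do.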

To upload the df_to_output_UDF.py and kinetica_proc.py files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_paths = ["df_to_output_UDF.py", "../../kinetica_proc.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
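
The loop above keys each file by its base name, so two paths that share a base name would silently overwrite each other. A small sketch of the same logic wrapped in a function, with a guard for that case, might look like the following; the function name build_file_map and the duplicate check are additions for illustration.

```python
import os


def build_file_map(file_paths):
    """Map each file's base name to its contents as bytes."""
    files = {}
    for script_path in file_paths:
        script_name = os.path.basename(script_path)
        # Guard against two paths collapsing to the same key.
        if script_name in files:
            raise ValueError('duplicate file name: ' + script_name)
        with open(script_path, 'rb') as f:
            files[script_name] = f.read()
    return files
```

For this example it would be called as build_file_map(["df_to_output_UDF.py", "../../kinetica_proc.py"]) from the examples/UDF_pandas directory.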

After the files are placed in a data map, the distributed Pandas_df_output proc is created in Kinetica and the files are associated with it:

proc_name = 'Pandas_df_output'
print("Registering proc...")
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)

Note

The proc requires the proper command and args to be executed. In this case, the assembled command line would be:

python df_to_output_UDF.py
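
As an illustration of how the command ('python') and the args list ([file_paths[0]]) passed to create_proc combine into that command line, consider the sketch below. The helper assemble_command_line is hypothetical and only mimics the result; how Kinetica launches the process internally is not shown in this example.

```python
import shlex


def assemble_command_line(command, args):
    """Join a command and its arguments into one shell-style string."""
    parts = [command] + list(args)
    return ' '.join(shlex.quote(part) for part in parts)


file_paths = ["df_to_output_UDF.py", "../../kinetica_proc.py"]
print(assemble_command_line('python', [file_paths[0]]))
# python df_to_output_UDF.py
```

Only the first file is passed as an argument; kinetica_proc.py is uploaded alongside it so that the UDF can import it.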

Finally, after the proc is created, it is executed. The output table created in the Database Setup section is passed in here:

print("Executing proc...")
response = db_handle.execute_proc(proc_name, {}, {}, [], {}, [te.TEST_OUTPUT_TABLE_NAME], {})
print(response)
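
Printing the response shows that the proc was started, not that it finished. As a hedged sketch, the helper below polls for completion. It assumes that the execute_proc response contains a 'run_id' entry and that show_proc_status() returns a map with an 'overall_statuses' entry keyed by run ID; both should be verified against the installed gpudb version. The helper name wait_for_proc and its parameters are hypothetical.

```python
import time


def wait_for_proc(db_handle, run_id, poll_seconds=1.0, max_polls=60):
    """Poll until the proc run leaves the 'running' state or polls run out.

    Assumes show_proc_status() returns a map with an 'overall_statuses'
    entry keyed by run ID; verify this against your gpudb version.
    """
    overall = 'running'
    for _ in range(max_polls):
        status = db_handle.show_proc_status(run_id)
        overall = status['overall_statuses'].get(run_id, 'unknown')
        if overall != 'running':
            break
        time.sleep(poll_seconds)
    return overall
```

Under those assumptions, it would be called as wait_for_proc(db_handle, response['run_id']) right after execute_proc, before checking the output table.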