The following is a complete example, using the Python UDF API, of a non-CUDA
UDF that demonstrates how to create pandas dataframes and insert them into
tables in Kinetica. This example (and others) can be found in the Python UDF
API repository; this repository comes with Kinetica by default (located in
/opt/gpudb/udf/api/python/) or can be downloaded/cloned from GitHub.
The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.
Important
This example cannot run on Mac OS X.
The following items are also necessary:
Note
Visit the Conda website to download the Miniconda installer for Python 3.
This example requires local access to the Python UDF API repository. In the
desired directory, run the following, replacing <kinetica-version> with the
name of the installed Kinetica version, e.g., v6.2.0:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
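For scripted setups, the clone command above can be assembled from the version string. The following is a minimal sketch; the helper name build_clone_command is hypothetical and not part of the repository.

```python
REPO_URL = "https://github.com/kineticadb/kinetica-udf-api-python.git"


def build_clone_command(kinetica_version):
    """Return the git clone argument list for a release branch.

    build_clone_command is a hypothetical helper used for illustration only.
    """
    branch = "release/{}".format(kinetica_version)
    return ["git", "clone", "-b", branch, "--single-branch", REPO_URL]


if __name__ == "__main__":
    # Only prints the command; pass the list to subprocess.run() to execute it.
    print(" ".join(build_clone_command("v6.2.0")))
```

Returning a list rather than a single string avoids shell quoting issues if the command is later run through subprocess.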
There are four files associated with the pandas UDF example, all of which can be found in the Python UDF API repo:
test_environment.py -- called from the initialization script
setup_db.py -- the initialization script, which creates the output table
register_execute_UDF.py -- registers and executes the UDF
df_to_output_UDF.py -- creates a pandas dataframe and inserts it into the output table
This example runs inside a Conda environment. The environment can be
automatically configured using the conda_env_py3.yml file found in the
Python UDF API repository.
From the directory where you cloned the API, change into the root folder of the Python UDF API repository:
cd kinetica-udf-api-python/
Create the Conda environment, replacing <environment name>
with the
desired name:
conda env create --name <environment name> --file conda_env_py3.yml
Note
It may take a few minutes to create the environment.
Verify the environment was created properly:
conda info --envs
Activate the new environment:
source activate <environment name>
Install PyGDF:
conda install -c numba -c conda-forge -c gpuopenanalytics/label/dev -c defaults pygdf=0.1.0a2
Install the Kinetica Python API from PyPI, replacing n to match the desired version:
pip install gpudb==6.2.0.n
Add the Python UDF API repo's root directory to the PYTHONPATH:
export PYTHONPATH=$(pwd):$PYTHONPATH
Edit the util/test_environment.py
script for the correct host, port,
user, and password for your Kinetica instance:
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
Change directory into the UDF Pandas directory:
cd examples/UDF_pandas
Run the UDF initialization script:
python setup_db.py
Update the host and port in register_execute_UDF.py
as necessary:
if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
Run the execute script for the UDF:
python register_execute_UDF.py
Verify the results in the /opt/gpudb/core/logs/gpudb-proc.log file on the
head node and/or in the unittest_df_output output table in GAdmin.
This example details using a distributed UDF to create and ingest a pandas
dataframe into Kinetica. The df_to_output_UDF proc creates the dataframe
and inserts it into the output table, unittest_df_output.
The dataframe has a shape of (3, 3) and is inserted into the output table
n times, where n is the number of processing nodes available across the
processing node containers registered in Kinetica.
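The expected size of the output table follows from simple arithmetic. The sketch below assumes that one UDF instance runs per processing node and that each instance inserts the full dataframe; the function and parameter names are illustrative and not part of the Kinetica API.

```python
def expected_row_count(rows_per_dataframe, nodes_per_container):
    """Estimate the rows in the output table after one UDF run.

    nodes_per_container lists the number of processing nodes in each
    processing node container (an assumed cluster layout, for illustration).
    """
    udf_instances = sum(nodes_per_container)
    return rows_per_dataframe * udf_instances


# Assumed layout: 2 containers with 2 processing nodes each -> 4 instances.
print(expected_row_count(3, [2, 2]))  # 12
```

Comparing this number with the record count shown for the output table is a quick way to check that every instance ran.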
The output table contains 3 columns:
id -- an integer column
value_long -- a long column
value_float -- a float column
The setup script, setup_db.py, which creates the output table for the UDF,
imports the test_environment.py script to access its methods:
from util import test_environment as te
The script calls one method from test_environment.py, passing in the values
for the output table's name and schema:
te.create_test_output_table(te.TEST_OUTPUT_TABLE_NAME, te.TEST_DATA_TYPE_1)
The methods in test_environment.py
require a connection to Kinetica.
This is done by instantiating an object of the GPUdb
class with a
provided connection URL (host and port):
HOST = 'http://localhost'
PORT = '9191'
USER = 'testuser'
PASSWORD = 'Testuser123!'
DB_HANDLE = gpudb.GPUdb(encoding='BINARY', host=HOST, port=PORT)
The create_test_output_table()
method creates the type and table for the
output table, but the table is removed first if it already exists:
TEST_DATA_TABLE_NAME_1 = 'unittest_toy_data'
TEST_DATA_TYPE_1 = [
    ['id', gpudb.GPUdbRecordColumn._ColumnType.INT],
    ['value_long', gpudb.GPUdbRecordColumn._ColumnType.LONG],
    ['value_float', gpudb.GPUdbRecordColumn._ColumnType.FLOAT]
]
def create_test_output_table(table_name, table_type):
    if DB_HANDLE.has_table(table_name=table_name)['table_exists']:
        DB_HANDLE.clear_table(table_name)
    test_data_table = gpudb.GPUdbTable(table_type, table_name, db=DB_HANDLE)
First, packages are imported to access the Kinetica Python UDF API and pandas:
from kinetica_proc import ProcData
import pandas as pd
Next, the file gets a handle to the ProcData()
class:
proc_data = ProcData()
To identify the processing node container and processing node on which each UDF instance runs, the rank and tom numbers are read from the request info map of the current UDF instance and displayed:
rank_number = proc_data.request_info['rank_number']
tom_number = proc_data.request_info['tom_number']
print('\nUDF pandas proc output test r{}_t{}: instantiated.'.format(rank_number, tom_number))
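The label built by that print call can be tried outside Kinetica with a stand-in dictionary. This is a minimal sketch: the request_info values below are made up, and instance_label is a hypothetical helper, not part of the UDF API.

```python
# Stand-in for proc_data.request_info; inside a UDF the values come from Kinetica.
request_info = {'rank_number': '1', 'tom_number': '0'}


def instance_label(info):
    """Build the r<rank>_t<tom> label used in the log message."""
    return 'r{}_t{}'.format(info['rank_number'], info['tom_number'])


print('UDF pandas proc output test {}: instantiated.'.format(instance_label(request_info)))
```

Searching the proc log for a label of this form is one way to find the output of a specific UDF instance.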
The dataframe is created:
data = {'id': pd.Series([1, 12, 123]), 'value_long': pd.Series([2, 23, 24]), 'value_float': pd.Series([0.2, 2.3, 2.34])}
df = pd.DataFrame(data)
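The dataframe can be inspected locally before it is used in the UDF. The sketch below rebuilds the same dataframe and prints its shape, column order, and inferred dtypes; it only needs pandas, not a Kinetica instance.

```python
import pandas as pd

data = {
    'id': pd.Series([1, 12, 123]),
    'value_long': pd.Series([2, 23, 24]),
    'value_float': pd.Series([0.2, 2.3, 2.34]),
}
df = pd.DataFrame(data)

# Three rows and three columns, named like the output table's columns.
print(df.shape)
print(list(df.columns))
# The dtypes pandas inferred, for comparison with the table's column types.
print(df.dtypes)
```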
Next, a handle to the output table (unittest_df_output) is retrieved from the
proc. Its size is set to the number of rows in the dataframe; this allocates
enough memory to copy all records in the dataframe to the output table. Then
the dataframe is assigned to the output table:
output_table = proc_data.output_data[0]
output_table.size = df.shape[0]
proc_data.from_df(df, output_table)
To interact with Kinetica, an object of the GPUdb
class is instantiated
while providing the connection URL, including the host and port of the database
server. Ensure the host address and port are correct for your setup:
if __name__ == "__main__":
    main(gpudb.GPUdb(encoding='BINARY', host='127.0.0.1', port='9191'))
To upload the df_to_output_UDF.py and kinetica_proc.py files to Kinetica,
they must first be read in as bytes and added to a file data map:
file_paths = ["df_to_output_UDF.py", "../../kinetica_proc.py"]
files = {}
for script_path in file_paths:
    script_name = os.path.basename(script_path)
    with open(script_path, 'rb') as f:
        files[script_name] = f.read()
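The same loop can be wrapped in a function and exercised without a Kinetica instance. The following is a minimal sketch: build_file_map is a hypothetical name, and the duplicate check is an addition that the original script does not perform.

```python
import os
import tempfile


def build_file_map(file_paths):
    """Read each file as bytes, keyed by its base name."""
    files = {}
    for script_path in file_paths:
        script_name = os.path.basename(script_path)
        if script_name in files:
            # Two paths with the same base name would silently overwrite each other.
            raise ValueError('duplicate file name: {}'.format(script_name))
        with open(script_path, 'rb') as f:
            files[script_name] = f.read()
    return files


if __name__ == '__main__':
    # Demonstrate with a throwaway file instead of the real UDF scripts.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'example_udf.py')
        with open(path, 'wb') as f:
            f.write(b"print('hello')\n")
        print(build_file_map([path]))
```

Because the keys are base names, the relative path ../../kinetica_proc.py ends up in the map as kinetica_proc.py.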
After the files are placed in a data map, the distributed Pandas_df_output
proc is created in Kinetica and the files are associated with it:
proc_name = 'Pandas_df_output'
response = db_handle.create_proc(proc_name, 'distributed', files, 'python', [file_paths[0]], {})
print(response)
Note
The proc requires the proper command and args to be executed. In this case,
the assembled command line would be:
python df_to_output_UDF.py
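How a command and its args combine into that command line can be sketched in a few lines. This is an illustration only; assemble_command_line is a hypothetical helper, and how Kinetica joins the values internally is not shown in this example.

```python
import shlex


def assemble_command_line(command, args):
    """Join a command and its arguments into a shell-style command line."""
    return ' '.join(shlex.quote(part) for part in [command] + list(args))


# 'python' and the UDF file name are the values passed to create_proc above.
print(assemble_command_line('python', ['df_to_output_UDF.py']))  # python df_to_output_UDF.py
```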
Finally, after the proc is created, it is executed. The output table created in the Database Setup section is passed in here:
response = db_handle.execute_proc(proc_name, {}, {}, [], {}, [te.TEST_OUTPUT_TABLE_NAME], {})
print(response)