Python UDF API

The information below includes all the information one needs to know to begin writing UDFs using the Python UDF API. For more information on running Python UDFs, see Running Python UDFs.

Dependencies

To begin writing Python UDFs, access to the Kinetica Python UDF API is required.

The API can be downloaded from the Python UDF API repo on GitHub. After downloading, see the README.md in the UDF API directory created for further setup instructions.

Instructions for installing & configuring the API can be found under UDF API Installation below.

The Python UDF API consists of one file, kinetica_proc.py. This needs to be included in the UDF source code and added to the PYTHONPATH. There are no external dependencies beyond the Python standard library.

To take advantage of GPU processing within a UDF, the CUDA Toolkit must be downloaded & installed from the NVIDIA Developer Zone.

Also, any Python packages the UDF may need to use should be installed on the development platform and be made available within the Kinetica cluster's UDF environment using function environments.

UDF API Installation

The Kinetica Python UDF API is accessible via GitHub.

  1. In the desired directory, run the following to download the Kinetica Python UDF API repository:

    git clone -b release/v7.2 --single-branch https://github.com/kineticadb/kinetica-udf-api-python.git
    
  2. Add the Python UDF API directory to the PYTHONPATH:

    export PYTHONPATH=$PYTHONPATH:$(cd kinetica-udf-api-python;pwd)
    

Python UDF Function Environments

Python UDF function environments provide a means for deploying Python packages needed by UDFs to the Kinetica cluster.

Each UDF executes in a default function environment running Python 3.10 with a set of Python packages pre-installed. Any user-created function environment will have that default set as well as any added by the user.

Function environments can be managed in SQL, the native APIs, and in Workbench.

Installing Function Environment Libraries

To install Python packages within the Kinetica UDF environment, create a function environment and install the packages within it.

SQL
1
2
3
4
CREATE FUNCTION ENVIRONMENT udfe_st;

ALTER FUNCTION ENVIRONMENT udfe_st
INSTALL PACKAGE 'urllib3==1.26.18 sentence-transformers'
Python
1
2
3
4
5
6
7
8
9
PROC_ENV_NAME = 'udfe_st'

kinetica.create_environment(environment_name = PROC_ENV_NAME)

kinetica.alter_environment(
    environment_name = PROC_ENV_NAME, 
    action = "install_package", 
    value = "urllib3==1.26.18 sentence-transformers"
)

Afterwards, when creating the UDF, assign the environment to the UDF.

SQL
1
2
3
4
5
6
CREATE FUNCTION udf_st
MODE = 'distributed'
RUN_COMMAND = 'python'
RUN_COMMAND_ARGS = 'udf/udf_st.py'
FILE PATHS 'kifs://udf/udf_st.py'
WITH OPTIONS (SET_ENVIRONMENT = 'udfe_st')
Python
1
2
3
4
5
6
7
8
response = kinetica.create_proc(
    proc_name = PROC_NAME,
    execution_mode = "distributed",
    files = file_map,
    command = "python",
    args = [PROC_FILE_NAME],
    options = {"set_environment": PROC_ENV_NAME}
)

Uninstalling Function Environment Libraries

To uninstall Python packages from a given function environment:

SQL
1
2
ALTER FUNCTION ENVIRONMENT udfe_st
UNINSTALL PACKAGE 'sentence-transformers urllib3'
Python
1
2
3
4
5
6
7
PROC_ENV_NAME = 'udfe_st'

kinetica.alter_environment(
    environment_name = PROC_ENV_NAME,
    action = "uninstall_package",
    value = "urllib3 sentence-transformers"
)

To delete a function environment:

SQL
1
DROP FUNCTION ENVIRONMENT udfe_st
Python
1
kinetica.drop_environment(environment_name = PROC_ENV_NAME, options = {"no_error_if_not_exists": "true"})

Note

Be sure to delete all UDFs that use a function environment before deleting it.

Pre-installed Libraries on the Cluster

Each cluster node comes pre-installed with the latest Kinetica APIs:

  • gpudb - Kinetica Python API
  • kinetica-proc - Kinetica Python UDF API

The following 3rd-party libraries are pre-installed on Kinetica cluster nodes, for use by UDFs:

LibraryVersion
bcrypt3.2.0
certifi2022.5.18
cffi1.14.5
charset_normalizer2.0.12
Cython0.29.32
future0.18.3
idna3.3
joblib1.1.0
numpy1.25.2
pandas1.2.4
Pillow9.1.1
pycparser2.20
python-dateutil2.8.1
python-snappy0.6.1
pytz2021.1
pyzmq25.1.2
requests2.27.1
scikit_learn1.0.2
scipy1.7.3
six1.16.0
threadpoolctl3.1.0
urllib31.26.9

Initializing

A UDF must first get a handle to the ProcData class imported. This will parse the primary control file and set up all the necessary structures. It will return a ProcData instance, which is used to access data. All configuration information is cached, so repeated calls to get a handle to ProcData will not reload any configuration files.

Important

When you get a handle to ProcData, the handle is actually to the data given to that instance (OS process) of the UDF on the Kinetica host; therefore, there will be a ProcData handle for every instance of the UDF on your Kinetica host

Column Types

Unlike the other Kinetica APIs, the Python UDF API does not process data using records or schemas, operating in terms of columns of data instead. The raw column values returned closely map to the data types used in the tables being accessed:

Column CategoryKinetica TypePython UDF Type
Numericbooleanbool
int8int
int16int
intint
longint
floatfloat
doublefloat
decimaldecimal.Decimal
Stringstringstr
char[N]str
ipv4int
uuiduuid.UUID
wktstr
Date/Timedatedatetime.date
datetimedatetime.datetime
timedatetime.time
timestampint
Binarybytesbytes
wkbstr

Column data values can be accessed through array indices:

column[i]

For example, to retrieve the value for the 10th record:

column[9]

Reading Input Data

Accessing the request information for the UDF, the parameters passed into the UDF, or the input data can be completed using the following calls:

CallTypeDescription
proc_data.input_dataObjectReturns an InputDataSet object for accessing input table data that was passed into the UDF
proc_data.request_infoMap of strings to strings

Returns a map of basic information about the execute_proc() request, map values being accessed using:

proc_data.request_info[<map_key>]
proc_data.paramsMap of strings to stringsReturns a map of string-valued parameters that were passed into the UDF
proc_data.bin_paramsMap of strings to bytesReturns a map of binary-valued parameters that were passed into the UDF

Accessing Input Values

The InputDataSet object returned from proc_data.input_data contains the InputTable object, which in turn contains InputColumn, holding the actual data set. Tables and columns can be accessed by index or by name. For example, given a customer table at InputDataSet index 5 and a name column at that InputTable's index 1, either of the following calls will retrieve the column values associated with customer.name:

proc_data.input_data["customer"]["name"]
proc_data.input_data[5][1]

Request Info Keys

The request info keys are returned from calling proc_data.request_info. These keys include a variety of details about the executing UDF from the request information map made available to each running UDF.

General Information

Map KeyDescription
proc_nameThe name of the UDF being executed.
run_idThe run ID of the UDF being executed.
rank_numberThe processing node container number on which the current UDF instance is executing. For distributed UDFs, [1..n]; for non-distributed UDFs, 0.
tom_numberThe processing node number within the processing node container on which the current UDF instance is executing. For distributed UDFs, [0..n-1], where n is the number of processing nodes per processing node container. For non-distributed UDFs it is not provided, since these do not run on a processing node.
<option_name>Any options passed in the options map in the /execute/proc request will also be in the request info map.

CUDA Information

When executing UDFs that utilize CUDA, additional request information is returned.

Map KeyDescription
cuda_devicesThe number of CUDA devices currently in use.
cuda_freeThe amount of CUDA memory available.

Data Segment Information

Data is passed into UDFs in segments. Each segment consists of the entirety of the data on a single TOM and is processed by the UDF instance executing on that TOM. Thus, there is a 1-to-1 mapping of data segment and executing UDF instance, though this relationship may change in the future.

Running the same UDF multiple times should result in the same set of segments, assuming the same environment and system state across runs.

Map KeyDescription
data_segment_idA unique identifier for the segment of the currently executing UDF instance.
data_segment_countThe total cluster-wide count of data segments for distributed UDFs; for non-distributed UDFs, 1.
data_segment_numberThe number of the current data segment or executing UDF instance [0..data_segment_count-1].

Kinetica API Connection Parameters

These can be used to connect back to Kinetica using the regular API endpoint calls. Use with caution in distributed UDFs, particularly in large clusters, to avoid overwhelming the head node. Also note, multi-head ingest may not work from a UDF in some cases without overriding the worker URLs to use internal IP addresses.

Map KeyDescription
head_urlThe URL to connect to.
usernameRandomly generated temporary username used to execute the UDF.
passwordRandomly generated temporary password used to execute the UDF.

Important

Since username and password are randomly-generated temporary credentials, for security reasons, they should not be printed or output to logs.

Writing Output Data

To output data to a table, the size of the table must be set in order to allocate enough space in all of the columns to hold the correct number of values. To do this, call:

table.size = <total number of output records>

Table column values can then be assigned to each OutputColumn of each OutputTable in the OutputDataSet.

The following calls are available to assist with writing data to Kinetica:

CallTypeDescription
proc_data.output_dataObjectReturns an OutputDataSet object for writing output table data that will be written to the database
proc_data.resultsMap of strings to stringsReturns a map that can be populated with string-valued results to be returned from the UDF
proc_data.bin_resultsMap of strings to bytesReturns a map that can be populated with binary-valued results to be returned from the UDF

Setting Output Values

The OutputDataSet object returned from proc_data.output_data contains the OutputTable object, which in turn contains OutputColumn, holding the actual data set. Tables and columns are accessed the same way as InputDataSet, by name, index, or a combination of the two:

proc_data.output_data["customer"]["name"]
proc_data.output_data[5][1]

To assign fixed-width output values:

proc_data.output_data["customer"][1][2] = 12.34

To assign variable-width output values:

proc_data.output_data[5]["name"][4].append("Joe")

Status Reporting

The proc_data.status property can be set to a string value to help convey status information during UDF execution, e.g., proc_data.status="25% complete". The show_proc_status() function will return any status messages set for each data segment -- one data segment per processing node if in distributed mode, or one data segment total in non-distributed mode. These messages are subject to the following scenarios:

  • If the user-provided UDF code is executing and has set a status message, show_proc_status() will return the last message that was set.

  • If the user-provided UDF code finishes executing successfully, the status message is cleared.

    Note

    The UDF may not show as "complete" yet since any data written by the UDF (in distributed mode) still has to be inserted into the database, but the status set by the UDF code isn't relevant to this process

  • If the UDF is killed while executing user-provided UDF code, show_proc_status() will return the last message that was set.

  • If the user-provided UDF code errors out, show_proc_status() will return the error message and the last status message that was set in parentheses.

Complete

The UDF must finish with a call to proc_data.complete. This writes out some final control information to indicate that the UDF completed successfully.

Note

If this call is not made, the database will assume that the UDF didn’t finish and will return an error to the caller.

Logging

Any output from the UDF to is written to AWS CloudWatch.