Running Python UDFs

This page covers what is needed to begin running Python UDFs. For information on writing Python UDFs, see Python UDF API; for information on simulating UDF runs, see UDF Simulator. Example Python UDFs can be found here.

Important

Though any of the native APIs or SQL can be used for deploying and executing UDFs written in any UDF API language, all the examples below are written using the native Python API for convenience.

Deployment

Calling the create_proc() method (CREATE FUNCTION in SQL) deploys the specified UDF to the Kinetica execution environment on every server in the cluster. The method takes the following parameters:

Parameter        Description
proc_name        A system-wide unique name for the UDF
execution_mode   An execution mode; either distributed or nondistributed
files

A set of files composing the UDF package, including the names of the files and the binary data for those files. The files specified will be created on the target Kinetica servers in the UDF directory, with the given data and filenames; if the filenames contain subdirectories, that structure will be copied to the target servers.

Files in KiFS can also be used here. To use a KiFS file, pass the KiFS URI as the name of the file and set the binary data portion to b''. In this case, the KiFS files will be copied into the UDF execution environment under their respective UDF directories for use when execute_proc() is called.

For example, a file uploaded to (and referenced in the files mapping as):

kifs://udf/file.ext

will be made available to the UDF and must be referenced in the args list as:

udf/file.ext

Note

Uploading files using the files parameter should be reserved for smaller files; larger files should be uploaded to KiFS and referenced there instead.

command          The name of the command to run, which can be a file within the deployed UDF fileset, or any command able to be executed within the host environment, e.g., python. If a host environment command is specified, the host environment must be properly configured to support that command's execution.
args

A list of command-line arguments to pass to the specified command; e.g., ./<file name>.py

Note

Any files referenced here that were uploaded to KiFS will need to be prefixed with their corresponding KiFS directory names; i.e., <kifs dir>/<file name>.<ext>

options

Optional parameters for UDF creation, including the function environment to use for the Python UDF.

See create_proc() for more details.
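
The relationship between a KiFS URI in the files map and the matching entry in args can be sketched as below. This is a minimal illustration based only on the kifs://udf/file.ext example above; the helper names kifs_uri_to_arg_path and kifs_files_entry are hypothetical and are not part of the Kinetica API.

```python
KIFS_SCHEME = "kifs://"


def kifs_uri_to_arg_path(uri: str) -> str:
    """Return the path to list in args for a file given by its KiFS URI.

    Hypothetical helper: it only strips the kifs:// scheme, which matches
    the kifs://udf/file.ext -> udf/file.ext example in the text.
    """
    if not uri.startswith(KIFS_SCHEME):
        raise ValueError(f"not a KiFS URI: {uri!r}")
    return uri[len(KIFS_SCHEME):]


def kifs_files_entry(uri: str) -> dict:
    """Build a files-map entry for a KiFS file: URI as key, empty bytes as value."""
    return {uri: b""}


uri = "kifs://udf/file.ext"
files = kifs_files_entry(uri)          # {'kifs://udf/file.ext': b''}
args = [kifs_uri_to_arg_path(uri)]     # ['udf/file.ext']
```

The resulting files and args values have the same shape as the ones passed to create_proc() in the KiFS example later on this page.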


Creating a UDF - Direct File Passing

To deploy a Python UDF using the native Python API, a local proc file (udf_st.py) can be read in as bytes and then passed into the create_proc() call as a files map, with the key as the name of the file and the value as the byte array.

Create UDF Example - Filename Constants
PROC_NAME = 'udf_st'
PROC_FILE_NAME = PROC_NAME + '.py'
Create UDF Example - File Map Loading
file_map = {}
with open(PROC_FILE_NAME, 'rb') as file:
    file_map[PROC_FILE_NAME] = file.read()
Create UDF Example - create_proc() Call
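# kinetica is an existing database connection; PROC_ENV_NAME names the
# function environment to use (both are assumed to be defined elsewhere)
response = kinetica.create_proc(
    proc_name = PROC_NAME,
    execution_mode = "distributed",
    files = file_map,               # file name -> file contents (bytes)
    command = "python",
    args = [PROC_FILE_NAME],        # script passed to the python command
    options = {"set_environment": PROC_ENV_NAME}
)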
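
When a UDF package consists of several files, the file map loading step generalizes naturally. The sketch below is one way to do it, assuming the package lives under a single local directory; the helper name build_file_map and the lib/helpers.py file are hypothetical. Keys are relative paths, so any subdirectories in them would be recreated on the target servers as described for the files parameter.

```python
import tempfile
from pathlib import Path


def build_file_map(root, relative_paths):
    """Read each file under root and key its bytes by its relative path."""
    root = Path(root)
    file_map = {}
    for rel in relative_paths:
        rel = Path(rel)
        # as_posix() yields forward slashes regardless of the local OS
        file_map[rel.as_posix()] = (root / rel).read_bytes()
    return file_map


# Demonstration with throwaway files so the sketch runs anywhere
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "lib").mkdir()
    (Path(tmp) / "udf_st.py").write_bytes(b"print('hello')\n")
    (Path(tmp) / "lib" / "helpers.py").write_bytes(b"X = 1\n")
    file_map = build_file_map(tmp, ["udf_st.py", "lib/helpers.py"])

print(sorted(file_map))
```

The resulting map can be passed as the files argument in the same way as the single-file map above.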

Creating a UDF - Uploading to KiFS

To deploy a Python UDF using the native Python API, a local proc file (udf_st.py) can be uploaded to a KiFS directory and then referenced in the files map of the create_proc() call, with the key as the KiFS URI of the file and the value as an empty byte string, b''.

Create UDF Example - KiFS File Uploading
from gpudb_file_handler import GPUdbFileHandler

PROC_NAME = 'udf_st'
PROC_FILE_NAME = PROC_NAME + '.py'
KIFS_DIR = 'udf'

GPUdbFileHandler(kinetica).upload_file(PROC_FILE_NAME, KIFS_DIR)
Create UDF Example - KiFS create_proc() Call
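# kinetica and PROC_ENV_NAME are assumed to be defined elsewhere
response = kinetica.create_proc(
    proc_name = PROC_NAME,
    execution_mode = "distributed",
    files = {f'kifs://{KIFS_DIR}/{PROC_FILE_NAME}': b''},   # KiFS URI -> empty bytes
    command = "python",
    args = [f'{KIFS_DIR}/{PROC_FILE_NAME}'],                # path prefixed with the KiFS directory
    options = {"set_environment": PROC_ENV_NAME}
)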

Concurrency Limits

The max_concurrency_per_node setting is available in the options map of the /create/proc endpoint. This option defines a per-Kinetica-host concurrency limit for a UDF; i.e., no more than n OS processes (UDF instances) in charge of evaluating the UDF will be permitted to execute concurrently on a single Kinetica host. A concurrency limit is worth setting when resources (like GPUs) are limited and repeatedly exhausting them is a risk. This setting is particularly useful for distributed UDFs, but it also works for non-distributed UDFs.

The default value for the setting is 0, which imposes no limit. If the value is set to 4, at most 4 instances of the UDF will run concurrently on each host. This holds true across all invocations of the proc: even if /execute/proc is called eight times, only 4 processes will be running on a host at any one time. As soon as one instance finishes processing, the next waiting instance is started. This repeats, with at most 4 instances of the UDF running at a time, until all instances have completed or the UDF is killed.
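
The queuing behavior described above can be illustrated with a small local simulation. This is not how Kinetica implements the limit; it is only a sketch that uses a semaphore to show eight requested instances running at most four at a time on one host. Passing the limit as a string value in the options map is an assumption to verify against the create_proc() reference.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 4        # stands in for max_concurrency_per_node
INSTANCES = 8    # UDF instances requested on a single host

# Assumption: option values are passed as strings
create_options = {"max_concurrency_per_node": str(LIMIT)}

slots = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
state = {"running": 0, "peak": 0, "completed": 0}


def run_instance(_):
    """Simulate one UDF instance that must hold a slot while it works."""
    with slots:                      # blocks while LIMIT instances are running
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.01)             # simulated work
        with lock:
            state["running"] -= 1
            state["completed"] += 1


with ThreadPoolExecutor(max_workers=INSTANCES) as pool:
    list(pool.map(run_instance, range(INSTANCES)))

print(f"completed={state['completed']}, peak concurrency={state['peak']}")
```

All eight simulated instances complete, but the recorded peak never exceeds the limit; waiting instances start only as running ones release their slots.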

Execution

Calling the execute_proc() method (see Executing Functions in SQL) will execute the specified UDF within the targeted Kinetica execution environment. The method takes the following parameters:

Parameter            Description
proc_name            The system-wide unique name for the UDF
params               Set of string-to-string key/value paired parameters to pass to the UDF
bin_params           Set of string-to-binary key/value paired parameters to pass to the UDF
input_table_names    Input data table names, to be processed by the UDF
input_column_names   Mapping of input data table names to their respective column names, to be processed as input data by the UDF
output_table_names   Output data table names, where processed data is to be appended
options              Optional parameters for UDF execution; see execute_proc() for details

The call is asynchronous and will return immediately with a run_id, which is a string that can be used in subsequent checks of the execution status.

For example, to execute a UDF that's already been created (udf_st) using existing input (udf_st_in) and output (udf_st_out) tables:

Execute UDF Example - Parameter Constants
PROC_NAME = 'udf_st'
input_table = 'udf_st_in'
output_table = 'udf_st_out'
Execute UDF Example - execute_proc() Call
response = kinetica.execute_proc(
    proc_name = PROC_NAME,
    params = {'sentence': 'I live in a neighborhood in a small town.'},
    bin_params = {},
    input_table_names = [input_table],
    input_column_names = {},
    output_table_names = [output_table],
    options = {}
)
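
Because execute_proc() returns immediately with a run_id, a caller that needs the results usually has to poll for completion. The following is a minimal polling sketch, not an official pattern: wait_for_run is a hypothetical helper, the status lookup is passed in as a callable, and the "running" and "complete" status strings are assumptions to check against the show_proc_status() reference. A stand-in lookup is used so the sketch runs without a database.

```python
import time


def wait_for_run(get_status, run_id, poll_seconds=1.0, timeout_seconds=60.0):
    """Poll get_status(run_id) until it stops returning 'running'.

    get_status is any callable that returns a status string for a run ID.
    Returns the final status, or raises TimeoutError if the run is still
    reported as running once the timeout has elapsed.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(run_id)
        if status != "running":      # assumed status string
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"run {run_id} still running after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)


# Stand-in for a real status lookup: reports 'running' twice, then 'complete'
_fake_statuses = iter(["running", "running", "complete"])


def fake_get_status(run_id):
    return next(_fake_statuses)


final_status = wait_for_run(fake_get_status, "example-run-id", poll_seconds=0.01)
print(final_status)
```

With a live connection, get_status would wrap a show_proc_status() call and extract the status for the given run_id from its response.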

Management

UDFs can be managed using SQL or through one of the native API calls:

Native API           SQL Command            Description
delete_proc()        DROP FUNCTION          Removes the given UDF definition from the system; needs to be called before create_proc() when recreating a UDF
has_proc()           -                      Returns whether the given UDF exists
kill_proc()          -                      Terminates a running UDF (or UDFs)
show_proc()          SHOW FUNCTION          Returns the parameter values used in creating the UDF
show_proc_status()   SHOW FUNCTION STATUS   Returns whether the UDF (or UDFs) is still running, has completed, or has exited with an error, along with any processed results
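
Since delete_proc() must be called before create_proc() when recreating a UDF, the two are often combined with has_proc() into a single step. The sketch below shows that ordering under stated assumptions: recreate_proc is a hypothetical helper, the has_proc() response is assumed to expose a proc_exists field, and InMemoryProcs is a stand-in object used only so the example runs without a database.

```python
def recreate_proc(db, proc_name, **create_args):
    """Drop proc_name if it already exists, then create it."""
    # Assumption: the has_proc() response carries a 'proc_exists' flag
    if db.has_proc(proc_name=proc_name)["proc_exists"]:
        db.delete_proc(proc_name=proc_name)
    return db.create_proc(proc_name=proc_name, **create_args)


class InMemoryProcs:
    """Hypothetical stand-in for a database connection, for illustration only."""

    def __init__(self):
        self.procs = {}
        self.calls = []

    def has_proc(self, proc_name):
        self.calls.append("has_proc")
        return {"proc_exists": proc_name in self.procs}

    def delete_proc(self, proc_name):
        self.calls.append("delete_proc")
        del self.procs[proc_name]
        return {}

    def create_proc(self, proc_name, **kwargs):
        self.calls.append("create_proc")
        if proc_name in self.procs:
            raise RuntimeError(f"proc {proc_name!r} already exists")
        self.procs[proc_name] = kwargs
        return {"proc_name": proc_name}


db = InMemoryProcs()
# First call creates the UDF; second call drops and recreates it
recreate_proc(db, "udf_st", command="python", args=["udf_st.py"])
recreate_proc(db, "udf_st", command="python", args=["udf_st.py"])
print(db.calls)
```

The second call shows the delete-then-create ordering; without the delete step, the stand-in (like the requirement stated in the table) would reject the duplicate definition.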