Python UDF Guide

Step-by-step guide to creating UDFs with the Python API

The following guide provides step-by-step instructions for getting started writing and running UDFs in Python. This particular example is a simple distributed UDF that copies data from one table to another, using a CSV configuration file to determine which processing nodes to copy data from. Note that copying data from only some processing nodes typically would not have "real" applications; this exercise is purely to demonstrate the many facets of the UDF API.

Prerequisites

To set up this tutorial, the following prerequisites must be met:

UDF Tutorial Installation

This tutorial requires local access to the Python UDF tutorial repository hosted on GitHub. The Python UDF API & tutorial can be downloaded as follows.

In the desired directory, run the following command, being sure to replace <kinetica-version> with the name of the installed Kinetica version, e.g., v7.1:

git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-tutorial-python-udf-api.git

Data Files

There are four files associated with the Python UDF tutorial. Each file can be found in the Python Tutorial Git Repo, which was cloned in the previous section:

  • An initialization script, udf_tc_py_init.py (written in Python), that creates the schema and input & output tables.
  • A UDF, udf_tc_py_proc.py (written using the Python UDF API), that contains a table copying example.
  • An execute script, udf_tc_py_exec.py (written in Python), that creates the proc and executes it.
  • A CSV input file, rank_tom.csv.

Development

Refer to the Python UDF API Reference page to begin writing your own UDF(s), or use the UDF already provided with the Python UDF tutorial repository. The steps below outline using the UDF Simulator with the UDF included with the Python UDF tutorial repository.

  1. Ensure that the Python UDF API directory is in the PYTHONPATH and that the PYTHON_CMD environment variable (used by the commands below) points to the desired Python interpreter; for example (replacing <python-udf-api-dir> with the local Python UDF API directory):
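
    export PYTHONPATH=$PYTHONPATH:<python-udf-api-dir>
    export PYTHON_CMD=python3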

  2. Change directory into the newly downloaded Python UDF tutorial repository:

    cd kinetica-tutorial-python-udf-api/
    
  3. Run the UDF initialization script, specifying the database URL and username & password:

    $PYTHON_CMD udf_tc_py_init.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
    
  4. From the tutorial repository directory, run the UDF simulator (included in the examples directory of the native Python API repository) in execute mode with the following options to simulate running the UDF, where -i is the schema-qualified UDF input table, -o is the schema-qualified UDF output table, and -K is the Kinetica URL (using the appropriate values for your environment):

    $PYTHON_CMD ../kinetica-api-python/examples/udfsim.py execute -d \
        -i [<schema>.]<input-table> -o [<schema>.]<output-table> \
        -K http://<db.host>:9191 \
        -U <kinetica user> -P <kinetica pass>
    

    For instance:

    $PYTHON_CMD ../kinetica-api-python/examples/udfsim.py execute -d \
        -i tutorial_udf_python.udf_tc_py_in_table -o tutorial_udf_python.udf_tc_py_out_table \
        -K http://localhost:9191 \
        -U admin -P admin123
    
  5. Copy & execute the export command output by the previous command; this will prepare the execution environment for simulating the UDF:

    export KINETICA_PCF=/tmp/udf-sim-control-files/kinetica-udf-sim-icf-xMGW32
    

    Important

The export command shown above is an example of what the udfsim.py script will output; it should not be copied to the terminal in which this example is being run. Make sure to copy & execute the actual command output by udfsim.py in the previous step.

  6. Run the UDF:

    $PYTHON_CMD udf_tc_py_proc.py
    
  7. Output the results to Kinetica (use the dry-run flag -d to avoid writing to Kinetica), ensuring you replace the Kinetica URL and credentials with the appropriate values. The results map will be returned (even if there's nothing in it), as well as the number of records that were (or would be, in the case of a dry run) added to the given output table:

    $PYTHON_CMD ../kinetica-api-python/examples/udfsim.py output \
        -K http://<db.host>:9191 \
        -U <kinetica user> -P <kinetica pass>
    

    For instance:

    $PYTHON_CMD ../kinetica-api-python/examples/udfsim.py output \
        -K http://localhost:9191 \
        -U admin -P admin123
    

    This should output the following:

    No results
    Output:
    
    tutorial_udf_python.udf_tc_py_out_table: 10000 records
    
  8. Clean the control files output by the UDF simulator:

    $PYTHON_CMD ../kinetica-api-python/examples/udfsim.py clean
    

    Important

    The clean command is only necessary if data was output to Kinetica; otherwise, the UDF simulator can be re-run as many times as desired without having to clean the output files and enter another export command.

Deployment

The UDF can be created and executed using the UDF endpoints /create/proc and /execute/proc, respectively.

  1. Set the Python command environment variable:

    export PYTHON_CMD=python3
    
  2. Optionally, run the UDF init script to reset the example tables:

    $PYTHON_CMD udf_tc_py_init.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
    
  3. Run the UDF execute script:

    $PYTHON_CMD udf_tc_py_exec.py --url <kinetica-url> --username <kinetica-user> --password <kinetica-pass>
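
    After the proc runs, the copy can be verified by checking the output table's record count. A minimal sketch using the Python API's show_table wrapper (the get_sizes option and the total_full_size response field are assumptions based on the /show/table endpoint):

    result = kinetica.show_table(
        table_name = "tutorial_udf_python.udf_tc_py_out_table",
        options = {"get_sizes": "true"}
    )
    # Number of records copied from the matching processing nodes
    print(result["total_full_size"])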
    

Execution Detail

As mentioned previously, this section details a simple distributed UDF that copies data from one table to another. While the table copy UDF can run against multiple tables, the example run will use a single schema-qualified table, tutorial_udf_python.udf_tc_py_in_table, as input and a similar schema-qualified table, tutorial_udf_python.udf_tc_py_out_table, for output.

The input table will contain one int16 column (id) and two float columns (x and y). The id column will be an ordered integer field, with the first row containing 1, the second row containing 2, etc. Together, the two float columns will contain 10,000 pairs of randomly generated numbers:

+------+-----------+-----------+
| id   | x         | y         |
+======+===========+===========+
| 1    | 2.57434   | -3.357401 |
+------+-----------+-----------+
| 2    | 0.0996761 | 5.375546  |
+------+-----------+-----------+
| ...  | ...       | ...       |
+------+-----------+-----------+

The output table will also contain one int16 column (id) and two float columns (a and b). No data is inserted:

+------+-----------+-----------+
| id   | a         | b         |
+======+===========+===========+
|      |           |           |
+------+-----------+-----------+

The UDF will first read from a given CSV file to determine from which processing node container and processing node to copy data:

rank_num,tom_num
1,0
2,0

The tom_num column values refer to processing nodes that contain some of the many shards of data inside the database. The rank_num column values refer to processing node containers that hold some of the processing nodes for the database. For example, the given CSV file specifies that the data from tutorial_udf_python.udf_tc_py_in_table on processing node container 1, processing node 0 and on processing node container 2, processing node 0 will be copied to tutorial_udf_python.udf_tc_py_out_table.

Once the UDF is executed, a UDF instance (OS process) is spun up for each processing node to execute the given code against its assigned processing node. The UDF then determines if the processing node container/processing node pair it's currently running on matches one of the pairs of values in the CSV file. If there is a match, the UDF will loop through the input tables, match the output tables' size to the input tables', and copy the appropriate data from the input tables to the output tables. If there isn't a match, the code will complete.

Initialization (udf_tc_py_init.py)

To interact with Kinetica, you must first instantiate an object of the GPUdb class while providing the connection URL of the database server.

import gpudb

# "args" holds the command line arguments parsed by the script (URL, username, & password)
kinetica = gpudb.GPUdb(host=[args.url], username=args.username, password=args.password)

The schema is created if it doesn't already exist:

# OPTION_NO_CREATE_ERROR (defined in the script) prevents an error
# from being raised if the schema already exists
kinetica.create_schema(SCHEMA, options=OPTION_NO_CREATE_ERROR)

The input table is created, after clearing any existing table with the same name:

columns = [
    ["id", "int", "int16", "primary_key"],
    ["x", "float"],
    ["y", "float"]
]

if kinetica.has_table(table_name=INPUT_TABLE)['table_exists']:
    kinetica.clear_table(table_name=INPUT_TABLE)

input_table_obj = gpudb.GPUdbTable(
    _type = columns,
    name = INPUT_TABLE,
    db = kinetica
)

Next, sample data is generated and inserted into the new input table:

# MAX_RECORDS is defined in the script as 10,000
records = []
for val in range(1, MAX_RECORDS+1):
    records.append([val, random.gauss(1, 1), random.gauss(1, 2)])
input_table_obj.insert_records(records)

Lastly, the output table is created with a column layout similar to the input table's:

columns = [
    ["id", "int", "int16", "primary_key"],
    ["a", "float"],
    ["b", "float"]
]

if kinetica.has_table(table_name=OUTPUT_TABLE)['table_exists']:
    kinetica.clear_table(table_name=OUTPUT_TABLE)

output_table_obj = gpudb.GPUdbTable(
    _type = columns,
    name = OUTPUT_TABLE,
    db = kinetica
)

UDF (udf_tc_py_proc.py)

First, the csv module is imported to read the local CSV configuration file:

import csv

Next, the ProcData class is imported from the UDF API's kinetica_proc module and instantiated:

from kinetica_proc import ProcData

proc_data = ProcData()

Then, the CSV file mentioned in Data Files is read, skipping the header. Each row's first value is taken as rank_num and its second value as tom_num:

rank_tom_info = csv.reader(open("rank_tom.csv"))
next(rank_tom_info)  # skip the header row
for row in rank_tom_info:
    rank_num = row[0]
    tom_num = row[1]

Determine if the rank and tom number found in the request info map (which identify the current instance of the UDF) match the values in the CSV file; note that both sides of the comparison are strings:

if (proc_data.request_info["rank_number"] == rank_num and
        proc_data.request_info["tom_number"] == tom_num):

If they match, then for each input and output table found in the input_data and output_data objects (respectively), set the output table's size to the input table's size:

for in_table, out_table in zip(
        proc_data.input_data, proc_data.output_data):
    out_table.size = in_table.size

For each corresponding pair of input and output columns in those tables, copy the input column's values to the output column:

for in_column, out_column in zip(in_table, out_table):
    out_column.extend(in_column)

If no matches were found, finish processing.

else:
    print("No rank or tom matches")

Call complete() to tell Kinetica the proc code is finished.

# Inform Kinetica that the proc has finished successfully
proc_data.complete()
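
Assembled, the snippets above make up the core of udf_tc_py_proc.py; the following sketch shows how they nest (the actual file in the tutorial repository may differ slightly in structure):

import csv

from kinetica_proc import ProcData

proc_data = ProcData()

# Read the rank/tom pairs to copy from, skipping the CSV header
rank_tom_info = csv.reader(open("rank_tom.csv"))
next(rank_tom_info)
for row in rank_tom_info:
    rank_num = row[0]
    tom_num = row[1]

    # Copy only if this UDF instance is running on a listed
    # processing node container (rank) / processing node (tom) pair
    if (proc_data.request_info["rank_number"] == rank_num and
            proc_data.request_info["tom_number"] == tom_num):
        for in_table, out_table in zip(
                proc_data.input_data, proc_data.output_data):
            # Pre-size the output table, then copy each column's values
            out_table.size = in_table.size
            for in_column, out_column in zip(in_table, out_table):
                out_column.extend(in_column)
    else:
        print("No rank or tom matches")

proc_data.complete()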

Execution (udf_tc_py_exec.py)

To interact with Kinetica, you must first instantiate an object of the GPUdb class while providing the connection URL of the database server.

kinetica = gpudb.GPUdb(host=[args.url], username=args.username, password=args.password)

To upload the udf_tc_py_proc.py and rank_tom.csv files to Kinetica, they will first need to be read in as bytes and added to a file data map:

file_names = (csv_file_name, proc_file_name)
files = {}
for file_name in file_names:
    with open(file_name, 'rb') as file:
        files[file_name] = file.read()

After the files are placed in a data map, the distributed udf_tc_py_proc proc can be created in Kinetica and the files associated with it. Note that the proc requires the proper command and args to execute; in this case, the assembled command line would be python udf_tc_py_proc.py:

response = kinetica.create_proc(
    proc_name = proc_name,
    execution_mode = "distributed",
    files = files,
    command = "python",
    args = [proc_file_name],
    options = {}
)

Finally, after the proc is created, it can be executed. The input table and output table created in Initialization (udf_tc_py_init.py) are passed in here.

response = kinetica.execute_proc(
    proc_name = proc_name,
    params = {},
    bin_params = {},
    input_table_names = [INPUT_TABLE],
    input_column_names = {},
    output_table_names = [OUTPUT_TABLE],
    options = {}
)
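
The /execute/proc response includes a run ID that can be used to track the run. A minimal sketch using the Python API's show_proc_status wrapper (the overall_statuses response field is an assumption based on the /show/proc/status endpoint):

run_id = response["run_id"]

# Look up the status of this run; "overall_statuses" maps the run ID
# to a status string such as "running", "complete", or "error"
status = kinetica.show_proc_status(run_id = run_id, options = {})
print(status["overall_statuses"][run_id])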