Version:

Example UDF (Non-CUDA) - Sum of Squares

The following is a complete example, using the Python API, of a non-CUDA UDF that takes a list of input tables and corresponding output tables (must be the same number) and, for each record of each input table, sums the squares of all input table columns and saves the result to the first column of the corresponding output table record; i.e.:

$in.a^2 + in.b^2 + \dots + in.n^2 \rightarrow out.a$

This setup assumes that the UDF is being developed on the Kinetica host (or on the head node host, if Kinetica is running as a multi-node cluster), that the Python database API is available at /opt/gpudb/api/python, and that the Python UDF API is available at /opt/gpudb/udf/api/python.

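This example will contain the following Python scripts:

- udf_sos_init.py: creates the input and output tables and inserts random records into the input table
- udf_sos_proc.py: the UDF itself, which computes the sum of squares for each input record
- udf_sos_exec.py: registers the UDF with Kinetica and executes it against the input and output tables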

Note

All commands should be run as the gpudb user.

The example can be run as follows:

$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_init.py
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_exec.py

udf_sos_init.py

import sys
import collections
import gpudb

# Where the Kinetica instance is listening, and the names of the two tables
# this example works with
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'


# Open the database handle used by every call below, using binary encoding
connection_settings = dict(
   encoding='BINARY',
   host=KINETICA_HOST,
   port=KINETICA_PORT,
)
h_db = gpudb.GPUdb(**connection_settings)


## Define the input table's type: a record with two float columns, x1 & x2
# The schema is laid out readably here and then compacted by stripping every
# space and newline from it
input_type = """
{
   "type": "record",
   "name": "input_type",
   "fields": [
      {"name":"x1","type":"float"},
      {"name":"x2","type":"float"}
   ]
}  """
for whitespace in (' ', '\n'):
   input_type = input_type.replace(whitespace, '')

## Register the type with the database and keep the ID it was assigned
create_type_response = h_db.create_type(
   type_definition=input_type,
   label=INPUT_TABLE + '_lbl',
   properties={}
)
input_type_id = create_type_response['type_id']

## (Re)create the input table using that type
input_table_exists = h_db.has_table(table_name=INPUT_TABLE)['table_exists']
if input_table_exists:
   # Clear out the table left behind by an earlier run before re-creating it
   h_db.clear_table(table_name=INPUT_TABLE)
h_db.create_table(table_name=INPUT_TABLE, type_id=input_type_id)


## Fill the input table with 10,000 randomly generated records
import random

# Each record draws x1 from a Gaussian with mean 1 & sigma 1, then x2 from a
# Gaussian with mean 1 & sigma 2, and is encoded against the input type schema
encoded_records = [
   h_db.encode_datum(
      input_type,
      collections.OrderedDict([
         ("x1", random.gauss(1, 1)),
         ("x2", random.gauss(1, 2)),
      ])
   )
   for _ in range(10000)
]

# Send all of the encoded records to the database in a single request
h_db.insert_records(
   table_name=INPUT_TABLE,
   data=encoded_records,
   list_encoding='binary',
   options={}
)


## Define the output table's type: a record with a single float column, y
# The schema is laid out readably here and then compacted by stripping every
# space and newline from it
output_type = """
{
   "type": "record",
   "name": "out_type",
   "fields": [
      {"name":"y","type":"float"}
   ]
}  """
for whitespace in (' ', '\n'):
   output_type = output_type.replace(whitespace, '')

## Register the type with the database and keep the ID it was assigned
create_type_response = h_db.create_type(
   type_definition=output_type,
   label=OUTPUT_TABLE + '_lbl',
   properties={}
)
output_type_id = create_type_response['type_id']

## (Re)create the output table using that type
output_table_exists = h_db.has_table(table_name=OUTPUT_TABLE)['table_exists']
if output_table_exists:
   # Clear out the table left behind by an earlier run before re-creating it
   h_db.clear_table(table_name=OUTPUT_TABLE)
h_db.create_table(table_name=OUTPUT_TABLE, type_id=output_type_id)

udf_sos_proc.py

################################################################################
#                                                                              #
# Kinetica UDF Sum of Squares Example UDF                                      #
# ---------------------------------------------------------------------------- #
# This UDF takes pairs of input & output tables, computing the sum of the      #
# squares of all the columns for each input table and saving the resulting     #
# sums to the first column of the corresponding output table.                  #
#                                                                              #
################################################################################
import sys
import math
from kinetica_proc import ProcData


# Pick a lazy integer range that exists on the running interpreter: xrange on
# Python 2, range on Python 3 (where xrange is not defined)
try:
   lazy_range = xrange
except NameError:
   lazy_range = range

# Handle through which this proc reads its input tables and writes its output
# tables
proc_data = ProcData()

# For each pair of input & output tables, calculate the sum of squares of input
#    columns and save results to first output table column
for in_table, out_table in zip(proc_data.input_data, proc_data.output_data):

   # Extend the output table by the number of record entries in the input table
   record_count = in_table.size
   out_table.size = record_count

   # Use the first column in the output table as the output column
   y = out_table[0]

   # NOTE(review): the accumulation below assumes the newly sized output
   #    column starts out zero-filled -- confirm against the UDF API

   # Loop through all the input table columns
   for in_column in in_table:
      # For every value in the column...
      for i in lazy_range(record_count):
         # Add the square of that value to the corresponding output column
         y[i] += in_column[i] ** 2

# Signal that the proc has finished its work
proc_data.complete()

udf_sos_exec.py

import sys
import gpudb

# Where the Kinetica instance is listening, the tables the proc will read from
# and write to, and the name/source file of the proc being registered
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'
proc_name = 'udf_sos_proc'
file_name = proc_name + '.py'


# Read proc code in as bytes and add to a file data array, keyed by file name
# (the handle is deliberately not named `file`, which would shadow the
# Python 2 builtin of the same name)
files = {}
with open(file_name, 'rb') as proc_file:
    files[file_name] = proc_file.read()


# Open the database handle used by every call below, using binary encoding
h_db = gpudb.GPUdb(
    host=KINETICA_HOST,
    port=KINETICA_PORT,
    encoding='BINARY'
)


# Drop any proc still registered under this name from an earlier run
proc_already_registered = h_db.has_proc(proc_name)['proc_exists']
if proc_already_registered:
    h_db.delete_proc(proc_name)


# Register the proc source with the database and show the response.
# Single-argument print(...) calls are used because they behave the same on
# Python 2 and Python 3, whereas the print statement is Python 2-only.
# NOTE(review): the positional arguments are presumably (name, execution mode,
# files, command, command args, options) -- confirm against the API reference
print("Registering proc...")
response = h_db.create_proc(proc_name, 'distributed', files, 'python', [file_name], {})
print(response)

# Run the registered proc with the example's input & output tables and show
# the response.
# NOTE(review): the positional arguments are presumably (name, params, binary
# params, input tables, input columns, output tables, options) -- confirm
# against the API reference
print("Executing proc...")
response = h_db.execute_proc(proc_name, {}, {}, [INPUT_TABLE], {}, [OUTPUT_TABLE], {})
print(response)