Version:

Example UDF (CUDA) - Sum of Squares

The following implements the sum-of-squares algorithm as a distributed UDF using the CUDA-based UDF C++ API. The initialization script will be run separately from the UDF itself, and the execution script will need to target the UDF C++ binary.

This example takes a list of input tables and a list of corresponding output tables (the two lists must be the same length). For each record of each input table, it sums the squares of all input table columns and saves the result to the first column of the corresponding output table record; i.e.:

in.a^2 + in.b^2 + ... + in.n^2 -> out.a

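This example consists of the following files:

- udf_sos_init.py: creates the input and output tables and populates the input table with test data
- udf_sos_proc.cu: the CUDA C++ UDF that computes the sum of squares
- makefile: builds the UDF binary from udf_sos_proc.cu
- udf_sos_exec.py: registers the UDF binary with Kinetica and executes it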

Note

All commands should be run as the gpudb user.

The example can be run as follows:

$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_init.py
$ make
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_exec.py

This setup assumes that the UDF is being developed on the Kinetica host (or on the head node host of a multi-node Kinetica cluster), that the Python database API is available at /opt/gpudb/api/python, and that the UDF C++ API is available at /opt/gpudb/udf/api/cpp.

udf_sos_init.py

import sys
import collections
import gpudb

import random

KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'

# Number of random records inserted into the input table
RECORD_COUNT = 10000


def _compact(type_text):
    """Return *type_text* with every space and newline character removed."""
    return type_text.replace(' ','').replace('\n','')


def _recreate_table(db, table_name, type_definition):
    """Register *type_definition* and create *table_name* with that type.

    If a table with that name already exists, ``clear_table`` is called on it
    before ``create_table`` is issued.
    """
    created_type_id = db.create_type(
        type_definition = type_definition,
        label = table_name + '_lbl',
        properties = {}
    )['type_id']

    table_exists = db.has_table(table_name = table_name)['table_exists']
    if table_exists:
        db.clear_table(table_name = table_name)

    db.create_table(table_name = table_name, type_id = created_type_id)


## Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)


## Create input data table: two float columns, x1 and x2
input_type = _compact("""
{
   "type": "record",
   "name": "input_type",
   "fields": [
      {"name":"x1","type":"float"},
      {"name":"x2","type":"float"}
   ]
}  """)

_recreate_table(h_db, INPUT_TABLE, input_type)


## Insert input data: each record holds two Gaussian samples, both with
## mean 1, and with standard deviations 1 (x1) and 2 (x2)
encoded_obj_list = [
    h_db.encode_datum(
        input_type,
        collections.OrderedDict([
            ("x1", random.gauss(1,1)),
            ("x2", random.gauss(1,2)),
        ])
    )
    for _ in range(RECORD_COUNT)
]
h_db.insert_records(table_name = INPUT_TABLE, data = encoded_obj_list, list_encoding = 'binary', options = {})


## Create output data table: a single float column, y
output_type = _compact("""
{
   "type": "record",
   "name": "out_type",
   "fields": [
      {"name":"y","type":"float"}
   ]
}  """)

_recreate_table(h_db, OUTPUT_TABLE, output_type)

udf_sos_proc.cu

#include "Proc.hpp"

#include <iostream>
#include <stdexcept>
#include <string>

// Number of threads per block used for the kernel launch in main(); it is
// also the divisor used there to derive the number of blocks.
// Customize as appropriate
#define CUDA_THREADS 256

/**
 * Converts a failed CUDA runtime call into a C++ exception.
 *
 * @param err  Status code returned by a CUDA runtime API call.
 * @throws std::runtime_error  if err is not cudaSuccess.  The message is the
 *         symbolic error name followed by the runtime's description, so that
 *         the failure can be looked up directly in the CUDA documentation.
 */
inline void cudaCheck(cudaError_t err)
{
    if (err == cudaSuccess)
    {
        return;
    }

    throw std::runtime_error(
        std::string(cudaGetErrorName(err)) + ": " + cudaGetErrorString(err));
}

/**
 * CUDA kernel: adds the square of each input element to the matching output
 * element, i.e. d_output[i] += d_input[i] * d_input[i].
 *
 * Each thread handles at most one element; threads whose index falls at or
 * beyond recordCount do nothing.
 *
 * @param recordCount  Number of elements to process in each array.
 * @param d_input      Input values, read only by this kernel.
 * @param d_output     Running totals, updated in place.
 */
__global__
void sos(size_t recordCount, float* d_input, float* d_output)
{
    // Widen before multiplying: blockIdx.x and blockDim.x are 32-bit unsigned
    // values, so their product would wrap around before being stored in a
    // size_t once the index exceeds 2^32 - 1.
    const size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;

    if (i < recordCount)
    {
        const float value = d_input[i];
        d_output[i] += value * value;
    }
}

namespace
{
    // Owns one device allocation of floats and frees it on scope exit, so an
    // exception thrown part-way through a table does not leak GPU memory.
    class DeviceFloats
    {
    public:
        // Allocates room for `count` floats; throws (via cudaCheck) on failure.
        explicit DeviceFloats(size_t count) : m_ptr(0)
        {
            cudaCheck(cudaMalloc(&m_ptr, count * sizeof(float)));
        }

        ~DeviceFloats()
        {
            // Best effort only: a destructor must not throw.
            if (m_ptr)
            {
                (void) cudaFree(m_ptr);
            }
        }

        // Frees the buffer now and reports a failure through cudaCheck.
        void release()
        {
            float* ptr = m_ptr;
            m_ptr = 0;
            cudaCheck(cudaFree(ptr));
        }

        float* get() const { return m_ptr; }

    private:
        // Non-copyable: exactly one owner per allocation.
        DeviceFloats(const DeviceFloats&);
        DeviceFloats& operator=(const DeviceFloats&);

        float* m_ptr;
    };
}

/**
 * UDF entry point.
 *
 * For every input table, sums the squares of all of its columns record by
 * record on the GPU and writes the totals to the first column of the output
 * table at the same index.  All input columns are read as float.
 *
 * @return 0 on success; 1 if any step throws, after writing the exception
 *         message to standard error.
 */
int main(int argc, char *argv[])
{
    try
    {
        kinetica::ProcData* procData = kinetica::ProcData::get();
        const kinetica::ProcData::InputDataSet& inputData = procData->getInputData();
        kinetica::ProcData::OutputDataSet& outputData = procData->getOutputData();

        // Loop through input/output table pairs

        for (size_t table = 0; table < inputData.getTableCount(); ++table)
        {
            const kinetica::ProcData::InputTable& inputTable = inputData[table];
            kinetica::ProcData::OutputTable& outputTable = outputData[table];
            kinetica::ProcData::OutputColumn& outputColumn = outputTable[0];

            const size_t columnCount = inputTable.getColumnCount();
            const size_t recordCount = inputTable.getSize();

            // Set the output table size

            outputTable.setSize(recordCount);

            // Nothing to compute for an empty table; skipping also avoids
            // zero-byte device allocations and a zero-block kernel launch.

            if (recordCount == 0)
            {
                continue;
            }

            const size_t byteCount = recordCount * sizeof(float);

            // Ceiling division: just enough blocks to cover every record

            const size_t cudaBlocks = (recordCount + CUDA_THREADS - 1) / CUDA_THREADS;

            // Allocate input and output device vectors on the GPU

            DeviceFloats d_input(recordCount);
            DeviceFloats d_output(recordCount);

            // Initialize output device vector values to 0, since the kernel
            //   accumulates into it

            cudaCheck(cudaMemset(d_output.get(), 0, byteCount));

            // Copy input data from each input column to device vector and run
            //   the calculation; saving result to output device vector

            for (size_t column = 0; column < columnCount; ++column)
            {
                const kinetica::ProcData::InputColumn& inputColumn = inputTable[column];
                cudaCheck(cudaMemcpy(d_input.get(), inputColumn.getData<float>(), byteCount, cudaMemcpyHostToDevice));
                sos<<<cudaBlocks, CUDA_THREADS>>>(recordCount, d_input.get(), d_output.get());
                cudaCheck(cudaPeekAtLastError());
            }

            // Copy output data from device vector to output column

            cudaCheck(cudaMemcpy(outputColumn.getData<float>(), d_output.get(), byteCount, cudaMemcpyDeviceToHost));

            // Free input and output device vectors, reporting any failure

            d_input.release();
            d_output.release();
        }

        procData->complete();
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

makefile

# Location of the Kinetica UDF C++ API sources (Proc.cpp / Proc.hpp)
UDF_LIB=/opt/gpudb/udf/api/cpp/kinetica
# Name of the UDF binary; the CUDA source is ${TARGET}.cu
TARGET=udf_sos_proc

# CUDA toolkit location; its bin directory is put first on PATH so that the
# recipe below can find nvcc
CUDADIR := /usr/local/cuda
PATH := ${CUDADIR}/bin:${PATH}
LD_LIBRARY_PATH := ${CUDADIR}/lib64/:${LD_LIBRARY_PATH}

# all and clean are commands, not files; declaring them phony keeps a stray
# file with either name from suppressing the rule
.PHONY: all clean

all: ${TARGET}

# Rebuild whenever the source, the UDF API sources or this makefile change
${TARGET}: makefile ${TARGET}.cu ${UDF_LIB}/Proc.cpp ${UDF_LIB}/Proc.hpp
	nvcc -o ${TARGET} ${TARGET}.cu ${UDF_LIB}/Proc.cpp -I${UDF_LIB} -m64

# Remove the built UDF binary
clean:
	rm -f ${TARGET}

udf_sos_exec.py

import sys
import gpudb

KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'
proc_name = 'udf_sos_proc'
# The UDF binary is expected in the current directory under the proc's name
file_name = proc_name


# Read proc code in as bytes and add to a file data array
files = {}
with open(file_name, 'rb') as proc_file:
    files[file_name] = proc_file.read()


# Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)


# Remove proc if it exists from a prior registration
if h_db.has_proc(proc_name)['proc_exists']:
    h_db.delete_proc(proc_name)


# Single-argument print(...) calls behave identically on Python 2 and 3
print("Registering proc...")
response = h_db.create_proc(proc_name, 'distributed', files, './' + file_name, [], {})
print(response)

# Run the proc with INPUT_TABLE as its only input table and OUTPUT_TABLE as
# its only output table
print("Executing proc...")
response = h_db.execute_proc(proc_name, {}, {}, [INPUT_TABLE], {}, [OUTPUT_TABLE], {})
print(response)