The following implements the sum-of-squares algorithm as a distributed UDF using the CUDA-based UDF C++ API. The initialization script will be run separately from the UDF itself, and the execution script will need to target the UDF C++ binary.
This example takes a list of input tables and a corresponding list of output tables (the two lists must be the same length) and, for each record of each input table, sums the squares of all of that table's columns and saves the result to the first column of the corresponding output table record; i.e.:
$$\text{in.a}^2 + \text{in.b}^2 + \cdots + \text{in.n}^2 \rightarrow \text{out.a}$$
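This example contains the following scripts:
- `udf_sos_init.py` — creates the input and output tables and populates the input table with random data
- `udf_sos_proc.cu` — the UDF itself, written against the CUDA-based UDF C++ API
- `makefile` — builds the UDF binary `udf_sos_proc`
- `udf_sos_exec.py` — registers and executes the UDF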
Note: All commands should be run as the `gpudb` user.
The example can be run as follows:
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_init.py
$ make
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_sos_exec.py
This setup assumes that the UDF is being developed on the Kinetica host (or on the head node host, if running a multi-node Kinetica cluster). It also assumes that the Python database API is available at `/opt/gpudb/api/python` and the UDF C++ API is available at `/opt/gpudb/udf/api/cpp`.
import sys
import collections
import gpudb
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'
# Number of random records generated for the input table
RECORD_COUNT = 10000


def _compact_schema(schema):
    """Return ``schema`` with every space and newline removed."""
    return schema.replace(' ', '').replace('\n', '')


def _recreate_table(h_db, table_name, type_definition):
    """Register ``type_definition`` as a type and create ``table_name`` from it.

    The type is labelled ``<table_name>_lbl``.  If a table named
    ``table_name`` already exists, ``clear_table`` is called on it before
    ``create_table`` so the script can be re-run.

    Returns the ID of the registered type.
    """
    type_id = h_db.create_type(type_definition = type_definition, label = table_name + '_lbl', properties = {})['type_id']
    if h_db.has_table(table_name = table_name)['table_exists']:
        h_db.clear_table(table_name = table_name)
    h_db.create_table(table_name = table_name, type_id = type_id)
    return type_id


## Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)

## Create input data table
# Both columns are "float": the UDF reads every input column with getData<float>().
input_type = _compact_schema("""
{
    "type": "record",
    "name": "input_type",
    "fields": [
        {"name":"x1","type":"float"},
        {"name":"x2","type":"float"}
    ]
} """)
_recreate_table(h_db, INPUT_TABLE, input_type)

## Insert input data
import random
encoded_obj_list = []
for _ in range(RECORD_COUNT):
    # Field order must match the schema above, hence OrderedDict
    datum = collections.OrderedDict()
    datum["x1"] = random.gauss(1, 1)
    datum["x2"] = random.gauss(1, 2)
    encoded_obj_list.append(h_db.encode_datum(input_type, datum))
h_db.insert_records(table_name = INPUT_TABLE, data = encoded_obj_list, list_encoding = 'binary', options = {})

## Create output data table
# A single "float" column; the UDF writes its result into output column 0.
output_type = _compact_schema("""
{
    "type": "record",
    "name": "out_type",
    "fields": [
        {"name":"y","type":"float"}
    ]
} """)
_recreate_table(h_db, OUTPUT_TABLE, output_type)
#include "Proc.hpp"
#include <iostream>
#include <stdexcept>
// Threads per block used when launching the sos kernel; customize as appropriate.
#define CUDA_THREADS 256

// Turns a CUDA runtime status code into a C++ exception.
// Returns normally when `status` is cudaSuccess; otherwise throws
// std::runtime_error whose message is the text from cudaGetErrorString().
inline void cudaCheck(cudaError_t status)
{
    if (status == cudaSuccess)
    {
        return;
    }
    throw std::runtime_error(cudaGetErrorString(status));
}
// Adds the square of one input column to a running per-record total:
//   d_output[i] += d_input[i] * d_input[i]   for every i < recordCount.
//
// Launch: 1-D grid of 1-D blocks, no shared memory. Any grid size is correct
// because each thread strides through the array by the total number of
// launched threads. Indices are computed in size_t so that
// blockIdx.x * blockDim.x cannot wrap in 32-bit arithmetic.
// Preconditions: d_input and d_output are device pointers to at least
// recordCount floats each, and they do not overlap (required by __restrict__).
__global__
void sos(size_t recordCount, const float* __restrict__ d_input, float* __restrict__ d_output)
{
    const size_t stride = static_cast<size_t>(gridDim.x) * blockDim.x;
    for (size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x; i < recordCount; i += stride)
    {
        const float x = d_input[i];
        d_output[i] += x * x;
    }
}
namespace
{
    // Owns one cudaMalloc'd device buffer of `count` floats and releases it
    // with cudaFree in the destructor, so an exception thrown by cudaCheck
    // part-way through a table does not leak the allocation.
    // Avoids C++11-only syntax because the makefile builds without a -std flag.
    class DeviceFloatBuffer
    {
    public:
        explicit DeviceFloatBuffer(size_t count) : ptr_(0)
        {
            cudaCheck(cudaMalloc(&ptr_, count * sizeof(float)));
        }

        ~DeviceFloatBuffer()
        {
            // Destructors must not throw, so a failure here is deliberately ignored.
            cudaFree(ptr_);
        }

        float* get() const { return ptr_; }

    private:
        // Non-copyable: a copy would cause a double cudaFree.
        DeviceFloatBuffer(const DeviceFloatBuffer&);
        DeviceFloatBuffer& operator=(const DeviceFloatBuffer&);

        float* ptr_;
    };
}

// UDF entry point: for each (input table, output table) pair, writes the sum
// of squares of all input columns of each record into output column 0.
// Returns 0 on success; on any exception, prints the message to stderr and
// returns 1 without calling procData->complete().
int main(int argc, char *argv[])
{
    try
    {
        kinetica::ProcData* procData = kinetica::ProcData::get();
        const kinetica::ProcData::InputDataSet& inputData = procData->getInputData();
        kinetica::ProcData::OutputDataSet& outputData = procData->getOutputData();

        // Input and output tables are paired by position; the caller is
        // expected to pass the same number of each.
        for (size_t table = 0; table < inputData.getTableCount(); ++table)
        {
            const kinetica::ProcData::InputTable& inputTable = inputData[table];
            kinetica::ProcData::OutputTable& outputTable = outputData[table];
            // NOTE(review): assumes the output table's first column is float.
            kinetica::ProcData::OutputColumn& outputColumn = outputTable[0];
            const size_t columnCount = inputTable.getColumnCount();
            const size_t recordCount = inputTable.getSize();

            // One output record per input record
            outputTable.setSize(recordCount);

            // Nothing to compute (e.g. this proc instance received no records);
            // skip the device allocation, copies and kernel launches entirely.
            if (recordCount == 0)
            {
                continue;
            }

            const size_t byteCount = recordCount * sizeof(float);
            // Ceil-div so that every record is covered without launching a
            // spare block when recordCount is a multiple of CUDA_THREADS.
            const unsigned int cudaBlocks =
                static_cast<unsigned int>((recordCount + CUDA_THREADS - 1) / CUDA_THREADS);

            DeviceFloatBuffer d_input(recordCount);
            DeviceFloatBuffer d_output(recordCount);

            // The kernel accumulates into d_output, so it must start at 0
            cudaCheck(cudaMemset(d_output.get(), 0, byteCount));

            // Copy each input column to the device and add its squares to the
            // running total. Everything uses the default stream, so each copy
            // into d_input waits for the previous kernel to finish reading it.
            for (size_t column = 0; column < columnCount; ++column)
            {
                const kinetica::ProcData::InputColumn& inputColumn = inputTable[column];
                // NOTE(review): every input column is read as float; other
                // column types are not handled here.
                cudaCheck(cudaMemcpy(d_input.get(), inputColumn.getData<float>(), byteCount, cudaMemcpyHostToDevice));
                sos<<<cudaBlocks, CUDA_THREADS>>>(recordCount, d_input.get(), d_output.get());
                // Surface launch-configuration errors immediately
                cudaCheck(cudaGetLastError());
            }

            // Blocking copy: also reports any asynchronous kernel failure
            cudaCheck(cudaMemcpy(outputColumn.getData<float>(), d_output.get(), byteCount, cudaMemcpyDeviceToHost));
            // d_input / d_output are released here by ~DeviceFloatBuffer
        }

        procData->complete();
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << std::endl;
        return 1;
    }

    return 0;
}
# Location of the Kinetica UDF C++ API sources (Proc.hpp / Proc.cpp)
UDF_LIB=/opt/gpudb/udf/api/cpp/kinetica
# Name of the UDF binary; its source file is ${TARGET}.cu
TARGET=udf_sos_proc
CUDADIR := /usr/local/cuda
# Prepend the CUDA bin directory (nvcc) and lib64 directory to the search paths
PATH := ${CUDADIR}/bin:${PATH}
LD_LIBRARY_PATH := ${CUDADIR}/lib64/:${LD_LIBRARY_PATH}

# "all" and "clean" name actions, not files
.PHONY: all clean

all: ${TARGET}

# Rebuild whenever the UDF source, the UDF API sources, or this makefile change.
# NOTE(review): the "makefile" prerequisite assumes this file is named exactly
# "makefile" (lowercase); adjust it if the file is named "Makefile".
# The recipe line below must start with a TAB character.
${TARGET}: makefile ${TARGET}.cu ${UDF_LIB}/Proc.cpp ${UDF_LIB}/Proc.hpp
	nvcc -o ${TARGET} ${TARGET}.cu ${UDF_LIB}/Proc.cpp -I${UDF_LIB} -m64

clean:
	rm -f ${TARGET}
import sys
import gpudb
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_sos_in_table'
OUTPUT_TABLE = 'udf_sos_out_table'
proc_name = 'udf_sos_proc'
# The binary built by the makefile has the same name as the proc
file_name = proc_name

# Read proc code in as bytes and add to a file data array (file name -> bytes)
files = {}
with open(file_name, 'rb') as proc_file:
    files[file_name] = proc_file.read()

# Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)

# Remove proc if it exists from a prior registration
if h_db.has_proc(proc_name)['proc_exists']:
    h_db.delete_proc(proc_name)

# Single-argument print(...) behaves identically on Python 2 and Python 3
print("Registering proc...")
# Positional args (per the gpudb API docs — verify if the API version differs):
# proc name, execution mode, files, command, command args, options
response = h_db.create_proc(proc_name, 'distributed', files, './' + file_name, [], {})
print(response)

print("Executing proc...")
# Positional args: proc name, params, binary params, input tables,
# input column names, output tables, options
response = h_db.execute_proc(proc_name, {}, {}, [INPUT_TABLE], {}, [OUTPUT_TABLE], {})
print(response)