Version:

Example UDF (CUDA) - CUBLAS

The following is a complete example, using the Python API, of a CUDA-based UDF that performs various computations using the scikit-CUDA interface. It will take two vectors and one matrix of data loaded from a Kinetica table and perform various operations in both NumPy & cuBLAS, writing the comparison output to /opt/gpudb/core/logs/gpudb.log.

This setup assumes that the UDF is being developed on the Kinetica host (or the head node host, in a multi-node Kinetica cluster), that the Python database API is available at /opt/gpudb/api/python, and that the Python UDF API is available at /opt/gpudb/udf/api/python.

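This example will contain the following Python scripts:

* udf_cublas_init.py: creates the input table and populates it with random test data
* udf_cublas_proc.py: the UDF itself, which runs each computation in both NumPy and cuBLAS
* udf_cublas_exec.py: registers the UDF with Kinetica and executes it against the input table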

Note

All commands should be run as the gpudb user.

Since this example relies on the Scikit-CUDA package, that package must first be installed.

Ensure that the CUDA path is set:

$ export CUDADIR=/usr/local/cuda
$ export PATH=$PATH:$CUDADIR/bin
$ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CUDADIR/lib64/

Install the Scikit-CUDA package:

$ /opt/gpudb/udf/api/python/gpudb-pip.sh install scikit-cuda

The example can be run as follows:

$ /opt/gpudb/core/bin/gpudb_env.sh python udf_cublas_init.py
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_cublas_exec.py

udf_cublas_init.py

import sys
import collections
import gpudb

# Connection settings and the name of the table the UDF will read from
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_cublas_in_table'


## Connect to Kinetica
h_db = gpudb.GPUdb(
    encoding='BINARY',
    host=KINETICA_HOST,
    port=KINETICA_PORT
)


## Create input data table
# The table has three float columns: x, y & z
float_column_type = gpudb.GPUdbRecordColumn._ColumnType.FLOAT
columns = [
    gpudb.GPUdbRecordColumn(column_name, float_column_type)
    for column_name in ("x", "y", "z")
]

test_record_type = gpudb.GPUdbRecordType(columns, label=INPUT_TABLE + '_lbl')
test_record_type.create_type(h_db)
type_id = test_record_type.type_id

# Clear any table of the same name left over from an earlier run, then
# (re-)create it as a replicated table
existing_table = h_db.has_table(table_name=INPUT_TABLE)
if existing_table['table_exists']:
    h_db.clear_table(table_name=INPUT_TABLE)
h_db.create_table(
    table_name=INPUT_TABLE,
    type_id=type_id,
    options={"is_replicated": "true"}
)


## Insert input data
import random

# Ten records, each holding three random values drawn from [0, 10],
# binary-encoded to match the 'binary' list encoding used for the insert
encoded_obj_list = [
    gpudb.GPUdbRecord(
        test_record_type,
        [random.uniform(0, 10), random.uniform(0, 10), random.uniform(0, 10)]
    ).binary_data
    for _ in range(10)
]
h_db.insert_records(
    table_name=INPUT_TABLE,
    data=encoded_obj_list,
    list_encoding='binary',
    options={}
)

udf_cublas_proc.py

import sys,os,os.path
from kinetica_proc import ProcData
from skcuda import cublas
import numpy as np
import pycuda.autoinit
import pycuda.gpuarray as gpuarray

# Update PATH with location of nvcc compiler
CUDA_BIN = '/usr/local/cuda/bin'

# Rebuild the directory using the platform's own path separator
_cuda_bin_dir = os.path.join(os.sep, os.sep.join(CUDA_BIN.split('/')))

# Tolerate an unset PATH (os.environ['PATH'] would raise KeyError) and do not
# append the directory a second time if it is already listed
_current_path = os.environ.get('PATH', '')
if _cuda_bin_dir not in _current_path.split(os.pathsep):
    if _current_path:
        os.environ['PATH'] = _current_path + os.pathsep + _cuda_bin_dir
    else:
        os.environ['PATH'] = _cuda_bin_dir

def cublas_max_element_index(h, x):
    """Return the index that cuBLAS reports for the largest element of x.

    Copies x to the GPU and runs cublasIsamax over all x.size elements with
    a stride of 1.

    h -- cuBLAS handle
    x -- NumPy array to search; the single-precision routine is used, so
         this is presumably expected to be float32 -- TODO confirm

    NOTE(review): BLAS "amax" routines normally select by absolute value,
    so this presumably matches np.argmax only for non-negative data --
    confirm.
    """
    device_x = gpuarray.to_gpu(x)
    element_count = x.size
    return cublas.cublasIsamax(h, element_count, device_x.gpudata, 1)

def cublas_swap_vectors(h, x, y):
    """Swap the contents of x and y on the GPU and return both results.

    Both arrays are copied to the GPU, exchanged with cublasSswap over
    x.size elements (stride 1 on each side) and copied back; the host
    arrays passed in are only read here.

    h -- cuBLAS handle
    x -- first NumPy array
    y -- second NumPy array

    Returns a tuple: (device copy of x after the swap, device copy of y
    after the swap).
    """
    device_x = gpuarray.to_gpu(x)
    device_y = gpuarray.to_gpu(y)

    element_count = x.size
    cublas.cublasSswap(h, element_count, device_x.gpudata, 1, device_y.gpudata, 1)

    swapped_x = device_x.get()
    swapped_y = device_y.get()
    return swapped_x, swapped_y

def cublas_add_vectors(h, x, y):
    """Add x to y on the GPU and return the sum.

    Both arrays are copied to the GPU and cublasSaxpy is run over x.size
    elements with a scale factor of 1.0 and a stride of 1 on each side;
    the device copy of y is then fetched and returned.

    h -- cuBLAS handle
    x -- NumPy array passed as the scaled operand
    y -- NumPy array passed as the operand whose device copy is returned
    """
    device_x = gpuarray.to_gpu(x)
    device_y = gpuarray.to_gpu(y)

    scale = 1.0
    cublas.cublasSaxpy(h, x.size, scale, device_x.gpudata, 1, device_y.gpudata, 1)

    summed = device_y.get()
    return summed

def cublas_matrix_vector_product(h, M, x):
    """Multiply matrix M with vector x using cublasSgemv.

    M and x are copied to the GPU and the result is written into a freshly
    allocated (M.shape[1], 1) float32 device array, which is fetched and
    returned.

    h -- cuBLAS handle
    M -- 2-D NumPy array
    x -- NumPy array holding the vector operand

    NOTE(review): the call passes M.shape[1] as the row count and leading
    dimension and M.shape[0] as the column count with op 'n'. cuBLAS is
    column-major, so a row-major M is presumably seen as its transpose --
    consistent with the caller labelling this result "M T * x". Confirm.
    """
    device_M = gpuarray.to_gpu(M)
    device_x = gpuarray.to_gpu(x)

    first_dim = M.shape[0]
    second_dim = M.shape[1]
    device_result = gpuarray.empty((second_dim, 1), np.float32)

    alpha = np.float32(1.0)
    beta = np.float32(0.0)
    cublas.cublasSgemv(
        h, 'n', second_dim, first_dim,
        alpha, device_M.gpudata, second_dim,
        device_x.gpudata, 1,
        beta, device_result.gpudata, 1
    )
    return device_result.get()

def cublas_vector_transpose_product(h, x):
    """Compute the product of x with its own transpose using cublasSsyr.

    x is copied to the GPU and the update (scale factor 1.0, stride 1) is
    accumulated into a zero-filled square float32 device matrix with
    x.shape[0] rows and columns, which is fetched and returned.

    h -- cuBLAS handle
    x -- NumPy array holding the vector operand

    NOTE(review): the call uses uplo='u'; symmetric cuBLAS routines
    normally write only the selected triangle, so the returned matrix is
    presumably filled in one triangle only rather than being the full
    x * x T -- confirm against the NumPy output.
    """
    device_x = gpuarray.to_gpu(x)

    order = x.shape[0]
    device_A = gpuarray.zeros((order, order), np.float32)

    cublas.cublasSsyr(h, 'u', order, 1.0, device_x.gpudata, 1, device_A.gpudata, order)
    return device_A.get()

def cublas_matrix_transpose_product(h, A):
    """Compute the product of A with its own transpose using cublasSsyrk.

    A is copied to the GPU and the result (alpha 1.0, beta 0.0) is written
    into a zero-filled square float32 device matrix with A.shape[0] rows
    and columns, which is fetched and returned.

    h -- cuBLAS handle
    A -- 2-D NumPy array

    NOTE(review): the call uses uplo='u' and trans='t' with A.shape[1] as
    the leading dimension of A. As with other symmetric cuBLAS routines,
    presumably only one triangle of the result is written -- confirm
    against the NumPy output.
    """
    device_A = gpuarray.to_gpu(A)

    first_dim = A.shape[0]
    second_dim = A.shape[1]
    device_C = gpuarray.zeros((first_dim, first_dim), np.float32)

    cublas.cublasSsyrk(
        h, 'u', 't', first_dim, second_dim,
        1.0, device_A.gpudata, second_dim,
        0.0, device_C.gpudata, first_dim
    )
    return device_C.get()


def example(pd):
    """Run each operation through cuBLAS and NumPy and print both results.

    Builds two column vectors (x, y) and one three-column matrix (M) from
    the x, y & z columns of the first input table, then prints the outcome
    of each operation computed via the cuBLAS helper functions followed by
    the NumPy equivalent, so the two can be compared.

    pd -- the ProcData object for this UDF invocation; pd.input_data[0]
          must provide columns named x, y & z

    The cuBLAS handle is released even if one of the operations raises.
    The print calls use the single-argument parenthesised form so that the
    function behaves the same under Python 2 and Python 3.
    """
    np.set_printoptions(linewidth=200)

    in_table = pd.input_data[0]
    row_count = in_table.size

    # Look each column up once rather than on every loop iteration
    col_x = in_table['x']
    col_y = in_table['y']
    col_z = in_table['z']

    # Allocate directly as float32 (the cuBLAS helpers use single-precision
    # routines); every element is assigned in the loop below
    x = np.empty((row_count, 1), dtype=np.float32)
    y = np.empty((row_count, 1), dtype=np.float32)
    M = np.empty((row_count, 3), dtype=np.float32)

    # Initialize vectors & matrix with database values
    for i in range(row_count):
        x[i, 0] = col_x[i]
        y[i, 0] = col_y[i]
        M[i, 0] = col_x[i]
        M[i, 1] = col_y[i]
        M[i, 2] = col_z[i]

    h = cublas.cublasCreate()
    try:
        print("x = \n%s" % x)
        print("y = \n%s" % y)
        print("M = \n%s" % M)
        print("")

        print("Swap vectors x & y (cuBLAS)")
        x_swap, y_swap = cublas_swap_vectors(h, x, y)
        print("x = \n%s" % x_swap)
        print("y = \n%s" % y_swap)
        print("")

        print("Swap vectors x & y (NumPy)")
        x_swap, y_swap = x.copy(), y.copy()
        # The explicit copy keeps the old x values alive while x_swap is
        # being overwritten with the y values
        x_swap[:, 0], y_swap[:, 0] = y_swap[:, 0], x_swap[:, 0].copy()
        print("x = \n%s" % x_swap)
        print("y = \n%s" % y_swap)
        print("")

        print("Max element index (cuBLAS)")
        print(cublas_max_element_index(h, x))
        print("")

        # NOTE(review): the cuBLAS routine presumably selects by absolute
        # value while np.argmax does not; the two agree for non-negative
        # input -- confirm
        print("Max element index (NumPy)")
        print(np.argmax(x))
        print("")

        print("x + y (cuBLAS)")
        print(cublas_add_vectors(h, x, y))
        print("")

        print("x + y (NumPy)")
        print(x + y)
        print("")

        print("M T * x (cuBLAS)")
        print(cublas_matrix_vector_product(h, M, x))
        print("")

        print("M T * x (NumPy)")
        print(M.T.dot(x))
        print("")

        print("x * x T (cuBLAS)")
        print(cublas_vector_transpose_product(h, x))
        print("")

        print("x * x T (NumPy)")
        print(x * x.T)
        print("")

        print("M * M T (cuBLAS)")
        print(cublas_matrix_transpose_product(h, M))
        print("")

        print("M * M T (NumPy)")
        print(M.dot(M.T))
    finally:
        # Always release the cuBLAS handle, even when an operation fails
        cublas.cublasDestroy(h)


if __name__ == "__main__":

    proc_data = ProcData()

    request = proc_data.request_info
    segment_number = int(request["data_segment_number"])
    segment_count = int(request["data_segment_count"])

    # Run the example only for the last data segment (segment numbers are
    # compared as zero-based against the total count).
    # NOTE(review): presumably this prevents the same output from being
    # produced once per segment -- confirm
    is_last_segment = (segment_number == segment_count - 1)
    if is_last_segment:
        example(proc_data)

    # Signal completion regardless of whether this segment ran the example
    proc_data.complete()

udf_cublas_exec.py

import sys
import gpudb

# Connection settings, the name the proc is registered under, and the source
# file that implements it
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
proc_name = 'udf_cublas_proc'
file_name = proc_name + '.py'

INPUT_TABLE = 'udf_cublas_in_table'


# Read proc code in as bytes and add to a file data array
# (the handle is not named "file", to avoid shadowing the Python 2 builtin)
files = {}
with open(file_name, 'rb') as proc_file:
    files[file_name] = proc_file.read()


# Connect to Kinetica
h_db = gpudb.GPUdb(encoding='BINARY', host=KINETICA_HOST, port=KINETICA_PORT)


# Remove proc if it exists from a prior registration
if h_db.has_proc(proc_name)['proc_exists']:
    h_db.delete_proc(proc_name)


# The print calls below use the single-argument parenthesised form, which
# produces the same output under Python 2 and Python 3
print("Registering proc...")
# NOTE(review): positional arguments are presumably (name, execution mode,
# files, command, command args, options) -- confirm against the API docs
response = h_db.create_proc(proc_name, 'distributed', files, 'python', [file_name], {})
print(response)

print("Executing proc...")
# NOTE(review): positional arguments are presumably (name, params, binary
# params, input tables, input columns, output tables, options) -- confirm
response = h_db.execute_proc(proc_name, {}, {}, [INPUT_TABLE], {}, [], {})
print(response)