The following is a complete example, using the Python API, of a CUDA-based
UDF that performs various computations using the scikit-CUDA interface. It
takes two vectors and one matrix of data loaded from a Kinetica table,
performs various operations on them in both NumPy and cuBLAS, and writes the
comparison output to /opt/gpudb/core/logs/gpudb.log.
This setup assumes that the UDF is being developed on the Kinetica host (or
the head node host, in a multi-node Kinetica cluster), that the Python
database API is available at /opt/gpudb/api/python, and that the Python UDF
API is available at /opt/gpudb/udf/api/python.
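This example contains the following Python scripts:
- udf_cublas_init.py: creates the input table and fills it with random data
- udf_cublas_proc.py: the UDF itself
- udf_cublas_exec.py: registers and executes the UDF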
Note: All commands should be run as the gpudb user.
Since this example relies on the Scikit-CUDA package, that package must first be installed.
Ensure that the CUDA path is set:
$ export CUDADIR=/usr/local/cuda
$ export PATH=$PATH:$CUDADIR/bin
$ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CUDADIR/lib64
Install the Scikit-CUDA package:
$ /opt/gpudb/udf/api/python/gpudb-pip.sh install scikit-cuda
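The example can be run as follows, from the directory containing all three
scripts (udf_cublas_exec.py reads udf_cublas_proc.py from the current directory):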
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_cublas_init.py
$ /opt/gpudb/core/bin/gpudb_env.sh python udf_cublas_exec.py
import sys
import collections
import gpudb
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
INPUT_TABLE = 'udf_cublas_in_table'

## Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)

## Create input data table
# Record type with three FLOAT columns: x, y and z.
columns = [
    gpudb.GPUdbRecordColumn(name, gpudb.GPUdbRecordColumn._ColumnType.FLOAT)
    for name in ('x', 'y', 'z')
]
test_record_type = gpudb.GPUdbRecordType(columns, label = INPUT_TABLE + '_lbl')
test_record_type.create_type(h_db)
type_id = test_record_type.type_id

# Remove any table left over from a previous run so create_table below does
# not collide with an existing table of the same name (Kinetica's clear_table
# drops the table).
if h_db.has_table(table_name = INPUT_TABLE)['table_exists']:
    h_db.clear_table(table_name = INPUT_TABLE)

# Create the input table as a replicated table.
h_db.create_table(table_name = INPUT_TABLE, type_id = type_id, options = {"is_replicated": "true"})
## Insert input data
import random

# Number of random rows written to the input table.
NUM_RECORDS = 10

# Each row holds x, y and z values drawn from random.uniform(0, 10), in that
# order, encoded in the binary format expected by insert_records.
encoded_obj_list = [
    gpudb.GPUdbRecord(
        test_record_type,
        [random.uniform(0, 10), random.uniform(0, 10), random.uniform(0, 10)],
    ).binary_data
    for _ in range(NUM_RECORDS)
]
h_db.insert_records(table_name = INPUT_TABLE, data = encoded_obj_list, list_encoding = 'binary', options = {})
import sys,os,os.path
from kinetica_proc import ProcData
from skcuda import cublas
import numpy as np
import pycuda.autoinit
import pycuda.gpuarray as gpuarray
# Update PATH with location of nvcc compiler.
# The directory is appended only if it is not already listed, so re-running
# this code does not grow PATH; an unset PATH is handled instead of raising
# KeyError.
CUDA_BIN = '/usr/local/cuda/bin'
_cuda_bin_dir = os.path.normpath(CUDA_BIN)
_current_path = os.environ.get('PATH', '')
if _cuda_bin_dir not in _current_path.split(os.pathsep):
    os.environ['PATH'] = (_current_path + os.pathsep + _cuda_bin_dir) if _current_path else _cuda_bin_dir
def cublas_max_element_index(h, x):
    """Return the index of the largest-magnitude element of ``x`` via cuBLAS.

    ``x`` is copied to the GPU and passed to ``cublasIsamax`` with unit
    stride over all ``x.size`` elements.  cuBLAS ``i?amax`` ranks elements by
    absolute value, not by signed value.  (scikit-cuda documents the returned
    index as 0-based -- confirm against the installed version.)

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param x: float32 NumPy array.
    """
    device_x = gpuarray.to_gpu(x)
    element_count = x.size
    return cublas.cublasIsamax(h, element_count, device_x.gpudata, 1)
def cublas_swap_vectors(h, x, y):
    """Swap ``x`` and ``y`` on the GPU with cuBLAS ``Sswap``.

    Both arrays are copied to the device, swapped there over ``x.size``
    elements with unit stride, and copied back.  The host arrays passed in
    are left untouched; the swapped copies are returned as ``(new_x, new_y)``.

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param x: float32 NumPy array.
    :param y: float32 NumPy array with at least ``x.size`` elements.
    """
    device_pair = (gpuarray.to_gpu(x), gpuarray.to_gpu(y))
    cublas.cublasSswap(h, x.size, device_pair[0].gpudata, 1, device_pair[1].gpudata, 1)
    return tuple(arr.get() for arr in device_pair)
def cublas_add_vectors(h, x, y):
    """Return ``x + y`` computed on the GPU with cuBLAS ``Saxpy``.

    ``Saxpy`` evaluates ``y := alpha * x + y`` in place on the device copy of
    ``y``.  With ``alpha = 1.0`` this is a plain element-wise sum.  The host
    arrays are not modified.

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param x: float32 NumPy array.
    :param y: float32 NumPy array with at least ``x.size`` elements.
    """
    alpha = 1.0
    src_gpu, acc_gpu = gpuarray.to_gpu(x), gpuarray.to_gpu(y)
    cublas.cublasSaxpy(h, x.size, alpha, src_gpu.gpudata, 1, acc_gpu.gpudata, 1)
    return acc_gpu.get()
def cublas_matrix_vector_product(h, M, x):
    """Return ``M.T @ x`` computed with cuBLAS ``Sgemv``.

    Assumes ``M`` is a C-ordered (row-major) float32 array of shape
    ``(rows, cols)``.  cuBLAS reads memory column-major, so it sees the same
    buffer as the ``cols x rows`` matrix ``M.T`` with leading dimension
    ``cols``.  A non-transposed GEMV on that view therefore yields
    ``M.T @ x``, returned with shape ``(cols, 1)``.

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param M: float32 matrix, shape ``(rows, cols)``.
    :param x: float32 vector with ``rows`` elements.
    """
    rows, cols = M.shape[0], M.shape[1]
    M_gpu = gpuarray.to_gpu(M)
    x_gpu = gpuarray.to_gpu(x)
    out_gpu = gpuarray.empty((cols, 1), np.float32)
    cublas.cublasSgemv(h, 'n', cols, rows,
                       np.float32(1.0), M_gpu.gpudata, cols,
                       x_gpu.gpudata, 1,
                       np.float32(0.0), out_gpu.gpudata, 1)
    return out_gpu.get()
def symmetrize_triangular(A):
    """Return the full symmetric matrix from one whose other triangle is zero.

    cuBLAS ``syr``/``syrk`` write only one triangle of their output.  Because
    the output buffers below start as zeros, the untouched triangle is all
    zeros, so ``A + A.T`` rebuilds the full matrix while counting the
    diagonal twice.  The diagonal is therefore subtracted once.  This works
    whichever triangle was written.
    """
    A = np.asarray(A)
    return A + A.T - np.diag(np.diag(A))


def cublas_vector_transpose_product(h, x):
    """Return the outer product ``x @ x.T`` computed with cuBLAS ``Ssyr``.

    ``Ssyr`` performs the rank-1 update ``A := alpha * x * x^T + A`` but
    writes only one triangle of ``A`` (``'u'`` in cuBLAS's column-major view).
    ``A`` starts as zeros, so the missing triangle is mirrored from the
    written one.  The result is the full ``(n, n)`` matrix, comparable with
    NumPy's ``x * x.T``.

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param x: float32 array whose first dimension has length ``n``.
    """
    n = x.shape[0]
    x_gpu = gpuarray.to_gpu(x)
    A_gpu = gpuarray.zeros((n, n), np.float32)
    cublas.cublasSsyr(h, 'u', n, 1.0, x_gpu.gpudata, 1, A_gpu.gpudata, n)
    return symmetrize_triangular(A_gpu.get())


def cublas_matrix_transpose_product(h, A):
    """Return ``A @ A.T`` computed with cuBLAS ``Ssyrk``.

    Assumes ``A`` is a C-ordered (row-major) float32 array of shape
    ``(n, k)``.  cuBLAS sees the same buffer as the column-major ``k x n``
    matrix ``A.T`` (leading dimension ``k``), so ``trans='t'`` makes SYRK
    form ``(A.T).T @ A.T = A @ A.T``.  SYRK writes only one triangle of the
    zero-initialised ``(n, n)`` result, so the other triangle is filled in
    before returning.

    :param h: cuBLAS handle from ``cublas.cublasCreate()``.
    :param A: float32 matrix, shape ``(n, k)``.
    """
    n, k = A.shape[0], A.shape[1]
    A_gpu = gpuarray.to_gpu(A)
    C_gpu = gpuarray.zeros((n, n), np.float32)
    cublas.cublasSsyrk(h, 'u', 't', n, k, 1.0, A_gpu.gpudata, k, 0.0, C_gpu.gpudata, n)
    return symmetrize_triangular(C_gpu.get())
def example(pd):
    """Compare cuBLAS and NumPy results on data from the proc's first input table.

    Builds two ``(n, 1)`` float32 column vectors ``x`` and ``y`` from the
    table's ``x``/``y`` columns, and an ``(n, 3)`` float32 matrix ``M`` whose
    columns are ``x``, ``y`` and ``z``.  It then prints each operation as
    computed with cuBLAS, followed by the NumPy equivalent.

    Uses single-argument ``print(...)`` calls, so it behaves the same under
    Python 2 and Python 3.

    :param pd: the ``ProcData`` for this proc invocation.
    """
    np.set_printoptions(linewidth=200)
    in_table = pd.input_data[0]
    n = in_table.size

    if n == 0:
        # Nothing to compare, and np.argmax raises on an empty array.
        print("Input table is empty; nothing to compare")
        return

    def column_vector(name):
        # Copy one input column into an (n, 1) float32 NumPy array.
        values = in_table[name]
        return np.array([values[i] for i in range(n)], dtype=np.float32).reshape(n, 1)

    # Initialize vectors & matrix with database values
    x = column_vector('x')
    y = column_vector('y')
    M = np.hstack((x, y, column_vector('z')))

    h = cublas.cublasCreate()
    try:
        print("x = \n%s" % x)
        print("y = \n%s" % y)
        print("M = \n%s" % M)
        print("")

        print("Swap vectors x & y (cuBLAS)")
        x_swap, y_swap = cublas_swap_vectors(h, x, y)
        print("x = \n%s" % x_swap)
        print("y = \n%s" % y_swap)
        print("")

        print("Swap vectors x & y (NumPy)")
        x_swap, y_swap = y.copy(), x.copy()
        print("x = \n%s" % x_swap)
        print("y = \n%s" % y_swap)
        print("")

        print("Max element index (cuBLAS)")
        print(cublas_max_element_index(h, x))
        print("")

        print("Max element index (NumPy)")
        # cuBLAS i?amax ranks elements by absolute value; compare like with like.
        print(np.argmax(np.abs(x)))
        print("")

        print("x + y (cuBLAS)")
        print(cublas_add_vectors(h, x, y))
        print("")

        print("x + y (NumPy)")
        print(x + y)
        print("")

        print("M T * x (cuBLAS)")
        print(cublas_matrix_vector_product(h, M, x))
        print("")

        print("M T * x (NumPy)")
        print(M.T.dot(x))
        print("")

        print("x * x T (cuBLAS)")
        print(cublas_vector_transpose_product(h, x))
        print("")

        print("x * x T (NumPy)")
        print(x * x.T)
        print("")

        print("M * M T (cuBLAS)")
        print(cublas_matrix_transpose_product(h, M))
        print("")

        print("M * M T (NumPy)")
        print(M.dot(M.T))
    finally:
        # Release the cuBLAS handle even if one of the computations raises.
        cublas.cublasDestroy(h)
if __name__ == "__main__":
    proc_data = ProcData()
    request = proc_data.request_info
    # Only the instance handling the last data segment runs the comparison;
    # every instance still reports completion.
    segment_number = int(request["data_segment_number"])
    segment_count = int(request["data_segment_count"])
    if segment_number == segment_count - 1:
        example(proc_data)
    proc_data.complete()
import sys
import gpudb
KINETICA_HOST = '127.0.0.1'
KINETICA_PORT = '9191'
proc_name = 'udf_cublas_proc'
file_name = proc_name + '.py'
INPUT_TABLE = 'udf_cublas_in_table'

# Read proc code in as bytes and add to a file data array, keyed by file name.
# file_name is a relative path, so this script must be run from the directory
# that contains the proc source.
files = {}
with open(file_name, 'rb') as proc_file:
    files[file_name] = proc_file.read()
# Connect to Kinetica
h_db = gpudb.GPUdb(encoding = 'BINARY', host = KINETICA_HOST, port = KINETICA_PORT)

# Remove proc if it exists from a prior registration
if h_db.has_proc(proc_name)['proc_exists']:
    h_db.delete_proc(proc_name)

# Single-argument print(...) calls work identically under Python 2 and 3.
print("Registering proc...")
# Positional args are presumably (proc_name, execution_mode, files, command,
# args, options) -- i.e. a 'distributed' proc run as `python <file_name>`.
response = h_db.create_proc(proc_name, 'distributed', files, 'python', [file_name], {})
print(response)

print("Executing proc...")
# Presumably (proc_name, params, bin_params, input_table_names,
# input_column_names, output_table_names, options): INPUT_TABLE is the only
# input and no output tables are requested.
response = h_db.execute_proc(proc_name, {}, {}, [INPUT_TABLE], {}, [], {})
print(response)