The following guide provides step-by-step instructions to get started integrating Kinetica with Kafka.
This project aims to make Kinetica Kafka-accessible, meaning data can be streamed from or to a Kinetica table via Kafka Connect. The custom Kafka Source Connector and Sink Connector perform no additional processing.
Source code for the connector can be found at https://github.com/Kinetica/gpudb-connector-kafka
The two connector classes that integrate Kinetica with Kafka are in the `com.gpudb.kafka` package:

- `GPUdbSourceConnector` - a Kafka Source Connector, which receives a data stream from a GPUdb table monitor
- `GPUdbSinkConnector` - a Kafka Sink Connector, which receives a data stream from a Kafka Source Connector and writes it to GPUdb

The `GPUdbSourceConnector` can be used as-is by Kafka Connect to stream data from GPUdb into Kafka. Data will be streamed in flat Kafka Connect `Struct` format with one field for each table column. A separate Kafka topic will be created for each GPUdb table configured.
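For illustration, a record from a two-column table might be represented as the following Kafka Connect `Struct`. This is a minimal sketch: the schema name and the column names (`author`, `message`) are invented here, whereas the connector derives the actual schema from the GPUdb table's type.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructExample {
    public static void main(String[] args) {
        // Flat schema with one field per table column -- the field names
        // ("author", "message") are hypothetical, not part of the connector
        Schema schema = SchemaBuilder.struct().name("TwitterSource")
                .field("author", Schema.STRING_SCHEMA)
                .field("message", Schema.STRING_SCHEMA)
                .build();

        // One Struct per streamed record
        Struct record = new Struct(schema)
                .put("author", "gpudb")
                .put("message", "Hello, Kafka!");

        System.out.println(record);
    }
}
```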
The `GPUdbSourceConnector` is configured using a properties file that accepts the following parameters:

- `gpudb.url`: The URL of the GPUdb server
- `gpudb.username` (optional): Username for authentication
- `gpudb.password` (optional): Password for authentication
- `gpudb.timeout` (optional): Timeout in milliseconds
- `gpudb.table_names`: A comma-delimited list of names of tables in GPUdb to stream from
- `topic_prefix`: The token that will be prepended to the name of each table to form the name of the corresponding Kafka topic into which records will be queued

The `GPUdbSinkConnector` can be used as-is by Kafka Connect to stream data from Kafka into GPUdb. Streamed data must be in a flat Kafka Connect `Struct` that uses only supported data types for fields (`BYTES`, `FLOAT64`, `FLOAT32`, `INT32`, `INT64`, and `STRING`). No translation is performed on the data; it is streamed directly into a GPUdb table. The target table and collection will be created if they do not exist.
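As a sanity check before streaming, a record schema can be verified against that list of supported types. The following is a minimal sketch; the `isSupported` helper is hypothetical and not part of the connector:

```java
import java.util.EnumSet;
import java.util.Set;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

public class SinkTypeCheck {
    // Field types the sink connector accepts, per the list above
    private static final Set<Schema.Type> SUPPORTED = EnumSet.of(
            Schema.Type.BYTES, Schema.Type.FLOAT64, Schema.Type.FLOAT32,
            Schema.Type.INT32, Schema.Type.INT64, Schema.Type.STRING);

    // Hypothetical helper: returns true if the schema is a flat Struct
    // whose fields all use supported types
    public static boolean isSupported(Schema schema) {
        if (schema.type() != Schema.Type.STRUCT) {
            return false;
        }
        for (Field field : schema.fields()) {
            if (!SUPPORTED.contains(field.schema().type())) {
                return false;
            }
        }
        return true;
    }
}
```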
The `GPUdbSinkConnector` is configured using a properties file that accepts the following parameters:

- `gpudb.url`: The URL of the GPUdb server
- `gpudb.username` (optional): Username for authentication
- `gpudb.password` (optional): Password for authentication
- `gpudb.timeout` (optional): Timeout in milliseconds
- `gpudb.collection_name` (optional): Collection to put the table in
- `gpudb.table_name`: The name of the table in GPUdb to stream to
- `gpudb.batch_size`: The number of records to insert at one time
- `topics`: Kafka parameter specifying which topics will be used as sources

The connector provided in this project assumes launching will be done on a server capable of submitting Kafka connectors in standalone mode or to a cluster.
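In the cluster case, connectors are typically submitted to the Kafka Connect REST API in distributed mode. The following is a minimal sketch of such a submission; it assumes a Connect worker listening on the default port 8083, and the connector name and host values are placeholders:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitSinkConnector {
    public static void main(String[] args) throws Exception {
        // JSON equivalent of a sink.properties file, wrapped in the
        // {"name": ..., "config": {...}} envelope the Connect REST API expects
        String json = """
                {
                  "name": "gpudb-sink",
                  "config": {
                    "connector.class": "com.gpudb.kafka.GPUdbSinkConnector",
                    "tasks.max": "4",
                    "gpudb.url": "http://localhost:9191",
                    "gpudb.table_name": "TwitterTarget",
                    "gpudb.batch_size": "10",
                    "topics": "Tweets.TwitterSource"
                  }
                }""";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```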
Two JAR files are produced by this project:

- `kafka-connector-<ver>.jar` - default JAR (not for use)
- `kafka-connector-<ver>-jar-with-dependencies.jar` - complete connector JAR

To install the connector:

Copy the `kafka-connector-<ver>-jar-with-dependencies.jar` library to the target server

Create a configuration file (`source.properties`) for the source connector:
name=<UniqueNameOfSourceConnector>
connector.class=com.gpudb.kafka.GPUdbSourceConnector
tasks.max=1
gpudb.url=<GPUdbServiceURL>
gpudb.username=<GPUdbAuthenticatingUserName>
gpudb.password=<GPUdbAuthenticatingUserPassword>
gpudb.table_names=<GPUdbSourceTableNameA,GPUdbSourceTableNameB>
gpudb.timeout=<GPUdbConnectionTimeoutInMilliseconds>
topic_prefix=<TargetKafkaTopicNamesPrefix>
Create a configuration file (`sink.properties`) for the sink connector:
name=<UniqueNameOfSinkConnector>
connector.class=com.gpudb.kafka.GPUdbSinkConnector
tasks.max=<NumberOfKafkaToGPUdbWritingProcesses>
gpudb.url=<GPUdbServiceURL>
gpudb.username=<GPUdbAuthenticatingUserName>
gpudb.password=<GPUdbAuthenticatingUserPassword>
gpudb.collection_name=<TargetGPUdbCollectionName>
gpudb.table_name=<GPUdbTargetTableName>
gpudb.timeout=<GPUdbConnectionTimeoutInMilliseconds>
gpudb.batch_size=<NumberOfRecordsToBatchBeforeInsert>
topics=<SourceKafkaTopicNames>
This example will demonstrate the GPUdb Kafka Connector in standalone mode. It assumes the presence of a `TwitterSource` table in GPUdb that has records streaming into it.
Start Kafka locally
Create a configuration file (`source.properties`) for the source connector:
name=TwitterSourceConnector
connector.class=com.gpudb.kafka.GPUdbSourceConnector
tasks.max=1
gpudb.url=http://localhost:9191
gpudb.table_names=TwitterSource
topic_prefix=Tweets.
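With this configuration, records from `TwitterSource` will be queued to the Kafka topic `Tweets.TwitterSource`, i.e., the `topic_prefix` prepended to the table name.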
Create a configuration file (`sink.properties`) for the sink connector:
name=TwitterSinkConnector
connector.class=com.gpudb.kafka.GPUdbSinkConnector
tasks.max=4
gpudb.url=http://localhost:9191
gpudb.table_name=TwitterTarget
gpudb.batch_size=10
topics=Tweets.TwitterSource
Make sure the `CLASSPATH` environment variable includes the directory containing the GPUdb Kafka Connector JAR:
export CLASSPATH=<GPUdbKafkaConnectorDirectory>/*
Run Kafka Connect from the Kafka installation directory:
bin/connect-standalone.sh config/connect-standalone.properties source.properties sink.properties
This will stream data, as it is inserted into the `TwitterSource` table, into the `TwitterTarget` table via Kafka Connect.
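To spot-check the stream from the Kafka side, a plain consumer can be pointed at the generated topic. The following is a minimal sketch, assuming a broker at the default `localhost:9092` and the JSON converters configured in `connect-standalone.properties`:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicSpotCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "topic-spot-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("Tweets.TwitterSource"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // one JSON-rendered Struct per record
            }
        }
    }
}
```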