The following guide provides step-by-step instructions to get started integrating Kinetica with Kafka.
This project aims to make Kafka topics accessible to Kinetica, meaning data can be streamed from a Kinetica table or to a Kinetica table via Kafka Connect. The custom Kafka Source Connector and Sink Connector do no additional processing.
The two connector classes that integrate Kinetica with Kafka are:

* com.kinetica.kafka.KineticaSourceConnector -- a Kafka Source Connector, which receives a data stream from the database via table monitor
* com.kinetica.kafka.KineticaSinkConnector -- a Kafka Sink Connector, which receives a data stream from a Kafka Source Connector and writes it to the database

Source code for the connector can be found here.
The connector provided in this project assumes it will be launched on a server capable of running Kafka connectors in standalone mode or submitting them to a cluster.
Two JAR files are produced by this project:
* kafka-connector-<ver>.jar -- default JAR (not for use)
* kafka-connector-<ver>-jar-with-dependencies.jar -- complete connector JAR

To install the connector:
Copy the kafka-connector-<ver>-jar-with-dependencies.jar library to the target server.
Create a configuration file (connect-standalone.properties) if you are using standalone mode.
Create a configuration file (source.properties) for the source connector:
name = <UniqueNameOfSourceConnector>
connector.class = com.kinetica.kafka.KineticaSourceConnector
tasks.max = 1
kinetica.url = <KineticaServiceURL>
kinetica.username = <KineticaAuthenticatingUserName>
kinetica.password = <KineticaAuthenticatingUserPassword>
kinetica.table_names = <KineticaSourceTableNameA,KineticaSourceTableNameB>
kinetica.timeout = <KineticaConnectionTimeoutInMilliseconds>
kinetica.topic_prefix = <TargetKafkaTopicNamesPrefix>
Create a configuration file (sink.properties) for the sink connector:
name = <UniqueNameOfSinkConnector>
connector.class = com.kinetica.kafka.KineticaSinkConnector
tasks.max = <NumberOfKafkaToKineticaWritingProcesses>
topics = <TopicPrefix><SourceTableName>
kinetica.url = <KineticaServiceURL>
kinetica.username = <KineticaAuthenticatingUserName>
kinetica.password = <KineticaAuthenticatingUserPassword>
kinetica.collection_name = <TargetKineticaCollectionName>
kinetica.timeout = <KineticaConnectionTimeoutInMilliseconds>
The KineticaSourceConnector
can be used as-is by Kafka Connect to stream
data from Kinetica into Kafka. The connector will create table monitors to
listen for inserts or updates on a set of tables and
publish the updated rows to separate topics. A separate Kafka topic will be
created for each database table configured. Data will be streamed in flat
Kafka Connect Struct
format with one field for each table column.
The KineticaSourceConnector
is configured using a properties file that
accepts the following parameters:
Property Name | Required | Description |
---|---|---|
name | Y | Name for the connector |
connector.class | Y | Must be com.kinetica.kafka.KineticaSourceConnector |
tasks.max | Y | Number of threads |
kinetica.url | Y | The URL of the Kinetica database server |
kinetica.username | N | Username for authentication |
kinetica.password | N | Password for authentication |
kinetica.table_names | Y | A comma-delimited list of names of tables to stream from |
kinetica.topic_prefix | Y | Token prepended to the name of each topic (see below) |
kinetica.timeout | N | Timeout in milliseconds (default = none) |
The connector uses the kinetica.topic_prefix value to generate the name of a
destination topic from the table name. For example, if kinetica.topic_prefix
is Tweets. and an insert is made to table KafkaConnectorTest, then the
connector publishes the change to topic Tweets.KafkaConnectorTest.
The KineticaSinkConnector
can be used as-is by Kafka Connect to stream
data from Kafka into Kinetica. Streamed data must be in a flat
Kafka Connect Struct that uses only supported data types for fields
(BYTES, FLOAT64, FLOAT32, INT32, INT64, and STRING). No
translation is performed on the data and it is streamed directly into a table.
The target table and collection will be created if they do not exist.
The KineticaSinkConnector
is configured using a properties file that
accepts the following parameters:
Property Name | Required | Description |
---|---|---|
name | Y | Name for the connector |
connector.class | Y | Must be com.kinetica.kafka.KineticaSinkConnector |
topics | Y | Comma-separated list of topics to stream from |
tasks.max | Y | Number of threads |
kinetica.url | Y | The URL of the Kinetica database server |
kinetica.username | N | Username for authentication |
kinetica.password | N | Password for authentication |
kinetica.table_prefix | N | Prefix for destination tables (see below) |
kinetica.collection_name | N | Collection to put the table in (default is empty) |
kinetica.batch_size | N | The number of records to insert at a time (default = 10000) |
kinetica.timeout | N | Timeout in milliseconds (default = 1000) |
The connector determines the name of the destination table from the Avro
schema attached to the message. You can use the optional kinetica.table_prefix
parameter to have it prepend a token to the table name; for example, with
kinetica.table_prefix = out_, records whose schema name is KafkaConnectorTest
are written to table out_KafkaConnectorTest. This is useful for testing where
you have a source connector reading from multiple tables and you want the sink
connector to write to different tables in the same database.
Note

This connector does not permit schema-less SinkRecords, so you must
add the following lines to the connect-standalone.properties file before
running connect-standalone.sh:
key.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
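With schemas enabled, the JsonConverter expects every message to embed its
schema alongside the payload. As a rough sketch (the field names here are
illustrative, not taken from the test tables), a message for a flat
two-column record would look like:

```json
{
  "schema": {
    "type": "struct",
    "name": "KafkaConnectorTest",
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": false }
    ]
  },
  "payload": { "id": 42, "name": "example" }
}
```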
Important

If the target table does not exist, the connector will create
it based on the information available in the Kafka schema. This
information lacks column attributes like timestamp, shard_key,
and charN. If these attributes are important, create the table
before running the connector so it will use the existing table
(a sketch follows below).
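A minimal sketch of pre-creating such a table with the Kinetica Java API is
shown below. The column names and types are hypothetical; adjust them to
match your Kafka schema, since the connector matches tables by name.

```java
import com.gpudb.ColumnProperty;
import com.gpudb.GPUdb;
import com.gpudb.GPUdbException;
import com.gpudb.Type;
import java.util.HashMap;
import java.util.Map;

public class CreateTargetTable {
    public static void main(String[] args) throws GPUdbException {
        GPUdb gpudb = new GPUdb("http://localhost:9191");

        // Hypothetical schema: declare the attributes the connector
        // cannot infer from the Kafka schema (shard_key, timestamp, charN)
        Type type = new Type(
            new Type.Column("id", Integer.class, ColumnProperty.SHARD_KEY),
            new Type.Column("ts", Long.class, ColumnProperty.TIMESTAMP),
            new Type.Column("name", String.class, ColumnProperty.CHAR16)
        );
        String typeId = type.create(gpudb);

        // Create the table in the same collection the sink connector targets
        Map<String, String> options = new HashMap<>();
        options.put("collection_name", "TEST");
        gpudb.createTable("out_KafkaConnectorTest", typeId, options);
    }
}
```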
The datapump utility is used to generate insert activity on tables to
facilitate testing. It will create tables KafkaConnectorTest and
KafkaConnectorTest2 and insert records at regular intervals.
usage: TestDataPump [options] URL
-d,--delay-time <seconds> Seconds between batches.
-h,--help Show Usage
 -n,--batch-size <count>   Number of records per batch
The example below runs the datapump with default options, inserting batches of 10 records every 3 seconds.
java -cp kafka-connector-6.1.0-jar-with-dependencies.jar \
com.kinetica.kafka.tests.TestDataPump \
http://gpudb:9191
This test demonstrates the Kinetica Kafka Connector source and sink in standalone mode. The following steps use example files that may require modification for your environment.
Important

Standalone mode should be used only for testing; use distributed mode for a production deployment (see the sketch below).
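In distributed mode the connector settings are not read from local properties
files at startup; instead they are posted as JSON to the Kafka Connect REST
API. A minimal sketch for the sink connector, assuming a Connect worker
listening on localhost:8083 (the rest.port used later in this guide):

```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "TwitterSinkConnector",
  "config": {
    "connector.class": "com.kinetica.kafka.KineticaSinkConnector",
    "tasks.max": "1",
    "topics": "Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2",
    "kinetica.url": "http://localhost:9191",
    "kinetica.collection_name": "TEST",
    "kinetica.table_prefix": "out_",
    "kinetica.timeout": "1000",
    "kinetica.batch_size": "100"
  }
}'
```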
Create a configuration file (connect-standalone.properties):
# This should point to your Kafka broker
bootstrap.servers=broker:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=5000
rest.port=8083
# Make sure your connector jar is in this path
plugin.path=/opt/connect-test
# needed for Kinetica
key.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
Create a configuration file (source.properties) for the source connector:
# Connector API required config
name=TwitterSourceConnector
connector.class=com.kinetica.kafka.KineticaSourceConnector
tasks.max=1
# Kinetica-specific config
kinetica.url=http://localhost:9191
kinetica.table_names=KafkaConnectorTest,KafkaConnectorTest2
kinetica.timeout=1000
kinetica.topic_prefix=Tweets.
Create a configuration file (sink.properties) for the sink connector:
name=TwitterSinkConnector
topics=Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2
connector.class=com.kinetica.kafka.KineticaSinkConnector
tasks.max=1
# Kinetica-specific config
kinetica.url=http://localhost:9191
kinetica.collection_name=TEST
kinetica.table_prefix=out_
kinetica.timeout=1000
kinetica.batch_size=100
The rest of the system test will require three terminal windows:
In terminal 1, start Zookeeper and Kafka:
cd <path/to/Kafka>
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
In terminal 2, start the test datapump. This will create the
KafkaConnectorTest and KafkaConnectorTest2 tables and generate insert
activity:
java -cp kafka-connector-6.1.0-jar-with-dependencies.jar \
com.kinetica.kafka.tests.TestDataPump <Kinetica url>
In terminal 3, start the Kafka connector:
export CLASSPATH=<path/to/kafka-connector-6.1.0-jar-with-dependencies.jar>
cd <path/to/Kafka>
bin/connect-standalone.sh \
config/connect-standalone.properties <source.properties> <sink.properties>
Verify that data is copied to tables out_KafkaConnectorTest and out_KafkaConnectorTest2.
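To also confirm that the source connector is publishing, you can tail one of
the intermediate topics with the console consumer that ships with Kafka
(adjust the broker address to match your bootstrap.servers setting):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic Tweets.KafkaConnectorTest --from-beginning
```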