Kafka Developer Manual

The following guide provides step-by-step instructions to get started integrating Kinetica with Kafka. The connector can be integrated as part of a Kafka stack or the Confluent platform. The connector is available as a prebuilt package that can be downloaded from the release page or built locally from source.

This project aims to make Kafka topics accessible to Kinetica, meaning data can be streamed from a Kinetica table or to a Kinetica table via Kafka Connect. The custom Kafka Source Connector and Kafka Sink Connector do no additional processing, though limited data transformations can be made.

Note

There is a separate set of parameters in the connect-standalone.properties file within the repository that allows lightweight message-at-a-time data transformations, such as creating a new column from a Kafka message's timestamp. This file is part of the Kafka Connect worker configuration.
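For example, the InsertField single message transform that ships with Kafka Connect can populate a new field from each message's timestamp. A minimal sketch of the relevant worker/connector properties (the transform alias and field name are illustrative):

transforms=InsertKafkaTimestamp
transforms.InsertKafkaTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertKafkaTimestamp.timestamp.field=kafka_message_ts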

The two connector classes that integrate Kinetica with Kafka are:

  • com.kinetica.kafka.KineticaSourceConnector -- A Kafka Source Connector, which receives a data stream from the database via table monitor
  • com.kinetica.kafka.KineticaSinkConnector -- A Kafka Sink Connector, which receives a data stream from a Kafka Source Connector and writes it to the database

Installation & Configuration

The connector provided in this project is intended to be launched on a server capable of running the Kinetica Kafka Connector in standalone mode or in a cluster.

Version Support

Kafka Connect allows you to configure the Kinetica Kafka Connector exactly the same for a simple Kafka stack as you would for an integrated Confluent platform. The following table is provided for ease of identifying compatible dependency versions.

Confluent Platform | Apache Kafka | Java
3.1.x              | 0.10.1.x     | 1.7.0_60, 1.8.0_60
3.2.x              | 0.10.2.x     | 1.7.0_60, 1.8.0_60
3.3.x              | 0.11.0.x     | 1.7.0_60, 1.8.0_60
4.0.x              | 1.0.x        | 1.7.0_60, 1.8.0_60
4.1.x              | 1.1.x        | 1.7.0_60, 1.8.0_60
5.0.x              | 2.0.x        | 1.8.0_60
5.1.x              | 2.1.x        | 1.8.0_60
5.2.x              | 2.2.x        | 1.8.0_60
5.4.1              | 2.4.1        | 1.8.0_60
6.0.0              | 2.6.x        | 1.8.0_60

The Kinetica Kafka Connector has a kafka.version property in the pom.xml configuration file to set the Kafka version appropriate for the target environment. The build process will add the Kafka version to the name of the JAR:

kafka-<kafka.version>-connector-kinetica-7.1.<X>.<Y>-jar-with-dependencies.jar

The Connector code is Java 7 compatible and does not require a separate build to support a Java 8 environment.
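For example, building against Kafka 2.6.0 for connector version 7.1.0.0 yields kafka-2.6.0-connector-kinetica-7.1.0.0-jar-with-dependencies.jar. Assuming the pom.xml references the property as ${kafka.version}, the value can also be overridden on the Maven command line, as a quick sketch:

mvn clean package -DskipTests=true -Dkafka.version=2.6.0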

Build

Clone and build the project as follows:

git clone https://github.com/kineticadb/kinetica-connector-kafka
cd kinetica-connector-kafka/kafka-connect-kinetica
mvn clean compile package -DskipTests=true

Three JAR files are produced by the Maven build, in kinetica-connector-kafka/kafka-connect-kinetica/target/:

  • kafka-<ver>-connector-kinetica-<ver>.jar - default JAR (not for use)
  • kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar - complete connector JAR
  • kafka-<ver>-connector-kinetica-<ver>-tests.jar - tests JAR (see below how to use it to test connectivity)

Test

To run JUnit tests as part of the build process, make sure that you have a running Kinetica instance, and that its URL and user credentials are properly configured in the following files:

  • config/quickstart-kinetica-sink.properties
  • config/quickstart-kinetica-source.properties

Then, run:

mvn clean compile package

Kinetica Kafka Connector Plugin

The connector JAR built in the previous section is a Kafka Connect plugin. A Kafka Connect plugin is simply a set of JAR files where Kafka Connect can find an implementation of one or more connectors, transforms, and/or converters. Kafka Connect isolates plugins from one another so that libraries in one plugin are not affected by the libraries in any other plugin.

A Kafka Connect plugin is either:

  • an uber JAR containing all of the class files for the plugin and its third-party dependencies in a single JAR file:

    kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar
    
  • a directory on the file system that contains the JAR files for the plugin and its third-party dependencies:

    kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-package
    |-- etc
        |-- kafka-connect-kinetica
            |-- quickstart-kinetica-sink.properties
            |-- quickstart-kinetica-source.properties
    |-- share
        |-- doc
            |-- kafka-connect-kinetica
                |-- licenses
                    |-- LICENSE.apache2.0.txt
                    |-- LICENSE.bsd.txt
                    |-- ...
                |-- LICENSE
                |-- NOTICE
                |-- version.txt
        |-- java
            |-- kafka-connect-kinetica
                |-- ...
                |-- kafka-connect-kinetica-<ver>.jar
                |-- ...
                |-- xz-1.5.jar
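Once a Connect worker is started with the plugin available on its plugin.path, you can confirm that the plugin was loaded through the worker's REST API; a quick sketch, assuming the default worker REST port of 8083:

curl -s http://localhost:8083/connector-plugins | grep -i kinetica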
    

Installing on a Kafka Stack

Ensure the configuration in the following files matches the configuration you've tested with your Kafka connector:

  • <KAFKA_HOME>/config/connect-distributed.properties
  • <KAFKA_HOME>/config/connect-standalone.properties

Any additional properties files you might need should go in the same folder:

<KAFKA_HOME>/config/

To install the Kinetica Kafka Connector at the target server location using a simple Kafka stack, find the plugin.path value in the <KAFKA_HOME>/config/connect-distributed.properties and <KAFKA_HOME>/config/connect-standalone.properties files and copy the uber JAR there:

cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <plugin.path directory>

If the plugin.path location has not been set, create a plugins or connectors directory under the Kafka root directory, then edit the connect-distributed.properties and connect-standalone.properties files to add the plugin.path setting and the new directory as its value:

mkdir <KAFKA_HOME>/plugins/
cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <KAFKA_HOME>/plugins/
# edit <KAFKA_HOME>/config/connect-distributed.properties & <KAFKA_HOME>/config/connect-standalone.properties for new plugin.path

If the other properties in these files don't match the configuration you've tested with your Kafka connector, create a new directory named kafka-connect-kinetica under the <KAFKA_HOME>/config/ directory and save the customized property files there, as well as any additional properties files you might need, e.g., kinetica-sink.properties, kinetica-source.properties, etc.
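The standalone worker can then be pointed at the customized files; a sketch, assuming the directory and file names above:

cd <KAFKA_HOME>
bin/connect-standalone.sh config/kafka-connect-kinetica/connect-standalone.properties config/kafka-connect-kinetica/kinetica-sink.properties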

Installing on the Confluent Platform

To install the Kinetica Kafka Connector at the target server location for the Confluent platform, find the plugin.path value in the <CONFLUENT_HOME>/etc/kafka/connect-distributed.properties and <CONFLUENT_HOME>/etc/kafka/connect-standalone.properties files and copy the uber JAR there:

cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <plugin.path directory>

If the plugin.path location has not been set, create a plugins or connectors directory under the Confluent root directory, then edit the connect-distributed.properties and connect-standalone.properties files to add the plugin.path setting and the new directory as its value:

mkdir <CONFLUENT_HOME>/plugins/
cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <CONFLUENT_HOME>/plugins/
# edit <CONFLUENT_HOME>/etc/kafka/connect-distributed.properties & <CONFLUENT_HOME>/etc/kafka/connect-standalone.properties for new plugin.path

If the other properties in these files don't match the configuration you've tested with your Kafka connector, create a new directory named kafka-connect-kinetica under the <CONFLUENT_HOME>/etc/ directory and save the customized property files there, as well as any additional properties files you might need, e.g., kinetica-sink.properties, kinetica-source.properties, etc.

Note

A separate directory for connector plugins ensures that any dependency conflicts between Kafka/Confluent and the connector do not jeopardize the health of the Kafka stack. Do not drop the connector JAR directly into the <KAFKA_HOME>/libs or <CONFLUENT_HOME>/share/java directories.

Confluent recommends the following procedure to deploy a well-tested connector plugin in a production environment:

mkdir <CONFLUENT_HOME>/share/java/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/share/java/* <CONFLUENT_HOME>/share/java/kafka-connect-kinetica/
mkdir <CONFLUENT_HOME>/etc/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/etc/* <CONFLUENT_HOME>/etc/kafka-connect-kinetica/
mkdir <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/share/doc/* <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica/

Important

Accurately following this convention when naming folders and placing the JAR file and its configuration properties is essential to loading and starting/stopping the connector remotely via REST services. The unique connector name in the quickstart properties is passed to the REST service. The kafka-connect-kinetica folder name is treated both as the connector identifier and as part of the path built on the fly when the connector is engaged. Starting the folder name with kafka-connect-<connector name> is a Confluent convention used for all Kafka Connect components, such as jdbc, s3, hdfs, and others.
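For example, once the Connect worker is running, a connector can be managed by its unique name through the Kafka Connect REST interface; a sketch, assuming the default worker REST port of 8083:

# check connector status
curl -s http://localhost:8083/connectors/<connector name>/status
# pause / resume the connector
curl -s -X PUT http://localhost:8083/connectors/<connector name>/pause
curl -s -X PUT http://localhost:8083/connectors/<connector name>/resume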

Plugin Deployment Considerations

Users can run Kafka Connect in two ways: standalone mode or distributed mode. Visit Managing the Kafka Connector for more information.
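As a brief orientation, the two modes are launched differently (paths are illustrative): in standalone mode the connector properties are supplied on the command line, while in distributed mode only the worker configuration is supplied and connectors are then submitted via the REST API.

bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-kinetica-sink.properties
bin/connect-distributed.sh config/connect-distributed.properties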

Streaming Data from Kinetica into Kafka

The KineticaSourceConnector can be used as-is by Kafka Connect to stream data from Kinetica into Kafka. The connector will create table monitors to listen for inserts into a set of tables and publish the inserted rows to separate topics. A separate Kafka topic will be created for each database table configured. Data will be streamed in flat Kafka Connect Struct format with one field for each table column.

The KineticaSourceConnector is configured using the quickstart-kinetica-source.properties file, which accepts the following parameters:

Property Name         | Required | Description
name                  | Y        | Name for the connector
connector.class       | Y        | Must be com.kinetica.kafka.KineticaSourceConnector
tasks.max             | Y        | Number of threads
kinetica.url          | Y        | The URL of the Kinetica database server
kinetica.username     | N        | Username for authentication
kinetica.password     | N        | Password for authentication
kinetica.table_names  | Y        | A comma-delimited list of names of tables to stream from
kinetica.topic_prefix | Y        | Token prepended to the name of each topic (see below)
kinetica.timeout      | N        | Timeout in milliseconds (default is no timeout)

The connector uses kinetica.topic_prefix to generate the name for the destination topic from kinetica.table_names. For example, if kinetica.topic_prefix is Tweets. and an insert is made to table KafkaConnectorTest, then it would publish the change to the topic named Tweets.KafkaConnectorTest.
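Putting the parameters above together, a minimal quickstart-kinetica-source.properties sketch might look like the following (the connector name, URL, and table name are illustrative):

name=kinetica-quickstart-source
connector.class=com.kinetica.kafka.KineticaSourceConnector
tasks.max=1
kinetica.url=http://localhost:9191
kinetica.table_names=KafkaConnectorTest
kinetica.topic_prefix=Tweets.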

Streaming Data from Kafka into Kinetica

The KineticaSinkConnector can be used as-is by Kafka Connect to stream data from Kafka into Kinetica. Streamed data must be in a flat Kafka Connect Struct that uses only supported data types for fields (BYTES, FLOAT64, FLOAT32, INT32, INT64, and STRING). No translation is performed on the data and it is streamed directly into a table. The target table and schema will be created if they don't exist and the user has sufficient privileges.

Warning

If the target table does not exist, the connector will create it based on the information available in the Kafka schema. That information omits potentially performance-impacting column types & properties. If these attributes are important to your application, create the table in advance of running the connector so that these properties can be defined.

The KineticaSinkConnector is configured using the quickstart-kinetica-sink.properties file, which accepts the following parameters:

Property Name | Required | Description
name | Y | Name for the connector
connector.class | Y | Must be com.kinetica.kafka.KineticaSinkConnector
topics | Y | Comma-separated list of topics to stream from
topics.regex | N | Regular expression used to match against all available topic names to select topics to stream from; must start with ^
tasks.max | Y | Number of threads
kinetica.url | Y | The URL of the Kinetica database server
kinetica.username | N | Username for authentication
kinetica.password | N | Password for authentication
kinetica.timeout | N | Timeout in milliseconds (default = 1000)
kinetica.enable_multihead | N | Enable multihead ingest (default = true)
kinetica.retry_count | N | Number of attempts to insert data before task fails (default = 1)
kinetica.batch_size | N | The number of records to insert at a time (default = 10000)
kinetica.tables.create_table | N | Automatically create the table, if missing (default = true)
kinetica.tables.prefix | N | Prefix for destination tables (see below)
kinetica.tables.schema_name | Y | Schema to put the table in (the user's default schema will be used at runtime if one is not provided)
kinetica.tables.destination_name | N | Override for table name (see below)
kinetica.tables.single_table_per_topic | N | When true, put all incoming messages into a single table; otherwise, create a table for each message type (default = false)
kinetica.tables.update_on_existing_pk | N | When true and the destination table has a primary key, update any existing records in the destination table with incoming message data when the primary key value(s) of those records match the corresponding field values of any incoming messages; otherwise, discard any incoming messages with matching primary key values (default = true)
kinetica.schema_evolution.enabled | N | When true, allow schema evolution support for incoming messages; requires Schema Registry running in the Kafka stack (default = false)
kinetica.schema_evolution.add_new_fields_as_columns | N | When kinetica.schema_evolution.enabled is true and an incoming message has a new field, create a new column for it in the destination table (default = false)
kinetica.schema_evolution.make_missing_field_nullable | N | When kinetica.schema_evolution.enabled is true and an incoming message does not have a required field, alter the corresponding table column to be nullable (default = false)

The topics and topics.regex parameters are mutually exclusive. Either the names of Kafka topics to subscribe to are provided explicitly via topics, or a regular expression in topics.regex will be used to filter all available Kafka topics for matching topic names.

You can use the optional kinetica.tables.prefix parameter to prepend a token to the table name. This is useful for testing where you have a source connector reading from multiple tables and you want the sink connector to write to different tables in the same database.

You can also use the optional kinetica.tables.destination_name parameter to manually specify a table name not generated from the Kafka schema. When the topics parameter has a comma-separated list of topic names, kinetica.tables.destination_name should be a comma-separated list of table names corresponding to those topics. When topics are selected by the topics.regex expression, the kinetica.tables.destination_name parameter is ignored.
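For instance, a sink configured with an explicit topic list can map each topic to its own override table (topic and table names are illustrative):

topics=Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2
kinetica.tables.destination_name=TweetsIn,TweetsIn2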

If kinetica.tables.single_table_per_topic is set to true, a single table is created with the name of the topic as its name. All incoming data would be formatted to fit its structure and column types. If the parameter is set to false, there may be multiple tables created for a single topic, depending on the number of unique message schemas read from the topic. The name of the destination table is then based on the message schema.

If kinetica.schema_evolution.enabled is set to true, schema versions of the message objects will be looked up through the Schema Registry. Additional parameters, kinetica.schema_evolution.add_new_fields_as_columns and kinetica.schema_evolution.make_missing_field_nullable, allow modification of Kinetica tables on the fly upon receiving Kafka messages with new or missing fields. The default value for these three parameters is false, as mapping schemas and altering tables are expensive, time-consuming operations. This set of parameters was added to support Avro Schema Evolution and connecting to the Schema Registry. If kinetica.schema_evolution.enabled is set to false, the connector assumes the object format is not going to change over time and will not attempt to map field names and types of incoming data to cached schemas, even if the Schema Registry service is available. Every schema version other than the version available at the time the connector subscribed to the topic would be blacklisted and data in that format ignored.

Note

You can find more on schema evolution rules in Apache Avro Specification and in Kafka Schema Evolution.

If you intend to use the Kafka Schema Registry (with or without the possibility of schema evolution), configure the following parameters in your connect-standalone.properties file. The Schema Registry service is usually set up on the same server as the Kafka bootstrap server, on port 8081.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true

Deprecated Properties

The following parameters from the 7.0 version of the Kinetica Kafka Connector have been deprecated with the advent of 7.1:

Property Name (7.0)                   | Property Name (7.1)
kinetica.create_table                 | kinetica.tables.create_table
kinetica.table_prefix                 | kinetica.tables.prefix
kinetica.collection_name              | kinetica.tables.schema_name
kinetica.dest_table_override          | kinetica.tables.destination_name
kinetica.single_table_per_topic       | kinetica.tables.single_table_per_topic
kinetica.update_on_existing_pk        | kinetica.tables.update_on_existing_pk
kinetica.allow_schema_evolution       | kinetica.schema_evolution.enabled
kinetica.add_new_fields_as_columns    | kinetica.schema_evolution.add_new_fields_as_columns
kinetica.make_missing_field_nullable  | kinetica.schema_evolution.make_missing_field_nullable

Secure Connections

If the Kafka stack is configured to allow only secure connections, find the setup below that aligns with the Kafka stack and add the corresponding parameters to connect-standalone.properties or connect-distributed.properties.

SSL Certificates with Server-side Truststore

If the Kafka stack is configured to use SSL certificates with a server-side truststore, add the following parameters to the relevant properties file:

ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>

Additional parameters are available for secure connections to the Schema Registry server:

inter.instance.protocol=http
schema.registry.url=<schema-registry-url>:<port>
schema.registry.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
schema.registry.ssl.truststore.password=<password>
schema.registry.ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
schema.registry.ssl.keystore.password=<password>
schema.registry.ssl.key.password=<password>

Basic HTTP Authentication

If the Kafka stack is configured to use basic HTTP authentication, add the following parameters to the relevant properties file:

basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<username>:<password>

SASL Authentication

If the Kafka stack is configured to use the Java Authentication and Authorization service (JAAS) for SASL, add the following parameters to the relevant properties file, noting that you must provide JAAS configurations for all SASL authentication mechanisms:

sasl.mechanism=PLAIN
# Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";

Note

When configuring the sasl.jaas.config value, please follow the format provided above; do not break the value into multiple lines, do not replace the double quotes with single quotes, and use a semicolon at the end of the value.

If your Kafka connector configured in the local Kafka/Confluent stack is connecting to cloud Kafka, you'll need to set additional SASL-related properties prefixed with producer. for the source connector and consumer. for the sink connector:

bootstrap.servers=<your kafka cloud>.aws.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<cluster API key >" password="<cluster API secret>";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<cluster API key>" password="<cluster API secret>";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<cluster API key>" password="<cluster API secret>";

For any other encryption- or security-related Kafka options, e.g., SASL/GSSAPI, SASL OAUTHBEARER, SASL/SCRAM, Delegation Tokens, LDAP, Kerberos, etc., visit the Confluent security documentation and follow directions for configuring a Kafka Client.

Error Handling

Starting with Kafka 2.0.0 (Confluent 5.0.0) you can configure additional error-handling and error-logging options; all are optional:

Property Name | Description
errors.tolerance | Whether to fail the task upon exceeding retries for any message: none (default) tolerates no message failures and fails immediately; all tolerates all message failures and skips them silently
errors.retry.timeout | Total time that a failed operation will be retried, in milliseconds; use -1 for infinite retries [-1, Long.MAX_VALUE] (default = 0)
errors.retry.delay.max.ms | Maximum delay between consecutive retries, in milliseconds [1, Long.MAX_VALUE] (default = 60000)
errors.log.enable | Whether to enable error context logging, including the message and failure detail (default = false)
errors.log.include.messages | Whether to log the content of failed messages (default = false)
errors.deadletterqueue.topic.name | Name of Kafka topic to redirect failed messages to; if not given, the dead letter queue will not be used (default behavior)
errors.deadletterqueue.topic.replication.factor | Replication factor used for an automatically created dead letter queue topic [1, Short.MAX_VALUE] (default = 3)
errors.deadletterqueue.context.headers.enable | Whether to log extra metadata of failed messages (default = false)

These options are added to the connector configuration files:

  • quickstart-kinetica-sink.properties
  • quickstart-kinetica-source.properties

The errors.deadletterqueue family of options is valid for the sink connector only.
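A sketch of how these options might look in quickstart-kinetica-sink.properties (all values are illustrative):

errors.tolerance=all
errors.retry.timeout=30000
errors.retry.delay.max.ms=5000
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=kinetica-sink-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true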

Testing

Datapump Test Utility

The datapump utility is used to generate insert activity on tables to facilitate testing. It will create tables KafkaConnectorTest and KafkaConnectorTest2 and insert records at regular intervals.

usage: TestDataPump [options] [URL]
 -c,--configFile <path>      Relative path to configuration file
 -d,--delay-time <seconds>   Seconds between batches
 -h,--help                   Show usage
 -n,--batch-size <count>     Number of records per batch
 -t,--total-batches <count>  Number of batches to insert

The below example (built for Kafka 2.6.0 and Kinetica 7.1.0.0) runs the datapump with default options on a local Kinetica instance and will insert batches of 10 records every 3 seconds.

java -cp kafka-2.6.0-connector-kinetica-7.1.0.0-tests.jar:kafka-2.6.0-connector-kinetica-7.1.0.0-jar-with-dependencies.jar \
    com.kinetica.kafka.TestDataPump \
    http://localhost:9191

A configuration file can be used to provide the Kinetica DB instance configuration, including URL, username, password, and timeout:

java -cp kafka-2.6.0-connector-kinetica-7.1.0.0-tests.jar:kafka-2.6.0-connector-kinetica-7.1.0.0-jar-with-dependencies.jar \
    com.kinetica.kafka.TestDataPump \
    -c config/quickstart-kinetica-sink.properties

System Test

This test will demonstrate the Kinetica Kafka Connector source and sink in standalone mode. The following steps use example files that may require modifications to match the target environment.

  1. Create two configuration files, connect-standalone-source.properties & connect-standalone-sink.properties, in the <KAFKA_HOME>/config directory with the following content:

    # This should point to your Kafka broker
    bootstrap.servers = localhost:9092
    
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms = 5000
    
    # These ports must not be used by other processes on the host.
    # source rest port
    # rest.port = 8089
    # sink rest port
    # rest.port = 8090
    
    # Key is stored in commit log with JSON schema.
    key.converter = org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable = true
    
    # Value is stored in commit log with JSON schema.
    value.converter = org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable = true
    
    # Disable schemas for internal key/value parameters:
    internal.key.converter = org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable = false
    internal.value.converter = org.apache.kafka.connect.json.JsonConverter
    internal.value.converter.schemas.enable = false
    
  2. In the connect-standalone-source.properties file, uncomment the source port:

    # source rest port
    rest.port=8089
    
  3. In the connect-standalone-sink.properties file, uncomment the sink port:

    # sink rest port
    rest.port=8090
    
  4. In the same directory, create a configuration file (source.properties) for the source connector with the following content:

    # Connector API required config
    name=TwitterSourceConnector
    connector.class=com.kinetica.kafka.KineticaSourceConnector
    tasks.max=1
    
    # Kinetica-specific config
    kinetica.url=http://localhost:9191
    kinetica.table_names=KafkaConnectorTest,KafkaConnectorTest2
    kinetica.timeout=1000
    kinetica.topic_prefix=Tweets.
    
  5. In the same directory, create a configuration file (sink.properties) for the sink connector with the following content:

    name=TwitterSinkConnector
    topics=Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2
    connector.class=com.kinetica.kafka.KineticaSinkConnector
    tasks.max=1
    
    # Kinetica-specific config
    kinetica.url=http://localhost:9191
    kinetica.tables.schema_name=example_kafka
    kinetica.tables.prefix=out_
    kinetica.timeout=1000
    kinetica.batch_size=100
    

The rest of the system test will require four terminal windows:

  1. In terminal 1, start Zookeeper and Kafka:

    cd <KAFKA_HOME>
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    
  2. In terminal 2, start the test datapump. This will create the KafkaConnectorTest and KafkaConnectorTest2 tables and generate insert activity:

    java -cp kafka-<kafka version>-connector-kinetica-<database version>-tests.jar:kafka-<kafka version>-connector-kinetica-<database version>-jar-with-dependencies.jar \
        com.kinetica.kafka.TestDataPump http://localhost:9191
    
  3. In terminal 3, start the Kafka sink connector:

    cd <KAFKA_HOME>
    bin/connect-standalone.sh config/connect-standalone-sink.properties config/sink.properties
    
  4. In terminal 4, start the Kafka source connector:

    cd <KAFKA_HOME>
    bin/connect-standalone.sh config/connect-standalone-source.properties config/source.properties
    
  5. Verify that data is copied to tables out_KafkaConnectorTest and out_KafkaConnectorTest2.
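You can also confirm that the source connector is publishing by reading one of its topics directly; a sketch using the console consumer bundled with Kafka (run from <KAFKA_HOME>):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Tweets.KafkaConnectorTest --from-beginning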

To test schemaless JSON format, in the connect-standalone-source.properties & connect-standalone-sink.properties files, set:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

For more information, please refer to configuration descriptions and test scenarios in the kinetica-connector-kafka GitHub project, under test-connect/README.md.

Alternatively, you could run a Kafka producer script in Python to generate data for a Kafka topic, then run a sink connector to populate Kinetica tables and run a source connector to create another Kafka topic. Sample Python producer script:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import logging as log

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])


def on_send_error(excp):
    # Errback invoked if an asynchronous send fails
    log.error('I am an errback', exc_info=excp)


# Block for 'synchronous' sends
try:
    for i in range(100):
        data = {}
        data['symbol'] = 'AA' + str(i)
        data['sector'] = 'equipment'
        data['securityType'] = 'commonstock'
        data['bidPrice'] = random.randint(0, 20)
        data['bidSize'] = random.randint(0, 10)
        data['askPrice'] = random.randint(0, 20)
        data['askSize'] = random.randint(0, 10)
        data['lastUpdated'] = 1547587673240
        data['lastSalePrice'] = 153.04
        data['lastSaleSize'] = 100
        data['lastSaleTime'] = 1547585999856
        data['volume'] = 757810
        data['marketPercent'] = 0.0267
        json_data = json.dumps(data)
        # The message key must be bytes when no key_serializer is configured
        producer.send('NewTestTopic1', json_data.encode(), key=b'table').add_errback(on_send_error)

except KafkaError as e:
    # Decide what to do if the producer request failed...
    log.exception(str(e))

producer.flush()