Kafka Developer Manual

The following guide provides step-by-step instructions to get started integrating Kinetica with Kafka. The connector can be integrated as part of a Kafka stack or the Confluent platform.

This project aims to make Kafka topics accessible to Kinetica, so that data can be streamed from or to a Kinetica table via Kafka Connect. The custom Kafka Source Connector and Kafka Sink Connector do no additional processing, though limited data transformations can be made.

The two connector classes that integrate Kinetica with Kafka are:

  • com.kinetica.kafka.KineticaSourceConnector -- A Kafka Source Connector, which receives a data stream from the database via a table monitor
  • com.kinetica.kafka.KineticaSinkConnector -- A Kafka Sink Connector, which receives a data stream from a Kafka Source Connector and writes it to the database

Installation & Configuration

The connector provided in this project is intended to be launched on a server capable of running the Kinetica Kafka Connectors in standalone mode or in a cluster.

Version Support

Kafka Connect allows you to configure the Kinetica Kafka Connector in exactly the same way for a simple Kafka stack as for an integrated Confluent platform. The following table lists compatible dependency versions.

Confluent Platform   Apache Kafka   Java
3.1.x                0.10.1.x       1.7.0_60, 1.8.0_60
3.2.x                0.10.2.x       1.7.0_60, 1.8.0_60
3.3.x                0.11.0.x       1.7.0_60, 1.8.0_60
4.0.x                1.0.x          1.7.0_60, 1.8.0_60
4.1.x                1.1.x          1.7.0_60, 1.8.0_60
5.0.x                2.0.x          1.8.0_60
5.1.x                2.1.x          1.8.0_60

The Kinetica Kafka Connector has a kafka.version property in the pom.xml configuration file to set the Kafka version appropriate for the target environment. The build process will add the Kafka version to the name of the JAR:

kafka-<kafka.version>-connector-kinetica-7.0.<X>.<Y>-jar-with-dependencies.jar
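
As a minimal sketch, the naming pattern above can be expressed as a small helper. The function name connector_jar_name is a hypothetical name chosen for illustration and is not part of the build; the sample values are the Kafka 2.0.0 and Kinetica 7.0.0.0 versions used in the Testing section.

```python
def connector_jar_name(kafka_version: str, connector_version: str) -> str:
    """Build the uber JAR file name from the pattern shown above."""
    return (
        f"kafka-{kafka_version}-connector-kinetica-"
        f"{connector_version}-jar-with-dependencies.jar"
    )


# Sample versions taken from the Testing section of this manual.
print(connector_jar_name("2.0.0", "7.0.0.0"))
```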

The Connector code is Java 7 compatible and does not require a separate build to support a Java 8 environment.

Build

Clone and build the project as follows:

git clone https://github.com/kineticadb/kinetica-connector-kafka
cd kinetica-connector-kafka/kafka-connect-kinetica
mvn clean compile package -DskipTests=true

Three JAR files are produced by the Maven build, in kinetica-connector-kafka/kafka-connect-kinetica/target/:

  • kafka-<ver>-connector-kinetica-<ver>.jar - default JAR (not for use)
  • kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar - complete connector JAR
  • kafka-<ver>-connector-kinetica-<ver>-tests.jar - tests JAR (see the Testing section below for how to use it to test connectivity)

Test

To run JUnit tests as part of the build process, make sure that you have a running Kinetica instance, and that its URL and user credentials are properly configured in the following files:

  • config/quickstart-kinetica-sink.properties
  • config/quickstart-kinetica-source.properties

Then, run:

mvn clean compile package

Kinetica Kafka Connector Plugin

The connector JAR built in the previous section is a Kafka Connect plugin. A Kafka Connect plugin is simply a set of JAR files where Kafka Connect can find an implementation of one or more connectors, transforms, and/or converters. Kafka Connect isolates plugins from one another so that libraries in one plugin are not affected by the libraries in any other plugin.

A Kafka Connect plugin is either:

  • an uber JAR containing all of the class files for the plugin and its third-party dependencies in a single JAR file:

    kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar
    
  • a directory on the file system that contains the JAR files for the plugin and its third-party dependencies:

    kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-package
    |-- etc
        |-- kafka-connect-kinetica
            |-- quickstart-kinetica-sink.properties
            |-- quickstart-kinetica-source.properties
    |-- share
        |-- doc
            |-- kafka-connect-kinetica
                |-- licenses
                    |-- LICENSE.apache2.0.txt
                    |-- LICENSE.bsd.txt
                    |-- ...
                |-- LICENSE
                |-- NOTICE
                |-- version.txt
        |-- java
            |-- kafka-connect-kinetica
                |-- ...
                |-- kafka-connect-kinetica-<ver>.jar
                |-- ...
                |-- xz-1.5.jar
    

Installing on a Kafka Stack

To install the Kinetica Kafka Connector at the target server location using a simple Kafka stack, copy the uber JAR into the Kafka libraries folder:

cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <KAFKA_HOME>/libs/

Ensure the configuration in the following files matches the configuration you've tested with your Kafka connector:

  • <KAFKA_HOME>/config/connect-distributed.properties
  • <KAFKA_HOME>/config/connect-standalone.properties

Any additional properties files you might need should go in the same folder:

<KAFKA_HOME>/config/

Installing on the Confluent Platform

To install the Kinetica Kafka Connector at the target server location using the Confluent platform, copy the contents of the build package folder to the appropriate locations under the Confluent home directory:

mkdir -p <CONFLUENT_HOME>/share/java/kafka-connect-kinetica
cp -r target/kafka-<ver>-connector-kinetica-<ver>-package/share/java/kafka-connect-kinetica/* <CONFLUENT_HOME>/share/java/kafka-connect-kinetica/
mkdir -p <CONFLUENT_HOME>/etc/kafka-connect-kinetica
cp -r target/kafka-<ver>-connector-kinetica-<ver>-package/etc/kafka-connect-kinetica/* <CONFLUENT_HOME>/etc/kafka-connect-kinetica/
mkdir -p <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica
cp -r target/kafka-<ver>-connector-kinetica-<ver>-package/share/doc/kafka-connect-kinetica/* <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica/

Important

Following this naming convention exactly, for both the folder names and the placement of the JAR file and its configuration properties, is essential for loading, starting, and stopping the connector remotely via REST services. The unique connector name in the quickstart properties is passed to the REST service. The kafka-connect-kinetica folder name is treated both as the connector identifier and as part of the path built on the fly when the connector is engaged. Naming the folder kafka-connect-<connector name> is a Confluent convention used for all Kafka Connect components, such as jdbc, s3, hdfs, and others.

Kinetica Kafka Connector Plugin Deployment Considerations

Users can run Kafka Connect in two ways:

Standalone Mode

In standalone mode, a single process runs all the connectors. It is not fault tolerant. Since it uses only a single process, it is not scalable. Standalone mode can be used for proofs of concept, demonstrations, and testing. It is managed through the CLI.

When testing the connector in standalone mode, the following properties files govern the interaction:

  • connect-standalone.properties - used to configure worker tasks
  • quickstart-kinetica.properties - used to configure the connector itself

To run the connector, use the following syntax:

  • Kafka

    cd <KAFKA_HOME>
    bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/quickstart-kinetica.properties
    
  • Confluent

    cd <CONFLUENT_HOME>
    bin/connect-standalone \
        etc/kafka/connect-standalone.properties \
        etc/kafka-connect-kinetica/quickstart-kinetica.properties
    

Worker tasks and connector configurations use different sets of configuration parameters. Do not add connector configuration parameters to the worker task configuration, as they would be ignored. Configurations for sink and source connectors differ in the name and number of parameters, although some parameters are common to both.

When you start a connector (sink or source), it should have its own dedicated port set with the rest.port parameter. You can't use the same connect-standalone.properties file for different connectors running simultaneously. Although it is not required, it is useful to keep separate configuration files named connect-standalone-sink.properties and connect-standalone-source.properties with preset port values, such as rest.port=8090 for the sink and rest.port=8089 for the source.
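
One way to keep the two worker files in sync is to render both from a shared base and vary only rest.port. The following is a minimal sketch under that assumption; the helper name worker_properties and the two base settings are illustrative, and a real worker file needs the converter settings as well.

```python
def worker_properties(base: dict, rest_port: int) -> str:
    """Render worker properties text with a dedicated rest.port."""
    props = dict(base)  # copy so the shared base stays untouched
    props["rest.port"] = str(rest_port)
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"


# Illustrative base settings shared by both workers.
base = {
    "bootstrap.servers": "localhost:9092",
    "offset.storage.file.filename": "/tmp/connect.offsets",
}

files = {
    "connect-standalone-sink.properties": worker_properties(base, 8090),
    "connect-standalone-source.properties": worker_properties(base, 8089),
}

for file_name, text in files.items():
    print(f"# {file_name}")
    print(text)
```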

Distributed Mode

In distributed mode, multiple workers run Kafka Connect and are aware of each other's existence, which can provide fault tolerance and coordination between them in the event of reconfiguration. Distributed mode provides flexibility, scalability, and high availability; thus, it is better suited for production use. It is managed through the REST interface.

Important

See distributed mode for details on using distributed mode for a production deployment.

Kafka Connector configuration sent in REST calls has the same config properties that are listed in connect-standalone-sink.properties and connect-standalone-source.properties for standalone mode deployments, but should have the configuration formatted as an application/json object.

For example, this is the Source connector JSON:

{
   "name":"kinetica-source-connector",
   "config":{
      "name":"kinetica-source-connector",
      "connector.class":"com.kinetica.kafka.KineticaSourceConnector",
      "tasks.max":"3",
      "kinetica.url":"http://localhost:9191",
      "kinetica.table_names":"KafkaConnectorTest"
   }
}

The value of the name parameter must be the same in the outer object (connector) and the inner object (connector config), because it is used as the connector name in the REST endpoint URL.
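
A minimal sketch of converting a standalone-style properties text into the JSON object used in REST calls follows. The helpers parse_properties and to_rest_payload are hypothetical names, and the parser handles only simple key=value lines; taking the outer name from the inner config keeps the two values identical.

```python
import json


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def to_rest_payload(config: dict) -> str:
    """Wrap the config so the outer and inner names are the same value."""
    return json.dumps({"name": config["name"], "config": config}, indent=3)


properties = """
name=kinetica-source-connector
connector.class=com.kinetica.kafka.KineticaSourceConnector
tasks.max=3
kinetica.url=http://localhost:9191
kinetica.table_names=KafkaConnectorTest
"""

payload = to_rest_payload(parse_properties(properties))
print(payload)
```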

When testing the connector in distributed mode, use the following syntax to start the Kafka Connect service:

  • Kafka

    cd <KAFKA_HOME>
    bin/connect-distributed.sh config/connect-distributed.properties
    
  • Confluent

    cd <CONFLUENT_HOME>
    bin/connect-distributed etc/kafka/connect-distributed.properties
    

By default, Kafka Connect listens on port 8083. Assuming your bootstrap server IP is 127.0.0.1, the following are examples of the available REST calls:

  • Check the available connectors:

    curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors
    
    ["my-jdbc-source", "my-hdfs-sink"]
    
  • Create a new connector (connector object is returned):

    curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" --data '{"name":"kinetica-source-connector", "config": {"name":"kinetica-source-connector","connector.class":"com.kinetica.kafka.KineticaSourceConnector","tasks.max":"3","kinetica.url":"http://localhost:9191","kinetica.table_names":"KafkaConnectorTest"}}' http://127.0.0.1:8083/connectors
    
    {
       "name":"kinetica-source-connector",
       "config":{
          "name":"kinetica-source-connector",
          "connector.class":"com.kinetica.kafka.KineticaSourceConnector",
          "tasks.max":"3",
          "kinetica.url":"http://localhost:9191",
          "kinetica.table_names":"KafkaConnectorTest"
       },
       "tasks":[],
       "type":null
    }
    
  • Get info on existing connector (connector object is returned):

    curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector
    
    {
       "name":"kinetica-source-connector",
       "config":{
          "name":"kinetica-source-connector",
          "connector.class":"com.kinetica.kafka.KineticaSourceConnector",
          "tasks.max":"3",
          "kinetica.url":"http://localhost:9191",
          "kinetica.table_names":"KafkaConnectorTest"
       },
       "tasks":[{"connector":"kinetica-source-connector","task":0}],
       "type":"source"
    }
    
  • Variation of the previous call--get the connector's tasks collection only

    curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector/tasks
    
    [
       {
          "id":{
             "connector":"kinetica-source-connector",
             "task":0
          },
          "config":{
             "kinetica.timeout":"0",
             "kinetica.password":"",
             "task.class":"com.kinetica.kafka.KineticaSourceTask",
             "kinetica.url":"http://localhost:9191",
             "kinetica.kafka_schema_version":"",
             "kinetica.table_names":"KafkaConnectorTest",
             "kinetica.topic_prefix":"",
             "kinetica.username":""
          }
       }
    ]
    
  • Variation of the previous call--get the connector's config only

    curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector/config
    
    {
       "name":"kinetica-source-connector",
       "connector.class":"com.kinetica.kafka.KineticaSourceConnector",
       "tasks.max":"3",
       "kinetica.url":"http://localhost:9191",
       "kinetica.table_names":"KafkaConnectorTest"
    }
    
  • Reconfigure the running connector (would cascade to reconfiguring its tasks).

    Ensure that you send only the "config" node from the original JSON connector configuration:

    curl -X PUT -H "Accept: application/json" -H "Content-Type: application/json" --data '{"name":"kinetica-source-connector","connector.class":"com.kinetica.kafka.KineticaSourceConnector","tasks.max":"10","kinetica.url":"http://localhost:9191","kinetica.table_names":"KafkaConnectorTest,KafkaConnectorTest2"}' http://127.0.0.1:8083/connectors/kinetica-source-connector/config
    
    {
       "name":"kinetica-source-connector",
       "config":{
          "name":"kinetica-source-connector",
          "connector.class":"com.kinetica.kafka.KineticaSourceConnector",
          "tasks.max":"10",
          "kinetica.url":"http://localhost:9191",
          "kinetica.table_names":"KafkaConnectorTest,KafkaConnectorTest2"
       },
       "tasks":[{"connector":"kinetica-source-connector","task":0}],
       "type":"source"
    }
    
  • Halt the connector's tasks, then delete the connector and its configuration

    curl -X DELETE http://127.0.0.1:8083/connectors/kinetica-source-connector
    
    204 No Content
    

For more considerations on using the Kafka Connect REST API please refer to the Confluent web site.
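
The REST calls listed above can also be issued programmatically. The sketch below uses only the Python standard library and assumes the default address http://127.0.0.1:8083; the helpers connect_request and send are hypothetical names. Building a request does not contact the worker, so the loop at the end only prints each method and URL, while send needs a running Kafka Connect worker.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8083"  # assumed default Kafka Connect address


def connect_request(method, path, body=None):
    """Build (but do not send) a request for the Kafka Connect REST API."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    headers = {"Accept": "application/json"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def send(request):
    """Send a request; returns (status, decoded JSON body or None)."""
    with urllib.request.urlopen(request) as response:
        raw = response.read()
        return response.status, (json.loads(raw) if raw else None)


name = "kinetica-source-connector"
config = {
    "name": name,
    "connector.class": "com.kinetica.kafka.KineticaSourceConnector",
    "tasks.max": "3",
    "kinetica.url": "http://localhost:9191",
    "kinetica.table_names": "KafkaConnectorTest",
}

examples = [
    connect_request("GET", "/connectors"),
    connect_request("POST", "/connectors", {"name": name, "config": config}),
    connect_request("PUT", f"/connectors/{name}/config", config),
    connect_request("DELETE", f"/connectors/{name}"),
]

for request in examples:
    print(request.get_method(), request.full_url)
```

Note that the PUT body is the bare config object, while the POST body wraps it together with the outer name.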

Streaming Data from Kinetica into Kafka

The KineticaSourceConnector can be used as-is by Kafka Connect to stream data from Kinetica into Kafka. The connector will create table monitors to listen for inserts into a set of tables and publish the inserted rows to separate topics. A separate Kafka topic will be created for each database table configured. Data will be streamed in flat Kafka Connect Struct format with one field for each table column.

The KineticaSourceConnector is configured using the quickstart-kinetica-source.properties file, which accepts the following parameters:

Property Name           Required  Description
name                    Y         Name for the connector
connector.class         Y         Must be com.kinetica.kafka.KineticaSourceConnector
tasks.max               Y         Number of threads
kinetica.url            Y         The URL of the Kinetica database server
kinetica.username       N         Username for authentication
kinetica.password       N         Password for authentication
kinetica.table_names    Y         A comma-delimited list of names of tables to stream from
kinetica.topic_prefix   Y         Token prepended to the name of each topic (see below)
kinetica.timeout        N         Timeout in milliseconds (default is no timeout)

The connector uses kinetica.topic_prefix to generate the name for the destination topic from kinetica.table_names. For example, if kinetica.topic_prefix is Tweets. and an insert is made to table KafkaConnectorTest, then it would publish the change to the topic named Tweets.KafkaConnectorTest.
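
The naming rule can be sketched as plain string concatenation. The function source_topics is a hypothetical helper, and trimming whitespace around table names is an assumption made for the example rather than documented connector behavior.

```python
def source_topics(topic_prefix: str, table_names: str) -> list:
    """Derive one topic name per table by prepending the prefix."""
    tables = [name.strip() for name in table_names.split(",") if name.strip()]
    return [topic_prefix + table for table in tables]


print(source_topics("Tweets.", "KafkaConnectorTest,KafkaConnectorTest2"))
# ['Tweets.KafkaConnectorTest', 'Tweets.KafkaConnectorTest2']
```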

Streaming Data from Kafka into Kinetica

The KineticaSinkConnector can be used as-is by Kafka Connect to stream data from Kafka into Kinetica. Streamed data must be in a flat Kafka Connect Struct that uses only supported data types for fields (BYTES, FLOAT64, FLOAT32, INT32, INT64, and STRING). No translation is performed on the data and it is streamed directly into a table. The target table and collection will be created if they do not exist.
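
Before streaming, it can help to check a message schema against the supported types listed above. The sketch below assumes a simplified, hypothetical representation of a flat schema as a mapping from field name to Kafka Connect type name; it is not the connector's own validation logic.

```python
SUPPORTED_TYPES = {"BYTES", "FLOAT64", "FLOAT32", "INT32", "INT64", "STRING"}


def unsupported_fields(fields: dict) -> dict:
    """Return the fields whose type is not in the supported set."""
    return {
        name: type_name
        for name, type_name in fields.items()
        if type_name not in SUPPORTED_TYPES
    }


# Hypothetical schema: the nested ARRAY field is not a supported flat type.
fields = {"id": "INT64", "text": "STRING", "tags": "ARRAY", "score": "FLOAT32"}
print(unsupported_fields(fields))
# {'tags': 'ARRAY'}
```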

Warning

If the target table does not exist, the connector will create it based on the information available in the Kafka schema. The Kafka schema lacks column types and properties that can affect performance. If these attributes are important to your application, create the table before running the connector, so that these properties can be defined.

The KineticaSinkConnector is configured using the quickstart-kinetica-sink.properties file, which accepts the following parameters:

Property Name                          Required  Description
name                                   Y         Name for the connector
connector.class                        Y         Must be com.kinetica.kafka.KineticaSinkConnector
topics                                 Y         Comma-separated list of topics to stream from
topics.regex                           N         Regular expression used to match against all available topic names to select topics to stream from; must start with ^
tasks.max                              Y         Number of threads
kinetica.url                           Y         The URL of the Kinetica database server
kinetica.username                      N         Username for authentication
kinetica.password                      N         Password for authentication
kinetica.table_prefix                  N         Prefix for destination tables (see below)
kinetica.dest_table_override           N         Override for table name (see below)
kinetica.collection_name               Y         Collection to put the table in (default is empty)
kinetica.batch_size                    N         The number of records to insert at a time (default = 10000)
kinetica.timeout                       N         Timeout in milliseconds (default = 1000)
kinetica.create_table                  N         Automatically create the table, if missing (default = true)
kinetica.update_on_existing_pk         N         When true and the destination table has a primary key, update any existing records in the destination table with incoming message data when the primary key value(s) of those records match the corresponding field values of any incoming messages; otherwise discard any incoming messages with matching primary key values (default = true)
kinetica.allow_schema_evolution        N         When true, allow schema evolution support for incoming messages; requires Schema Registry running in the Kafka stack (default = false)
kinetica.single_table_per_topic        N         When true, put all incoming messages into a single table; otherwise create a table for each message type (default = false)
kinetica.add_new_fields_as_columns     N         When kinetica.allow_schema_evolution is true and an incoming message has a new field, create a new column for it in the destination table (default = false)
kinetica.make_missing_field_nullable   N         When kinetica.allow_schema_evolution is true and an incoming message does not have a required field, alter the corresponding table column to be nullable (default = false)
kinetica.retry_count                   N         Number of attempts to insert data before task fails (default = 1)

The topics and topics.regex parameters are mutually exclusive. Either the names of Kafka topics to subscribe to are provided explicitly via topics, or a regular expression in topics.regex will be used to filter all available Kafka topics for matching topic names.
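
The two selection modes can be sketched as follows. The function select_topics is a hypothetical helper, it assumes the regular expression must match the whole topic name, and Python's re module is used here only for illustration; Kafka Connect evaluates the pattern with Java regular expressions, which differ in some details.

```python
import re


def select_topics(available, topics=None, topics_regex=None):
    """Pick topics from an explicit list or a regular expression, never both."""
    if (topics is None) == (topics_regex is None):
        raise ValueError("set exactly one of topics or topics.regex")
    if topics is not None:
        return [name.strip() for name in topics.split(",")]
    pattern = re.compile(topics_regex)
    # Assumption: the expression has to match the entire topic name.
    return [name for name in available if pattern.fullmatch(name)]


available = ["Tweets.KafkaConnectorTest", "Tweets.KafkaConnectorTest2", "other"]
print(select_topics(available, topics_regex=r"^Tweets\..*"))
# ['Tweets.KafkaConnectorTest', 'Tweets.KafkaConnectorTest2']
```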

You can use the optional kinetica.table_prefix parameter to prepend a token to the table name. This is useful for testing where you have a source connector reading from multiple tables and you want the sink connector to write to different tables in the same database.

You can also use the optional kinetica.dest_table_override parameter to manually specify a table name not generated from the Kafka schema. When the topics parameter has a comma-separated list of topic names, kinetica.dest_table_override should be a comma-separated list of table names corresponding to those topics. When topics are selected by the topics.regex expression, the kinetica.dest_table_override parameter is ignored.
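
A sketch of pairing topics with override table names follows. It assumes the two comma-separated lists correspond by position and must have the same length; the helper override_mapping and the table names tweets_a and tweets_b are hypothetical.

```python
def override_mapping(topics: str, dest_table_override: str) -> dict:
    """Pair each topic with the override table name in the same position."""
    topic_list = [name.strip() for name in topics.split(",")]
    table_list = [name.strip() for name in dest_table_override.split(",")]
    if len(topic_list) != len(table_list):
        raise ValueError("topics and dest_table_override differ in length")
    return dict(zip(topic_list, table_list))


mapping = override_mapping(
    "Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2",
    "tweets_a,tweets_b",  # hypothetical table names
)
print(mapping)
```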

If kinetica.single_table_per_topic is set to true, a single table is created with the name of the topic as its name. All incoming data would be formatted to fit its structure and column types. If the parameter is set to false, there may be multiple tables created for a single topic, depending on the number of unique message schemas read from the topic. The name of the destination table is then based on the message schema.
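
The naming choice described above can be sketched as a small function. The helper destination_table is hypothetical, and two points are assumptions rather than documented behavior: that the schema name is used verbatim as the table name, and that kinetica.table_prefix is applied in both modes.

```python
def destination_table(topic, schema_name, single_table_per_topic, table_prefix=""):
    """Choose the destination table name for an incoming message."""
    # One table per topic uses the topic name; otherwise the message schema
    # decides, so one topic may feed several tables.
    base = topic if single_table_per_topic else schema_name
    return table_prefix + base


print(destination_table("Tweets.KafkaConnectorTest", "KafkaConnectorTest", False, "out_"))
# out_KafkaConnectorTest
print(destination_table("Tweets.KafkaConnectorTest", "KafkaConnectorTest", True))
# Tweets.KafkaConnectorTest
```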

If kinetica.allow_schema_evolution is set to true, schema versions of the message objects will be looked up through the Schema Registry. Additional parameters, kinetica.add_new_fields_as_columns and kinetica.make_missing_field_nullable, allow modification of Kinetica tables on the fly upon receiving Kafka messages with new or missing fields. The default value for these three parameters is false, as mapping schemas and altering tables are expensive, time-consuming operations. This set of parameters was added to support Avro Schema Evolution and connecting to the Schema Registry. If kinetica.allow_schema_evolution is set to false, the connector assumes the object format is not going to change over time and will not attempt to map field names and types of incoming data to cached schemas, even if the Schema Registry service is available. Every schema version other than the version available at the time the connector subscribed to the topic would be blacklisted and data in that format ignored.

Note

You can find more on schema evolution rules in the Apache Avro Specification and in Kafka Schema Evolution.

If you intend to use the Kafka Schema Registry (with or without a possibility of Schema Evolution), please configure the following parameters in your connect-standalone.properties file. The Schema Registry service is usually set up on the same server as bootstrap-server, port 8081.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true

Error Handling

Starting with Kafka 2.0.0 (Confluent 5.0.0) you can configure additional error-handling and error-logging options; all are optional:

Property Name                                    Description
errors.tolerance                                 Whether to fail the task upon exceeding retries for any message (default = none)
                                                   • none - (default) tolerate none of the complete message failures; fail immediately
                                                   • all - tolerate all of the complete message failures; skip silently
errors.retry.timeout                             Total time that a failed operation will be retried, in milliseconds; use -1 for infinite retries [-1, Long.MAX_VALUE] (default = 0)
errors.retry.delay.max.ms                        Maximum delay between consecutive retries, in milliseconds [1, Long.MAX_VALUE] (default = 60000)
errors.log.enable                                Whether to enable error context logging, including the message and failure detail (default = false)
errors.log.include.messages                      Whether to log the content of failed messages (default = false)
errors.deadletterqueue.topic.name                Name of Kafka topic to redirect failed messages to; if not given, the dead letter queue will not be used (default behavior)
errors.deadletterqueue.topic.replication.factor  Replication factor used for an automatically created dead letter queue topic [1, Short.MAX_VALUE] (default = 3)
errors.deadletterqueue.context.headers.enable    Whether to log extra metadata of failed messages (default = false)

These options are added to the connector configuration files:

  • quickstart-kinetica-sink.properties
  • quickstart-kinetica-source.properties

The errors.deadletterqueue family of options is valid for the sink connector only.
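
To illustrate how errors.retry.timeout and errors.retry.delay.max.ms interact, the sketch below computes a sequence of waits between retries. The doubling backoff and the 300 ms initial delay are assumptions made for the example, not documented Kafka Connect behavior; only the total time budget and the per-retry cap come from the options above.

```python
from itertools import islice


def retry_delays(retry_timeout_ms, delay_max_ms=60000, initial_delay_ms=300):
    """Yield waits between retries until the total time budget is used up."""
    elapsed = 0
    delay = initial_delay_ms
    # A negative timeout means retry forever; 0 means do not retry at all.
    while retry_timeout_ms < 0 or elapsed < retry_timeout_ms:
        wait = min(delay, delay_max_ms)
        if retry_timeout_ms >= 0:
            wait = min(wait, retry_timeout_ms - elapsed)
        yield wait
        elapsed += wait
        delay = min(delay * 2, delay_max_ms)


print(list(retry_delays(5000, delay_max_ms=1000)))
# [300, 600, 1000, 1000, 1000, 1000, 100]
print(list(retry_delays(0)))
# []
print(list(islice(retry_delays(-1, delay_max_ms=1000), 4)))
# [300, 600, 1000, 1000]
```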

Testing

Datapump Test Utility

The datapump utility is used to generate insert activity on tables to facilitate testing. It will create tables KafkaConnectorTest and KafkaConnectorTest2 and insert records at regular intervals.

usage: TestDataPump [options] [URL]
 -c,--configFile <path>      Relative path to configuration file
 -d,--delay-time <seconds>   Seconds between batches
 -h,--help                   Show usage
 -n,--batch-size <count>     Number of records per batch
 -t,--total-batches <count>  Number of batches to insert

The example below (built for Kafka 2.0.0 and Kinetica 7.0.0.0) runs the datapump with default options on a local Kinetica instance and will insert batches of 10 records every 3 seconds.

java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
    com.kinetica.kafka.TestDataPump \
    http://localhost:9191

A configuration file can be used to provide the Kinetica DB instance configuration, including URL, username, password, and timeout:

java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
    com.kinetica.kafka.TestDataPump \
    -c config/quickstart-kinetica-sink.properties

System Test

This test will demonstrate the Kinetica Kafka Connector source and sink in standalone mode. The following steps use example files that may require modifications to match the target environment.

  1. Create two configuration files, connect-standalone-source.properties & connect-standalone-sink.properties, in the <KAFKA_HOME>/config directory with the following content:

    # This should point to your Kafka broker
    bootstrap.servers=localhost:9092
    
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=5000
    
    # Source port
    #rest.port=8089
    
    # Sink port
    #rest.port=8090
    
    # needed for Kinetica
    key.converter.schemas.enable=true
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Disable schemas for internal key/value parameters
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
  2. In the connect-standalone-source.properties file, uncomment the source port:

    # Source port
    rest.port=8089
    
  3. In the connect-standalone-sink.properties file, uncomment the sink port:

    # Sink port
    rest.port=8090
    
  4. In the same directory, create a configuration file (source.properties) for the source connector:

    # Connector API required config
    name=TwitterSourceConnector
    connector.class=com.kinetica.kafka.KineticaSourceConnector
    tasks.max=1
    
    # Kinetica-specific config
    kinetica.url=http://localhost:9191
    kinetica.table_names=KafkaConnectorTest,KafkaConnectorTest2
    kinetica.timeout=1000
    kinetica.topic_prefix=Tweets.
    
  5. In the same directory, create a configuration file (sink.properties) for the sink connector:

    name=TwitterSinkConnector
    topics=Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2
    connector.class=com.kinetica.kafka.KineticaSinkConnector
    tasks.max=1
    
    # Kinetica-specific config
    kinetica.url=http://localhost:9191
    kinetica.collection_name=TEST
    kinetica.table_prefix=out_
    kinetica.timeout=1000
    kinetica.batch_size=100
    

The rest of the system test will require four terminal windows:

  1. In terminal 1, start Zookeeper and Kafka:

    cd <KAFKA_HOME>
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    bin/kafka-server-start.sh config/server.properties
    
  2. In terminal 2, start the test datapump. This will create the KafkaConnectorTest and KafkaConnectorTest2 tables and generate insert activity:

    java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
        com.kinetica.kafka.TestDataPump -c config/sink.properties
    
  3. In terminal 3, start the Kafka sink connector:

    cd <KAFKA_HOME>
    bin/connect-standalone.sh config/connect-standalone-sink.properties config/sink.properties
    
  4. In terminal 4, start the Kafka source connector:

    cd <KAFKA_HOME>
    bin/connect-standalone.sh config/connect-standalone-source.properties config/source.properties
    
  5. Verify that data is copied to tables out_KafkaConnectorTest and out_KafkaConnectorTest2.

To test schemaless JSON format, in the connect-standalone-source.properties & connect-standalone-sink.properties files, set:

key.converter.schemas.enable=false
value.converter.schemas.enable=false
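
The effect of this setting on the JSON written to a topic can be illustrated as follows. With schemas enabled, the JsonConverter wraps each value in an envelope with schema and payload members; with schemas disabled, only the bare payload is written. The helper json_value and the id and text fields are hypothetical and serve only to show the two shapes.

```python
import json


def json_value(payload: dict, schema: dict, schemas_enable: bool) -> str:
    """Mimic the two JSON shapes: schema envelope or bare payload."""
    if schemas_enable:
        return json.dumps({"schema": schema, "payload": payload})
    return json.dumps(payload)


# Hypothetical two-column row and its Kafka Connect JSON schema.
schema = {
    "type": "struct",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int64", "optional": False},
        {"field": "text", "type": "string", "optional": True},
    ],
}
row = {"id": 1, "text": "hello"}

print(json_value(row, schema, schemas_enable=True))
print(json_value(row, schema, schemas_enable=False))
```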

For more information, please refer to configuration descriptions and test scenarios in the kinetica-connector-kafka GitHub project, under test-connect/README.md.