Load Data from Confluent Kafka

Copy and paste tutorial for loading data from Confluent Kafka

Loading data from Confluent Kafka can be done in three easy steps:

  1. Create a credential - holds Kafka account authentication information
  2. Create a data source - holds Kafka connection information; uses the credential for authentication
  3. Ingest the data - Load data from Kafka; uses the data source to identify the Kafka source

Note

Only Avro, GeoJSON, & JSON are supported for Kafka ingest.


Create Credential

To store the access details for your data source, first create a credential with the CREATE CREDENTIAL command, and reference it in your CREATE DATA SOURCE statement. This will allow you to store the authentication details of your connection separately.

If connecting to a Kafka topic that allows anonymous access, this step can be skipped.

Password
1
2
3
4
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent',
IDENTITY = 'jdoe',
SECRET = 'foobaz123'
SSL with Truststore
1
2
3
4
5
6
7
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SSL',
    'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt'
)
SSL with Truststore/Client Auth
1
2
3
4
5
6
7
8
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SSL',
    'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
    'ssl.certificate.location' = 'kifs://ssl/client.pem'
)
SSL with Encryption
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SSL',
    'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
    'ssl.certificate.location' = 'kifs://ssl/client.pem',
    'ssl.key.location' = 'kifs://ssl/client.key',
    'ssl.key.password' = 'foobaz123'
)
SASL
1
2
3
4
5
6
7
8
9
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SASL_SSL',
    'sasl.mechanism' = 'PLAIN',
    'sasl.username' = 'jdoe',
    'sasl.password' = 'foobaz123'
)
Kerberos
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SASL_PLAINTEXT',
    'sasl.mechanism' = 'GSSAPI',
    'sasl.kerberos.service.name' = 'kafka',
    'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
    'sasl.kerberos.principal' = 'jdoe@example.com'
)
Kerberos SSL
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SASL_SSL',
    'sasl.mechanism' = 'GSSAPI',
    'sasl.kerberos.service.name' = 'kafka',
    'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
    'sasl.kerberos.principal' = 'jdoe@example.com',
    'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
    'ssl.certificate.location' = 'kifs://ssl/client.pem',
    'ssl.key.location' = 'kifs://ssl/client.key',
    'ssl.key.password' = 'foobaz123'
)

Create Data Source

Next, create a data source using the CREATE DATA SOURCE command in Kinetica. The data source defines how Kinetica connects to your Kafka topic.

Credential
1
2
3
4
5
6
7
CREATE DATA SOURCE confluent_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'sample',
    CREDENTIAL = 'confluent_cred'
)
Credential w/ Schema Registry
1
2
3
4
5
6
7
8
9
CREATE DATA SOURCE confluent_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'sample',
    CREDENTIAL = 'confluent_cred',
    SCHEMA_REGISTRY_LOCATION = 'https://example.com:8082',
    SCHEMA_REGISTRY_CREDENTIAL = 'confluent_sr_cred'
)
Anonymous
1
2
3
4
5
6
CREATE DATA SOURCE confluent_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'sample'
)

Ingest Data

To initiate the ingest of data into a Kinetica table, use the LOAD INTO command with the option SUBSCRIBE = TRUE. This will start the stream of data from your Kafka topic into the specified table. You can control the ingest (pause, resume, and cancel) by using the ALTER TABLE command.

Start Ingest
1
2
3
LOAD DATA INTO example.orders
FORMAT JSON
WITH OPTIONS (DATA SOURCE = 'confluent_ds', SUBSCRIBE = TRUE)
Pause Ingest
1
2
ALTER TABLE example.orders
PAUSE SUBSCRIPTION confluent_ds
Resume Ingest
1
2
ALTER TABLE example.orders
RESUME SUBSCRIPTION confluent_ds
Cancel Ingest
1
2
ALTER TABLE example.orders
CANCEL SUBSCRIPTION confluent_ds

Considerations

The following are some common options used when loading. For the full list of options, see LOAD INTO. For copy/paste examples of many of the options, see Loading Data.


Error Handling

Kinetica has two different error handling modes for dealing with erroneous data. To halt ingestion after a bad record is found, use the ABORT mode. To skip erroneous records and continue the ingest, use the SKIP mode.

To inspect erroneous records, you may use the BAD RECORD TABLE NAME option. All bad records encountered will be stored there for review. The bad records table is limited to 10,000 records by default and may be overridden using the BAD RECORD TABLE LIMIT option.

Abort
1
2
3
4
5
6
7
LOAD INTO ki_home.error_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    ON ERROR = ABORT
)
Log Bad Records
1
2
3
4
5
6
7
8
LOAD INTO ki_home.error_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    BAD RECORD TABLE NAME = 'error_example_invalid',
    ON ERROR = SKIP
)

Load Specific Columns

In some cases, you may only want to store certain columns from your source data. Use the FIELDS MAPPED BY NAME(...) option, which allows you to specify the desired fields to store in Kinetica.

Load Specific Columns Example
1
2
3
4
5
6
LOAD DATA INTO example.orders
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    FIELDS MAPPED BY NAME(ID, Name, Product_ID, Quantity)
)

DateTime Formatting

Use the COLUMN FORMATS option to format date and time fields into Kinetica Date, Time, and DateTime columns. Time formats are specified using a JSON formatted string. Non-placeholder characters must be wrapped in quotes, which must also be escaped (e.g. '{"dt": {"date": "\"(\"YYYY\")\" Mon, DD"}}'). Alternatively, you can use the ? character as a wildcard character. Note that Kinetica does not handle or store timezones and they will be discarded. See the full list of supported date and time format codes.

ISO 8601 Timestamps
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
-- Example Data:
-- {
--   "dt": "2022-01-19T15:50:42Z+05:00"
-- }

LOAD INTO ki_home.timestamp_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    COLUMN FORMATS = '
    {
        "dt": {"datetime": "YYYY-MM-DD?HH:MI:SS"}
    }'
)
Custom Date
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
-- Example Data:
-- {
--   "d": "(2022) Feb, 22"
-- }

LOAD INTO ki_home.date_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    COLUMN FORMATS = '{"d": {"date": "\"(\"YYYY\")\" Mon, DD"}}'
)
Custom Time
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
-- Example Data:
-- {
--   "t": "18-27-59.5536"
-- }

LOAD INTO ki_home.time_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    COLUMN FORMATS = '{"t": {"time": "HH-MI-SS.MS"}}'
)

Null Value Handling

By default, Kinetica will use the native null type in JSON for null values. However, if your data uses a custom string to specify null (e.g. "null") use the NULL option.

Null String
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
-- Example Data:
-- {
--   "example_null": "null"
-- }

LOAD INTO ki_home.null_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    NULL = 'null'
)

Avro w/ Schema Registry

To load Avro data where the data schema is stored in a Confluent Schema Registry, follow three steps:

  1. Create credentials for both the Kafka source and the Schema Registry service.
  2. Create a data source that references the Kafka source, the Schema Registry service, and both credentials.
  3. Initiate a load that references the data source and the name of the schema to use.
Create Confluent Credentials
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE CREDENTIAL confluent_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SASL_PLAINTEXT',
    'sasl.mechanism' = 'GSSAPI',
    'sasl.kerberos.service.name' = 'kafka',
    'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
    'sasl.kerberos.principal' = 'jdoe@example.com'
)
Create Schema Registry Credentials
1
2
3
4
5
6
7
8
9
CREATE CREDENTIAL confluent_sr_cred
TYPE = 'confluent'
WITH OPTIONS
(
    'security.protocol' = 'SASL_SSL',
    'sasl.mechanism' = 'PLAIN',
    'sasl.username' = 'jdoe',
    'sasl.password' = 'foobaz123'
)
Create Data Source
1
2
3
4
5
6
7
8
9
CREATE DATA SOURCE confluent_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'sample',
    CREDENTIAL = 'confluent_cred',
    SCHEMA_REGISTRY_LOCATION = 'https://example.com:8082',
    SCHEMA_REGISTRY_CREDENTIAL = 'confluent_sr_cred'
)
Initiate Load
1
2
3
4
5
6
7
8
LOAD DATA INTO example.orders
FORMAT AVRO
WITH OPTIONS
(
      DATA SOURCE = 'confluent_ds',
      SCHEMA_REGISTRY_SCHEMA_NAME = 'order_schema',
      SUBSCRIBE = TRUE
)