# Load Data from Confluent Kafka

> Copy and paste tutorial for loading data from Confluent Kafka

<a id="load-data-by-source-confluent" />

Loading data from Confluent Kafka can be done in three easy steps:

1. [Create a credential](/content/load_data/by_source/confluent#load-from-confluent-credential) - holds
   Kafka account authentication information
2. [Create a data source](/content/load_data/by_source/confluent#load-from-confluent-data-source) - holds
   Kafka connection information; uses the *credential* for authentication
3. [Ingest the data](/content/load_data/by_source/confluent#load-from-confluent-ingest) - Load data from
   Kafka; uses the *data source* to identify the Kafka source
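Put together, the steps form a short script. The following is a minimal sketch for a topic that allows anonymous access, so it skips the credential step. The data source name, broker address, topic, and target table are the same placeholders used throughout this page, not values your cluster will have.

```sql title="End-to-End Sketch (Anonymous Access)" theme={null}
-- Step 1 (credential) is skipped: the topic is assumed to allow anonymous access

-- Step 2: tell Kinetica which broker and topic to read from
CREATE DATA SOURCE confluent_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'sample'
);

-- Step 3: start a streaming ingest of JSON messages into the target table
LOAD DATA INTO example.orders
FORMAT JSON
WITH OPTIONS (DATA SOURCE = 'confluent_ds', SUBSCRIBE = TRUE);
```

To authenticate instead, create one of the *credentials* below and add `CREDENTIAL = 'confluent_cred'` to the data source options.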

<Info>
  Kafka ingest supports only the Avro, delimited text, GeoJSON, and JSON
  formats.
</Info>

<a id="load-from-confluent-credential" />

## Create Credential

To store the access details for your *data source*, first create a *credential*
with the [CREATE CREDENTIAL](/content/sql/ddl#sql-create-credential) command, and
reference it in your `CREATE DATA SOURCE` statement. This keeps the
authentication details of your connection separate from the connection
information itself.

If your Kafka topic allows anonymous access, you can skip this step.

<CodeGroup>
  ```sql SSL with CA Cert theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SSL',
      'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt'
  )
  ```

  ```sql SSL with CA Cert/Client Auth theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SSL',
      'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
      'ssl.certificate.location' = 'kifs://ssl/client.pem'
  )
  ```

  ```sql SSL with Encryption theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SSL',
      'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
      'ssl.certificate.location' = 'kifs://ssl/client.pem',
      'ssl.key.location' = 'kifs://ssl/client.key',
      'ssl.key.password' = 'foobaz123'
  )
  ```

  ```sql SASL theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SASL_SSL',
      'sasl.mechanism' = 'PLAIN',
      'sasl.username' = 'jdoe',
      'sasl.password' = 'foobaz123'
  )
  ```

  ```sql Kerberos theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SASL_PLAINTEXT',
      'sasl.mechanism' = 'GSSAPI',
      'sasl.kerberos.service.name' = 'kafka',
      'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
      'sasl.kerberos.principal' = 'jdoe@example.com'
  )
  ```

  ```sql Kerberos SSL theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SASL_SSL',
      'sasl.mechanism' = 'GSSAPI',
      'sasl.kerberos.service.name' = 'kafka',
      'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
      'sasl.kerberos.principal' = 'jdoe@example.com',
      'ssl.ca.location' = 'kifs://ssl/ca-bundle.crt',
      'ssl.certificate.location' = 'kifs://ssl/client.pem',
      'ssl.key.location' = 'kifs://ssl/client.key',
      'ssl.key.password' = 'foobaz123'
  )
  ```
</CodeGroup>

<a id="load-from-confluent-data-source" />

## Create Data Source

Next, create a *data source* using the
[CREATE DATA SOURCE](/content/sql/ddl#sql-create-data-source) command in Kinetica.
The *data source* defines how Kinetica connects to your Kafka topic.

<CodeGroup>
  ```sql Credential theme={null}
  CREATE DATA SOURCE confluent_ds
  LOCATION = 'CONFLUENT://example.com:9092'
  WITH OPTIONS
  (
      KAFKA_TOPIC_NAME = 'sample',
      CREDENTIAL = 'confluent_cred'
  )
  ```

  ```sql Credential w/ Schema Registry theme={null}
  CREATE DATA SOURCE confluent_ds
  LOCATION = 'CONFLUENT://example.com:9092'
  WITH OPTIONS
  (
      KAFKA_TOPIC_NAME = 'sample',
      CREDENTIAL = 'confluent_cred',
      SCHEMA_REGISTRY_LOCATION = 'https://example.com:8082',
      SCHEMA_REGISTRY_CREDENTIAL = 'confluent_sr_cred'
  )
  ```

  ```sql Anonymous theme={null}
  CREATE DATA SOURCE confluent_ds
  LOCATION = 'CONFLUENT://example.com:9092'
  WITH OPTIONS
  (
      KAFKA_TOPIC_NAME = 'sample'
  )
  ```
</CodeGroup>
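Because the topic is named in the data source options (`KAFKA_TOPIC_NAME`), streaming a second topic calls for a second data source. The *credential* can be reused. The sketch below assumes a hypothetical topic named `returns` on the same broker; the data source name `confluent_returns_ds` is also hypothetical.

```sql title="Second Topic, Shared Credential (Sketch)" theme={null}
-- Hypothetical second topic ('returns') on the same broker,
-- authenticated with the confluent_cred credential created earlier
CREATE DATA SOURCE confluent_returns_ds
LOCATION = 'CONFLUENT://example.com:9092'
WITH OPTIONS
(
    KAFKA_TOPIC_NAME = 'returns',
    CREDENTIAL = 'confluent_cred'
)
```

Each data source can then back its own `LOAD INTO ... SUBSCRIBE = TRUE` into a separate table.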

<a id="load-from-confluent-ingest" />

## Ingest Data

To initiate the ingest of data into a Kinetica table, use the
[LOAD INTO](/content/sql/load#sql-load-into) command with the option
`SUBSCRIBE = TRUE`. This starts streaming data from your Kafka topic into the
specified table. You can pause, resume, and cancel the ingest with the
[ALTER TABLE](/content/sql/ddl#sql-alter-table-manage-sub) command.

<CodeGroup>
  ```sql Start Ingest theme={null}
  LOAD DATA INTO example.orders
  FORMAT JSON
  WITH OPTIONS (DATA SOURCE = 'confluent_ds', SUBSCRIBE = TRUE)
  ```

  ```sql Pause Ingest theme={null}
  ALTER TABLE example.orders
  PAUSE SUBSCRIPTION confluent_ds
  ```

  ```sql Resume Ingest theme={null}
  ALTER TABLE example.orders
  RESUME SUBSCRIPTION confluent_ds
  ```

  ```sql Cancel Ingest theme={null}
  ALTER TABLE example.orders
  CANCEL SUBSCRIPTION confluent_ds
  ```
</CodeGroup>

## Considerations

The following are some common options used when loading.  For the full list of
options, see [LOAD INTO](/content/sql/load#sql-load-into).  For copy/paste examples of many of
the options, see [Loading Data](/content/snippets/load-data).

### Error Handling

Kinetica has two different
[error handling modes](/content/sql/load#sql-load-file-server-load-opt) for dealing
with erroneous data. To halt ingestion after a bad record is found, use the
`ABORT` mode. To skip erroneous records and continue the ingest, use the
`SKIP` mode.

To inspect erroneous records, use the
[BAD RECORD TABLE NAME](/content/sql/load#sql-load-file-server-load-opt) option;
all bad records encountered will be stored in the named table for review. By
default, this table holds at most 10,000 records; use the
[BAD RECORD TABLE LIMIT](/content/sql/load#sql-load-file-server-load-opt) option
to override that limit.

<CodeGroup>
  ```sql Abort theme={null}
  LOAD INTO ki_home.error_example
  FORMAT JSON
  WITH OPTIONS (
      DATA SOURCE = 'confluent_ds',
      SUBSCRIBE = true,
      ON ERROR = ABORT
  )
  ```

  ```sql Log Bad Records theme={null}
  LOAD INTO ki_home.error_example
  FORMAT JSON
  WITH OPTIONS (
      DATA SOURCE = 'confluent_ds',
      SUBSCRIBE = true,
      BAD RECORD TABLE NAME = 'error_example_invalid',
      ON ERROR = SKIP
  )
  ```
</CodeGroup>
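The bad records table limit described above is set in the same `WITH OPTIONS` clause as the table name. A minimal sketch, assuming the option takes a plain integer; the value `50000` is arbitrary and chosen only for illustration.

```sql title="Bad Record Table Limit (Sketch)" theme={null}
LOAD INTO ki_home.error_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    BAD RECORD TABLE NAME = 'error_example_invalid',
    -- Keep up to 50,000 bad records instead of the default 10,000
    BAD RECORD TABLE LIMIT = 50000,
    ON ERROR = SKIP
)
```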

### Load Specific Columns

In some cases, you may only want to store certain columns from your source data.
Use the [FIELDS MAPPED BY NAME(...)](/content/sql/load#sql-load-file-server-load-opt)
option, which allows you to specify the desired fields to store in Kinetica.

```sql title="Load Specific Columns Example" theme={null}
LOAD DATA INTO example.orders
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    FIELDS MAPPED BY NAME(ID, Name, Product_ID, Quantity)
)
```
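The example above does not set `SUBSCRIBE`. To keep only those fields while streaming continuously from the topic, the option can be combined with `SUBSCRIBE = TRUE`, as in the other examples on this page. This is a sketch that reuses the same placeholder field names.

```sql title="Load Specific Columns While Streaming (Sketch)" theme={null}
LOAD DATA INTO example.orders
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    -- Only these source fields are loaded into the table
    FIELDS MAPPED BY NAME(ID, Name, Product_ID, Quantity),
    -- Keep consuming new messages from the topic
    SUBSCRIBE = TRUE
)
```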

### DateTime Formatting

Use the [COLUMN FORMATS](/content/sql/load#sql-load-file-server-load-opt) option
to convert date and time fields into Kinetica Date, Time, and DateTime columns.
Formats are specified as a JSON-formatted string. Non-placeholder characters
must be wrapped in double quotes, which must themselves be escaped (e.g.,
`'{"dt": {"date": "\"(\"YYYY\")\" Mon, DD"}}'`). Alternatively, you can use
the `?` character as a wildcard in place of those characters. Note that
Kinetica does not handle or store time zones; any time zone information is
discarded. See the full list of supported
[date and time format codes](/content/sql/query#sql-datetime-conversion-codes).

<CodeGroup>
  ```sql ISO 8601 Timestamps theme={null}
  -- Example Data:
  -- {
  --   "dt": "2022-01-19T15:50:42Z+05:00"
  -- }

  LOAD INTO ki_home.timestamp_example
  FORMAT JSON
  WITH OPTIONS (
      DATA SOURCE = 'confluent_ds',
      SUBSCRIBE = true,
      COLUMN FORMATS = '
      {
          "dt": {"datetime": "YYYY-MM-DD?HH:MI:SS"}
      }'
  )
  ```

  ```sql Custom Date theme={null}
  -- Example Data:
  -- {
  --   "d": "(2022) Feb, 22"
  -- }

  LOAD INTO ki_home.date_example
  FORMAT JSON
  WITH OPTIONS (
      DATA SOURCE = 'confluent_ds',
      SUBSCRIBE = true,
      COLUMN FORMATS = '{"d": {"date": "\"(\"YYYY\")\" Mon, DD"}}'
  )
  ```

  ```sql Custom Time theme={null}
  -- Example Data:
  -- {
  --   "t": "18-27-59.5536"
  -- }

  LOAD INTO ki_home.time_example
  FORMAT JSON
  WITH OPTIONS (
      DATA SOURCE = 'confluent_ds',
      SUBSCRIBE = true,
      COLUMN FORMATS = '{"t": {"time": "HH-MI-SS.MS"}}'
  )
  ```
</CodeGroup>
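As an alternative to escaped quotes, the `?` wildcard described above can stand in for the literal parentheses in the Custom Date example. The sketch below assumes that each `?` matches exactly one character, as it does for the `T` separator in the ISO 8601 example, and that a single `COLUMN FORMATS` JSON object can hold one entry per column. The table `ki_home.datetime_example` is hypothetical.

```sql title="Wildcards and Multiple Columns (Sketch)" theme={null}
-- Example Data:
-- {
--   "d": "(2022) Feb, 22",
--   "t": "18-27-59.5536"
-- }

LOAD INTO ki_home.datetime_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    -- '?' matches the '(' and ')' characters instead of quoting them
    COLUMN FORMATS = '
    {
        "d": {"date": "?YYYY? Mon, DD"},
        "t": {"time": "HH-MI-SS.MS"}
    }'
)
```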

### Null Value Handling

By default, Kinetica treats the native JSON `null` value as null. However, if
your data uses a custom string to represent null (e.g., `"null"`), use the
[NULL](/content/sql/load#sql-load-file-server-load-opt) option.

```sql Null String theme={null}
-- Example Data:
-- {
--   "example_null": "null"
-- }

LOAD INTO ki_home.null_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    NULL = 'null'
)
```
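All of the options in this section go in the same `WITH OPTIONS` clause, so they can be combined in one load. A minimal sketch joining the null string, datetime format, and bad-record options shown above; the table `ki_home.combined_example` and its bad records table name are hypothetical.

```sql title="Combined Options (Sketch)" theme={null}
LOAD INTO ki_home.combined_example
FORMAT JSON
WITH OPTIONS (
    DATA SOURCE = 'confluent_ds',
    SUBSCRIBE = true,
    -- Treat the string "null" as a null value
    NULL = 'null',
    -- Parse ISO 8601-style timestamps in the dt field
    COLUMN FORMATS = '{"dt": {"datetime": "YYYY-MM-DD?HH:MI:SS"}}',
    -- Skip bad records but keep them for review
    BAD RECORD TABLE NAME = 'combined_example_invalid',
    ON ERROR = SKIP
)
```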

<a id="load-from-confluent-schema-registry" />

### Avro w/ Schema Registry

To load Avro data where the data schema is stored in a Confluent Schema
Registry, follow three steps:

1. Create [credentials](/content/sql/ddl#sql-create-credential) for both the Kafka
   source and the Schema Registry service.
2. Create a [data source](/content/sql/ddl#sql-create-data-source) that references
   the Kafka source, the Schema Registry service, and both *credentials*.
3. Initiate a [load](/content/sql/load#sql-load-into) that references the
   *data source* and the name of the schema to use.

<CodeGroup>
  ```sql Create Confluent Credentials theme={null}
  CREATE CREDENTIAL confluent_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SASL_PLAINTEXT',
      'sasl.mechanism' = 'GSSAPI',
      'sasl.kerberos.service.name' = 'kafka',
      'sasl.kerberos.keytab' = 'kifs://security/jdoe.keytab',
      'sasl.kerberos.principal' = 'jdoe@example.com'
  )
  ```

  ```sql Create Schema Registry Credentials theme={null}
  CREATE CREDENTIAL confluent_sr_cred
  TYPE = 'confluent'
  WITH OPTIONS
  (
      'security.protocol' = 'SASL_SSL',
      'sasl.mechanism' = 'PLAIN',
      'sasl.username' = 'jdoe',
      'sasl.password' = 'foobaz123'
  )
  ```

  ```sql Create Data Source theme={null}
  CREATE DATA SOURCE confluent_ds
  LOCATION = 'CONFLUENT://example.com:9092'
  WITH OPTIONS
  (
      KAFKA_TOPIC_NAME = 'sample',
      CREDENTIAL = 'confluent_cred',
      SCHEMA_REGISTRY_LOCATION = 'https://example.com:8082',
      SCHEMA_REGISTRY_CREDENTIAL = 'confluent_sr_cred'
  )
  ```

  ```sql Initiate Load theme={null}
  LOAD DATA INTO example.orders
  FORMAT AVRO
  WITH OPTIONS
  (
      DATA SOURCE = 'confluent_ds',
      SCHEMA_REGISTRY_SCHEMA_NAME = 'order_schema',
      SUBSCRIBE = TRUE
  )
  ```
</CodeGroup>
