Load Data from Confluent Kafka
Copy and paste tutorial for loading data from Confluent Kafka
Loading data from Confluent Kafka can be done in three easy steps: create a credential, create a data source, and start the ingest.
Note: Only Avro, GeoJSON, and JSON formats are supported for Kafka ingest.
To store the authentication details of your connection separately from the data source definition, first create a credential with the CREATE CREDENTIAL command, then reference it in your CREATE DATA SOURCE statement.
If connecting to a Kafka topic that allows anonymous access, this step can be skipped.
Next, create a data source using the CREATE DATA SOURCE command in Kinetica. The data source defines how Kinetica connects to your Kafka topic.
To initiate the ingest of data into a Kinetica table, use the LOAD INTO command with the option SUBSCRIBE = TRUE. This will start the stream of data from your Kafka topic into the specified table. You can control the ingest (pause, resume, and cancel) by using the ALTER TABLE command.
The following are some common options used when loading. For the full list of options, see LOAD INTO. For copy/paste examples of many of the options, see Loading Data.
Kinetica has two different error handling modes for dealing with erroneous data. To halt ingestion after a bad record is found, use the ABORT mode. To skip erroneous records and continue the ingest, use the SKIP mode.
To inspect erroneous records, use the BAD RECORD TABLE NAME option. All bad records encountered will be stored in the named table for review. The bad records table is limited to 10,000 records by default; this limit may be overridden using the BAD RECORD TABLE LIMIT option.
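The difference between the two modes, and the effect of the bad-record limit, can be pictured with a small client-side model. This is a Python sketch of the semantics described above, not Kinetica's implementation. The helper name `ingest`, the use of JSON parsing as the validity check, and the assumption that bad records beyond the limit are simply not stored are all illustrative.

```python
import json


def ingest(raw_records, on_error="ABORT", bad_record_limit=10_000):
    """Toy model of ABORT vs. SKIP handling for a stream of JSON strings.

    Returns (loaded, bad). Hypothetical helper: it mimics the behavior
    described in the text, not Kinetica's actual ingest code.
    """
    loaded, bad = [], []
    for raw in raw_records:
        try:
            loaded.append(json.loads(raw))
        except json.JSONDecodeError:
            # Assumption: bad records past the limit are dropped, not stored.
            if len(bad) < bad_record_limit:
                bad.append(raw)
            if on_error == "ABORT":
                break  # halt ingestion at the first bad record
            # SKIP: fall through and continue with the next record
    return loaded, bad


records = ['{"id": 1}', '{"id": ', '{"id": 3}']
print(ingest(records, on_error="ABORT"))  # stops before the third record
print(ingest(records, on_error="SKIP"))   # loads the first and third records
```

In this model, ABORT leaves the third record unloaded even though it is valid, while SKIP loads it and keeps the malformed record aside for review.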
In some cases, you may only want to store certain columns from your source data. Use the FIELDS MAPPED BY NAME(...) option, which allows you to specify the desired fields to store in Kinetica.
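As a rough picture of what selecting fields by name means for a single JSON record, the following Python sketch keeps only the requested keys. The function `project_fields` and the sample record are hypothetical, and treating an absent field as a null value is an assumption rather than documented Kinetica behavior.

```python
def project_fields(record, wanted):
    """Keep only the named fields of a record.

    Assumption: a wanted field that is absent from the source record
    becomes None (null) in the result.
    """
    return {name: record.get(name) for name in wanted}


# Hypothetical source record with more fields than we want to store.
source = {"id": 7, "name": "pump-3", "debug_blob": "xyz", "reading": 41.5}

row = project_fields(source, ["id", "reading"])
print(row)  # {'id': 7, 'reading': 41.5}
```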
Use the COLUMN FORMATS option to convert date and time fields into Kinetica Date, Time, and DateTime columns. Date and time formats are specified using a JSON-formatted string. Non-placeholder characters must be wrapped in double quotes, which must themselves be escaped (e.g. '{"dt": {"date": "\"(\"YYYY\")\" Mon, DD"}}'). Alternatively, you can use the ? character as a wildcard. Note that Kinetica does not handle or store time zones; any time zone information is discarded. See the full list of supported date and time format codes.
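The nested quoting in the format string is easy to get wrong by hand. One way to avoid mistakes is to let a JSON library do the escaping, as in this Python sketch. The column name `dt` and the format come from the example above; wrapping the result in single quotes with doubled embedded single quotes assumes standard SQL string-literal rules.

```python
import json

# Column "dt" and its date format, as in the example in the text.
# The literal characters ( and ) are each wrapped in double quotes.
column_formats = {"dt": {"date": '"("YYYY")" Mon, DD'}}

# json.dumps escapes the inner double quotes, producing \" sequences.
json_text = json.dumps(column_formats)

# Assumption: standard SQL quoting, where the literal is wrapped in single
# quotes and any embedded single quote is doubled.
sql_literal = "'" + json_text.replace("'", "''") + "'"

print(sql_literal)
# '{"dt": {"date": "\"(\"YYYY\")\" Mon, DD"}}'
```

Parsing `json_text` back with `json.loads` returns the original dictionary, which is a quick check that the escaping is balanced.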
By default, Kinetica will use the native null type in JSON for null values. However, if your data uses a custom string to represent null (e.g. "null"), use the NULL option to specify it.
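The distinction between a native JSON null and a custom null string can be seen in a short Python sketch. The function `normalize_nulls` is a hypothetical client-side illustration of the idea, not how Kinetica applies the NULL option internally.

```python
import json


def normalize_nulls(record, null_token="null"):
    """Replace string values equal to null_token with None.

    Native JSON nulls are already None after parsing and stay unchanged.
    """
    return {k: (None if v == null_token else v) for k, v in record.items()}


# "city" holds the custom string "null"; "zip" holds a native JSON null.
parsed = json.loads('{"id": 1, "city": "null", "zip": null}')

print(parsed["city"] is None)                   # False: still a string
print(normalize_nulls(parsed)["city"] is None)  # True: now a real null
```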
To load Avro data where the data schema is stored in a Confluent Schema Registry, follow three steps: