Data Sinks

A data sink is a reference object for a data target that is generally external to the database. It consists of the location and connection information for that target. A data sink can make use of a credential object to store remote authentication information.

A data sink name must adhere to the standard naming criteria. Each data sink exists within a schema and follows the standard name resolution rules for tables.

The following data sink types are supported:

  • Azure (Microsoft blob storage)
  • GCS (Google Cloud Storage)
  • HDFS (Apache Hadoop Distributed File System)
  • JDBC (Java Database Connectivity, using a user-supplied driver or one of the drivers on the supported list)
  • Kafka (Apache Kafka streaming feed)
  • Local (Table within the same Kinetica instance)
  • S3 (Amazon S3 Bucket)
  • Webhook (HTTP/HTTPS)

Note

The following default hosts are used for Azure, GCS, and S3, but can be overridden in the destination parameter:

  • Azure: <service_account_name>.blob.core.windows.net
  • GCS: storage.googleapis.com
  • S3: <region>.amazonaws.com
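As an illustration only (this helper is not part of the Kinetica API), the override behavior amounts to simple substitution: an explicit host in the destination parameter replaces the provider default.

```python
# Illustrative helper -- not part of the Kinetica API.  Shows how an
# explicit host in the destination parameter overrides the provider default.
DEFAULT_HOSTS = {
    'azure': '<service_account_name>.blob.core.windows.net',
    'gcs': 'storage.googleapis.com',
    's3': '<region>.amazonaws.com',
}

def build_destination(provider, host=None):
    """Return '<provider>://<host>', falling back to the provider default."""
    return f"{provider}://{host or DEFAULT_HOSTS[provider]}"
```

For example, build_destination('s3', 's3.us-east-1.amazonaws.com') yields 's3://s3.us-east-1.amazonaws.com', while build_destination('gcs') falls back to 'gcs://storage.googleapis.com'.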

Data sinks perform no function by themselves, but act as proxies for transmitting data when referenced as a destination in the creation of a table monitor (see also the CREATE STREAM command in SQL).

Note

Kafka data sinks are validated upon creation by default and will fail to be created if an authorized connection cannot be established.


Managing Data Sinks

A data sink can be managed using the following API endpoint calls. For managing data sinks in SQL, see CREATE DATA SINK.

API Call             Description
/create/datasink     Creates a data sink, given a location and connection information
/alter/datasink      Modifies the properties of a data sink, validating the new connection
/drop/datasink       Removes the data sink reference from the database, optionally removing all dependent table monitors as well
/show/datasink       Outputs the data sink properties
/grant/permission    Grants a user permission to connect to a data sink
/revoke/permission   Revokes a user's permission to connect to a data sink

Creating a Data Sink

To create a data sink, kin_dsink, that targets Apache Kafka, in Python:

kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'kafka://kafka.abc.com:9092',
    options = {
        'credential': 'kafka_credential',
        'kafka_topic_name': 'kafka_topic'
    }
)

To create a data sink that targets a local database table, in Python:

kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'table://example.ds_employee_backup'
)

Consumer-Specific Syntax

Several authentication schemes are supported, varying by provider; the templates that follow show the create_datasink call for each scheme.
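As a quick sanity check before calling create_datasink, the minimum option keys for the credential-based scheme of each provider can be captured in a plain dict. This is a hypothetical helper, not part of the Kinetica API; the option names mirror the templates in this section.

```python
# Hypothetical helper -- not part of the Kinetica API.  Minimum required
# option keys for the credential-based scheme of each provider, mirroring
# the create_datasink templates in this section.
REQUIRED_OPTIONS = {
    'azure': ['credential', 'azure_container_name'],
    'gcs':   ['credential', 'gcs_bucket_name'],
    'hdfs':  ['credential'],
    'kafka': ['credential', 'kafka_topic_name'],
    's3':    ['credential', 's3_bucket_name', 's3_region'],
}

def missing_options(provider, options):
    """Return any required option keys absent from the supplied options dict."""
    return [key for key in REQUIRED_OPTIONS[provider] if key not in options]
```

For example, an S3 data sink created with only a credential would be flagged as missing 's3_bucket_name' and 's3_region'.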


Azure

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'azure_container_name': '<azure container name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        'azure_storage_account_name': '<azure storage account name>',
        'azure_container_name': '<azure container name>',
        'azure_tenant_id': '<azure tenant id>'
    }
)

GCS

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
JSON Key
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'gcs_service_account_keys': '<gcs account json key text>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)

HDFS

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'hdfs://<host>:<port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)

JDBC

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url>',
    options = {
        ['jdbc_driver_class_name': '<jdbc driver class full path>',]
        ['jdbc_driver_jar_path': 'kifs://<jdbc driver jar path>',]
        'credential': '[<credential schema name>.]<credential name>'
    }
)
Password in URL
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url with username/password>'
)

Kafka

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'kafka_topic_name': '<kafka topic name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'kafka_topic_name': '<kafka topic name>'
    }
)

Local (Kinetica)

User Auth
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'table://[<table schema name>.]<table name>'
)

S3

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)

Webhook

Credential (with HTTPS)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'https://<webhook.host>:<webhook.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)
HTTP
kinetica.create_datasink(
    name = '[<schema name>.]<data sink name>',
    destination = 'http://<webhook.host>:<webhook.port>'
)