Data Sinks

A data sink is a reference object for a data target that is generally external to the database. It consists of the location and connection information for that target. A data sink can make use of a credential object for storing remote authentication information.

A data sink name must adhere to the standard naming criteria. Each data sink exists within a schema and follows the standard name resolution rules for tables.
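
For example, a minimal sketch of a schema-qualified data sink name (the schema sales_reporting and the target table are placeholders for illustration):

kinetica.create_datasink(
    name = 'sales_reporting.kin_dsink',    # schema-qualified; an unqualified name resolves like a table name
    destination = 'table://sales_reporting.employee_backup'
)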

The following data sink types are supported:

  • Azure (Microsoft blob storage)
  • CData (CData Software source-specific JDBC driver; see the driver list for the full set of supported drivers)
  • GCS (Google Cloud Storage)
  • HDFS (Apache Hadoop Distributed File System)
  • JDBC (Java Database Connectivity, using a user-supplied driver)
  • Kafka (Apache Kafka streaming feed)
  • Local (Table within the same Kinetica instance)
  • S3 (Amazon S3 Bucket)
  • Webhook (HTTP/HTTPS)

Note

The following default hosts are used for Azure, GCS, and S3, but each can be overridden in the destination parameter (see the example after this list):

  • Azure: <service_account_name>.blob.core.windows.net
  • GCS: storage.googleapis.com
  • S3: <region>.amazonaws.com
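
For instance, a hypothetical S3 data sink that overrides the default host with a specific regional endpoint (the endpoint, bucket, and region below are placeholders):

kinetica.create_datasink(
    name = 'kin_dsink_s3',
    destination = 's3://s3.us-east-1.amazonaws.com',    # overrides the default <region>.amazonaws.com host
    options = {
        'use_managed_credentials': 'true',
        's3_bucket_name': 'example-bucket',
        's3_region': 'us-east-1'
    }
)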

Data sinks perform no function by themselves, but act as proxies for transmitting data when referenced as a destination in the creation of a table monitor (see also the CREATE STREAM command in SQL).
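
As a sketch of that flow, assuming a source table example.orders exists and that the /create/tablemonitor endpoint's event and datasink_name options are available in your version, a table monitor can forward inserted records to a data sink:

kinetica.create_table_monitor(
    table_name = 'example.orders',      # hypothetical source table to watch
    options = {
        'event': 'insert',              # stream newly inserted records
        'datasink_name': 'kin_dsink'    # deliver change data to this data sink
    }
)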

Note

  • CData data sinks can use a JDBC credential for authentication.
  • Kafka data sinks are validated upon creation by default and will fail to be created if an authorized connection cannot be established.

Managing Data Sinks

A data sink can be managed using the following API endpoint calls. For managing data sinks in SQL, see CREATE DATA SINK.

API Call              Description
/create/datasink      Creates a data sink, given a location and connection information
/alter/datasink       Modifies the properties of a data sink, validating the new connection
/drop/datasink        Removes the data sink reference from the database, optionally removing all dependent table monitors as well
/show/datasink        Outputs the data sink properties
/grant/permission     Grants the permission for a user to connect to a data sink
/revoke/permission    Revokes the permission for a user to connect to a data sink
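
As an illustrative sketch of a few of these calls against a data sink named kin_dsink (wrapper names mirror the endpoints above; exact signatures and option names may vary by client API version):

# Output the data sink's properties
kinetica.show_datasink(name = 'kin_dsink', options = {})

# Grant a user permission to connect to the data sink
kinetica.grant_permission(
    principal = 'auser',                # hypothetical user
    object = 'kin_dsink',
    object_type = 'datasink',
    permission = 'connect',
    options = {}
)

# Remove the data sink, along with any dependent table monitors
kinetica.drop_datasink(name = 'kin_dsink', options = {'clear_table_monitors': 'true'})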

Creating a Data Sink

To create a data sink, kin_dsink, that targets Apache Kafka, in Python:

kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'kafka://kafka.abc.com:9092',
    options = {
        'credential': 'kafka_credential',
        'kafka_topic_name': 'kafka_topic'
    }
)

To create a data sink that targets a local database table, in Python:

kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'table://example.ds_employee_backup'
)

Consumer-Specific Syntax

Several authentication schemes are supported across the providers below. In the following templates, items in square brackets are optional.

Azure

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'azure_container_name': '<azure container name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        'azure_storage_account_name': '<azure storage account name>',
        'azure_container_name': '<azure container name>',
        'azure_tenant_id': '<azure tenant id>'
    }
)

CData

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<cdata jdbc url>',
    options = {'credential': '[<credential schema name>.]<credential name>'}
)
Password in URL
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<cdata jdbc url with username/password>'
)

GCS

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
JSON Key
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'gcs_service_account_keys': '<gcs account json key text>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)

HDFS

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'hdfs://<host>:<port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)

JDBC

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url>',
    options = {
        ['jdbc_driver_class_name': '<jdbc driver class full path>',]
        ['jdbc_driver_jar_path': 'kifs://<jdbc driver jar path>',]
        'credential': '[<credential schema name>.]<credential name>'
    }
)
Password in URL
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url with username/password>'
)

Kafka

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'kafka_topic_name': '<kafka topic name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'kafka_topic_name': '<kafka topic name>'
    }
)

Local (Kinetica)

User Auth
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'table://[<table schema name>.]<table name>'
)

S3

Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)

Webhook

Credential (with HTTPS)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'https://<webhook.host>:<webhook.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)
HTTP
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'http://<webhook.host>:<webhook.port>'
)