A data sink is a reference object for a data target that is generally external
to the database. It consists of the location & connection information for that
target. A data sink can make use of a
credential object for storing remote authentication
information.
A data sink name must adhere to the standard
naming criteria. Each data sink
exists within a schema and follows the standard
name resolution rules for tables.
The following data sink types are supported:
Azure (Microsoft blob storage)
GCS (Google Cloud Storage)
HDFS (Apache Hadoop Distributed File System)
JDBC (Java Database Connectivity, using a user-supplied driver or one of the drivers on the supported list)
Kafka (Apache Kafka streaming feed)
Local (Table within the same Kinetica instance)
S3 (Amazon S3 Bucket)
Webhook (HTTP/HTTPS)
Note
The following default hosts are used for Azure, GCS, & S3, but can be
overridden in the destination parameter:
Azure: <service_account_name>.blob.core.windows.net
GCS: storage.googleapis.com
S3: <region>.amazonaws.com
Data sinks perform no function by themselves, but act as proxies for
transmitting data when referenced as a destination in the creation of a
table monitor (see also the
CREATE STREAM command in SQL).
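The default hosts noted above can be sketched as a small lookup. This is only an illustration of the documented defaults and of the optional host in the destination string; the helper names default_host and destination are hypothetical and are not part of the Kinetica API.

```python
def default_host(provider, account_name=None, region=None):
    """Return the documented default host for Azure, GCS, or S3."""
    provider = provider.lower()
    if provider == "azure":
        if not account_name:
            raise ValueError("Azure needs a service account name")
        return f"{account_name}.blob.core.windows.net"
    if provider == "gcs":
        return "storage.googleapis.com"
    if provider == "s3":
        if not region:
            raise ValueError("S3 needs a region")
        return f"{region}.amazonaws.com"
    raise ValueError(f"no default host documented for: {provider}")


def destination(provider, host=None):
    """Build a destination string; leave host unset to accept the default."""
    provider = provider.lower()
    return f"{provider}://{host}" if host else provider
```

Under these assumptions, destination('s3') yields 's3', which leaves the default host in effect, while passing a host produces the 's3://<host>' form that overrides it.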
Note
Kafka data sinks will be validated upon creation, by default, and will
fail to be created if an authorized connection cannot be established.
Managing Data Sinks
A data sink can be managed using the following API endpoint calls. For
managing data sinks in SQL, see CREATE DATA SINK.
Creating a Data Sink
To create a data sink, kin_dsink, that targets Apache Kafka, in
Python:
kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'kafka://kafka.abc.com:9092',
    options = {
        'credential': 'kafka_credential',
        'kafka_topic_name': 'kafka_topic'
    }
)
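When several Kafka sinks are created, the arguments shown above could be assembled by a small helper. The following is a minimal sketch, assuming the same name, destination, and options layout as the example; kafka_sink_args is a hypothetical helper, not a Kinetica function, and the port range check is an added convenience.

```python
def kafka_sink_args(name, host, port, topic, credential=None):
    """Assemble keyword arguments for a Kafka data sink (hypothetical helper)."""
    port = int(port)
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    options = {"kafka_topic_name": topic}
    # The credential is optional; leave it out for an unauthenticated broker.
    if credential:
        options["credential"] = credential
    return {
        "name": name,
        "destination": f"kafka://{host}:{port}",
        "options": options,
    }
```

The result could then be passed along as kinetica.create_datasink(**kafka_sink_args(...)), assuming kinetica is the same connected client object used in the example above.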
To create a data sink that targets a local database table, in Python:
kinetica.create_datasink(
    name = 'kin_dsink',
    destination = 'table://example.ds_employee_backup'
)
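The table:// destination used above can be built from, or split back into, its schema and table parts. This is a sketch only: both helper names are hypothetical, and it assumes plain, unquoted names that contain no dots of their own.

```python
def table_destination(table, schema=None):
    """Build a table:// destination, with an optional schema qualifier."""
    qualified = f"{schema}.{table}" if schema else table
    return f"table://{qualified}"


def parse_table_destination(destination):
    """Split a table:// destination into (schema, table); schema may be None."""
    prefix = "table://"
    if not destination.startswith(prefix):
        raise ValueError(f"not a table destination: {destination}")
    # Everything before the last dot is treated as the schema name.
    schema, _, table = destination[len(prefix):].rpartition(".")
    return (schema or None, table)
```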
Consumer-Specific Syntax
Several authentication schemes across multiple providers are supported.
Azure
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'azure_container_name': '<azure container name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'azure[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        'azure_storage_account_name': '<azure storage account name>',
        'azure_container_name': '<azure container name>',
        'azure_tenant_id': '<azure tenant id>'
    }
)
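The two Azure variants above differ only in their options. A minimal sketch of choosing between them follows; azure_sink_options is a hypothetical helper, and treating the storage account name and tenant ID as required for managed credentials is an assumption read off the template, where neither appears in optional brackets.

```python
def azure_sink_options(container, credential=None,
                       storage_account=None, tenant_id=None):
    """Build the options dict for an Azure data sink (hypothetical helper)."""
    if credential:
        # Credential variant: authentication details live in the credential object.
        return {
            "credential": credential,
            "azure_container_name": container,
        }
    if not (storage_account and tenant_id):
        raise ValueError(
            "managed credentials need a storage account name and a tenant ID"
        )
    # Managed-credentials variant: option values are strings, as in the template.
    return {
        "use_managed_credentials": "true",
        "azure_storage_account_name": storage_account,
        "azure_container_name": container,
        "azure_tenant_id": tenant_id,
    }
```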
GCS
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
JSON Key
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'gcs[://<host>]',
    options = {
        'gcs_service_account_keys': '<gcs account json key text>',
        ['gcs_project_id': '<gcs project id>',]
        'gcs_bucket_name': '<gcs bucket name>'
    }
)
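In the GCS templates above, square brackets mark an optional entry and are not valid Python; in real code the key is simply left out. The sketch below shows one way the four variants might map onto a single options dict. The helper gcs_sink_options is hypothetical, and rejecting more than one authentication scheme at a time is an assumption, not documented behavior.

```python
def gcs_sink_options(bucket, project_id=None, credential=None,
                     managed=False, json_key=None):
    """Build the options dict for a GCS data sink (hypothetical helper)."""
    chosen = [v for v in (credential, managed, json_key) if v]
    if len(chosen) > 1:
        raise ValueError("pick at most one authentication scheme")
    options = {}
    if credential:
        options["credential"] = credential
    elif managed:
        options["use_managed_credentials"] = "true"
    elif json_key:
        options["gcs_service_account_keys"] = json_key
    # Optional entry: only included when a project ID is supplied.
    if project_id:
        options["gcs_project_id"] = project_id
    options["gcs_bucket_name"] = bucket
    return options
```

With no authentication arguments, the result matches the Public (No Auth) template.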
HDFS
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'hdfs://<host>:<port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)
JDBC
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url>',
    options = {
        ['jdbc_driver_class_name': '<jdbc driver class full path>',]
        ['jdbc_driver_jar_path': 'kifs://<jdbc driver jar path>',]
        'credential': '[<credential schema name>.]<credential name>'
    }
)
Password in URL
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = '<jdbc url with username/password>'
)
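For the JDBC Credential variant above, the two driver entries are optional. A minimal sketch of building those options follows; jdbc_sink_options is a hypothetical helper, and insisting on the kifs:// prefix for the driver JAR path is an assumption taken from the template rather than a documented rule.

```python
def jdbc_sink_options(credential, driver_class=None, driver_jar=None):
    """Build the options dict for a JDBC data sink (hypothetical helper)."""
    options = {"credential": credential}
    # Both driver entries are optional; include only what was supplied.
    if driver_class:
        options["jdbc_driver_class_name"] = driver_class
    if driver_jar:
        if not driver_jar.startswith("kifs://"):
            raise ValueError("driver JAR path is expected to start with kifs://")
        options["jdbc_driver_jar_path"] = driver_jar
    return options
```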
Kafka
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        'kafka_topic_name': '<kafka topic name>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<schema name>.]<data sink name>',
    destination = 'kafka://<kafka.host>:<kafka.port>',
    options = {
        'kafka_topic_name': '<kafka topic name>'
    }
)
Local (Kinetica)
User Auth
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'table://[<table schema name>.]<table name>'
)
S3
Credential
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'credential': '[<credential schema name>.]<credential name>',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Managed Credentials
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        'use_managed_credentials': 'true',
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
Public (No Auth)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 's3[://<host>]',
    options = {
        's3_bucket_name': '<aws s3 bucket name>',
        's3_region': '<aws s3 region>'
    }
)
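The three S3 variants above share the bucket and region entries and differ only in how authentication is expressed. One possible way to capture that is sketched below; s3_sink_options is a hypothetical helper, and refusing a credential together with managed credentials is an assumption.

```python
def s3_sink_options(bucket, region, credential=None, managed=False):
    """Build the options dict for an S3 data sink (hypothetical helper)."""
    if credential and managed:
        raise ValueError("use either a credential or managed credentials")
    options = {"s3_bucket_name": bucket, "s3_region": region}
    if credential:
        options["credential"] = credential
    elif managed:
        # Option values are passed as strings, as in the templates above.
        options["use_managed_credentials"] = "true"
    return options
```

Leaving both credential and managed unset produces the Public (No Auth) form.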
Webhook
Credential (with HTTPS)
kinetica.create_datasink(
    name = '[<data sink schema name>.]<data sink name>',
    destination = 'https://<webhook.host>:<webhook.port>',
    options = {
        'credential': '[<credential schema name>.]<credential name>'
    }
)
HTTP
kinetica.create_datasink(
    name = '[<schema name>.]<data sink name>',
    destination = 'http://<webhook.host>:<webhook.port>'
)
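A webhook destination is an ordinary HTTP or HTTPS URL, so it can be checked on the client before the sink is created. The sketch below uses Python's standard urllib.parse; webhook_sink_args is a hypothetical helper, and rejecting a credential on a plain HTTP URL is an assumption based on the credential variant above being shown only with HTTPS.

```python
from urllib.parse import urlsplit


def webhook_sink_args(name, url, credential=None):
    """Assemble keyword arguments for a webhook data sink (hypothetical helper)."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        raise ValueError(f"not an HTTP/HTTPS URL: {url}")
    # Assumption: a credential is only paired with HTTPS, as in the template.
    if credential and parts.scheme != "https":
        raise ValueError("a credential is expected to be used with HTTPS")
    args = {"name": name, "destination": url}
    if credential:
        args["options"] = {"credential": credential}
    return args
```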