Building the Kinetica NiFi Connector
The connector jar can be built with Maven.-
Download the connector source:
-
If using a version of NiFi other than 1.3.0, update the
pom.xmlfile with the correct version of NiFi in this block: -
Build the connector jar:
Installing the Kinetica NiFi Connector into NiFi
Deploy the connector jar built in the previous step to the NiFi libraries directory:Getting Streaming Data from Kinetica to JSON or CSV Files
-
Drag a new Processor onto the flow
- Select the GetKineticaToJSON or GetKineticaToCSV type
-
Properties tab
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
- Format:
http://<db.host>:9191 - Example:
http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
- Format:
- Table Name: The name of the table to read from
-
Table Monitor URL: The URL Kinetica will be using to forward any new
data inserted into the above table. This will be:
- Format:
tcp://<db.host>:9002/ - Example:
tcp://localhost:9002
- Format:
- Delimiter: For CSVs, the delimiter used in the file (e.g., comma, tab, pipe, etc.); defaults to tab
- Username: Kinetica login username
- Password: Kinetica login password
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
Saving Data to Kinetica Using NiFi Attributes
-
Drag a new Processor onto the flow:
- Select the PutKinetica type
-
Settings tab:
- Under Auto terminate Relationships, check the failure and success options.
-
Properties tab:
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
- Format:
http://<db.host>:9191 - Example:
http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
- Format:
- Collection Name: Set this value if you want the table created in a collection.
- Table Name: The name of the table to write to
-
Schema: A CSV string, where each entry is:
-
Format:
<fieldname>|<data type>[|<subtype>] -
Example:
-
Format:
- Batch Size: The size of the batch to compress for efficient loading
- Username: Kinetica login username
- Password: Kinetica login password
-
Update on Existing PK: If a primary key (PK) is defined for a table,
then there are two options for handling each new record pending insert
that has a PK value matching an existing record in the target table. If
set to
true, the record in the target table will be updated with the new record’s values; iffalse, the new record will be discarded; defaults tofalse -
Replicate Table: If
true, the target table will be replicated; iffalse, the table will be distributed; defaults tofalse -
Date Format: The date format to use to parse values in any datetime
fields (e.g.,
dd-MM-yyyy hh:mm:ss) - TimeZone: Provide the timezone if the date is not from your local timezone
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
-
Specifying data to be saved into Kinetica:
-
Place processors upstream from this which assigns values to user-defined
attributes named
<field name>, where<field name>is the name of a field in your table -
Each record written to your table will contain field values of:
- the value in the attributes with names
<field name>or - the value of null if no attribute is found with that field name
- the value in the attributes with names
-
Place processors upstream from this which assigns values to user-defined
attributes named
Saving Data to Kinetica Using Delimited Files
-
Drag a new Processor onto the flow
- Select the PutKineticaFromFile type
-
Settings tab:
- Under Auto terminate Relationships, check the failure and success options.
-
Properties tab:
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
- Format:
http://<db.host>:9191 - Example:
http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
- Format:
- Collection Name: Set this value if you want the table created in a collection.
- Table Name: The name of the table to write to
-
Schema: A CSV string, where each entry is:
-
Format:
<fieldname>|<data type>[|<subtype>] -
Example:
-
Format:
-
Delimiter: The delimiter used in the file (e.g., comma, tab, pipe,
etc.); defaults to
, -
Escape Character: The character used to escape other characters in the
data (e.g.,
\); defaults to" -
Quote Character: The character used to quote column data in the file
(e.g.,
"or'); defaults to" -
File Has Header: Whether the first line of the file is a header row or
not; defaults to
true - Batch Size: The size of the batch to compress for efficient loading
-
Error Handling: If
true, the processor will skip rows that can’t be loaded successfully (due to parse error, etc.); iffalse, the processor will stop loading as soon as an error occurs; defaults totrue - Username: Kinetica login username
- Password: Kinetica login password
-
Update on Existing PK: If a primary key (PK) is defined for a table,
then there are two options for handling each new record pending insert
that has a PK value matching an existing record in the target table. If
set to
true, the record in the target table will be updated with the new record’s values; iffalse, the new record will be discarded; defaults tofalse -
Replicate Table: If
true, the target table will be replicated; iffalse, the table will be distributed; defaults tofalse -
Date Format: The date format to use to parse values in any datetime
fields (e.g.,
dd-MM-yyyy hh:mm:ss) - TimeZone: Provide the timezone if the date is not from your local timezone
-
Server URL: The URL of the Kinetica instance you are using. This
will be:
-
Create a connector between the data source processor and the
PutKineticaFromFile processor
- Details tab: check the with coordinates option.