Note: This documentation is for a prior release of Kinetica.
The following guide provides step-by-step instructions for using Kinetica as a data source to read from and write to in NiFi. Source code for the connector can be found at: https://github.com/kineticadb/kinetica-connector-nifi
Building the Kinetica NiFi Connector
The connector jar can be built with Maven.
- Download the connector source:
  $ git clone https://github.com/kineticadb/kinetica-connector-nifi.git
  $ cd kinetica-connector-nifi
- If using a version of NiFi other than 1.3.0, update the pom.xml file with the correct version of NiFi in this block:
  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-bundles</artifactId>
    <version>1.3.0</version>
  </parent>
- Build the connector jar:
  $ mvn clean package
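The version change in the pom.xml step can also be scripted. The following is a minimal sketch, not part of the connector: the helper name `set_nifi_parent_version` is hypothetical, the sample pom text (including its project-level version) is made up for illustration, and the approach assumes the file contains a single `<parent>` block like the one listed in that step.

```python
import re


def set_nifi_parent_version(pom_text, nifi_version):
    """Replace the <version> inside the first <parent> block of a pom.xml."""

    def swap_version(parent_match):
        # Only touch the first <version> element within the <parent> block.
        return re.sub(
            r"<version>[^<]*</version>",
            lambda _: f"<version>{nifi_version}</version>",
            parent_match.group(0),
            count=1,
        )

    # DOTALL lets the block span several lines; count=1 leaves the rest alone.
    return re.sub(
        r"<parent>.*?</parent>", swap_version, pom_text, count=1, flags=re.DOTALL
    )


# Made-up sample; the project-level <version> is a stand-in value.
sample_pom = """<project>
  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-bundles</artifactId>
    <version>1.3.0</version>
  </parent>
  <version>6.0.0</version>
</project>"""

updated_pom = set_nifi_parent_version(sample_pom, "1.4.0")
print(updated_pom)
```

Only the version inside `<parent>` changes; any other `<version>` elements in the file are left untouched.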
Installing the Kinetica NiFi Connector into NiFi
Deploy the connector archive (the .nar file) built in the previous step to the NiFi libraries directory:
$ cp nifi-GPUdbNiFi-nar/target/nifi-GPUdbNiFi-nar-1.3.0.nar <NiFiHome>/lib
Getting Streaming Data from Kinetica to JSON or CSV Files
- Drag a new Processor onto the flow:
  - Select the GetKineticaToJSON or GetKineticaToCSV type
- Properties tab:
  - Server URL: The URL of the Kinetica instance you are using. This will be:
    - Format: http://<db.host>:9191
    - Example: http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
  - Table Name: The name of the table to read from
  - Table Monitor URL: The URL Kinetica will be using to forward any new data inserted into the above table. This will be:
    - Format: tcp://<db.host>:9002/
    - Example: tcp://localhost:9002
  - Delimiter: For CSVs, the delimiter used in the file (e.g., comma, tab, pipe, etc.); defaults to tab
  - Username: Kinetica login username
  - Password: Kinetica login password
The output of GetKineticaToJSON is a JSON file containing the record inserted into the Kinetica table.
The output of GetKineticaToCSV is a CSV file containing the record inserted into the Kinetica table.
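The Server URL and Table Monitor URL formats listed in the properties above both derive from the database host. As a small illustration only, the sketch below builds the two values from a host name; the function name `kinetica_urls` is hypothetical, and the ports are simply the ones that appear in the documented formats.

```python
def kinetica_urls(db_host, api_port=9191, monitor_port=9002):
    """Build the Server URL and Table Monitor URL for a Kinetica host."""
    server_url = f"http://{db_host}:{api_port}"
    table_monitor_url = f"tcp://{db_host}:{monitor_port}"
    return server_url, table_monitor_url


server_url, table_monitor_url = kinetica_urls("localhost")
print(server_url)         # http://localhost:9191
print(table_monitor_url)  # tcp://localhost:9002
```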
Saving Data to Kinetica Using NiFi Attributes
- Drag a new Processor onto the flow:
  - Select the PutKinetica type
- Settings tab:
  - Under Auto terminate Relationships, check the failure and success options.
- Properties tab:
  - Server URL: The URL of the Kinetica instance you are using. This will be:
    - Format: http://<db.host>:9191
    - Example: http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
  - Collection Name: Set this value if you want the table created in a collection.
  - Table Name: The name of the table to write to
  - Schema: A CSV string, where each entry is:
    - Format: <fieldname>|<data type>[|<subtype>]
    - Example: X|Float|data,Y|Float|data,TIMESTAMP|Long|data,TEXT|String|store_only|text_search
    - For more details on schemas, read the Kinetica documentation.
  - Batch Size: The size of the batch to compress for efficient loading
  - Username: Kinetica login username
  - Password: Kinetica login password
  - Update on Existing PK: If a primary key (PK) is defined for a table, then there are two options for handling each new record pending insert that has a PK value matching an existing record in the target table. If set to true, the record in the target table will be updated with the new record's values; if false, the new record will be discarded; defaults to false
  - Replicate Table: If true, the target table will be replicated; if false, the table will be distributed; defaults to false
  - Date Format: The date format to use to parse values in any datetime fields (e.g., dd-MM-yyyy hh:mm:ss)
  - TimeZone: Provide the timezone if the date is not from your local timezone
 
- Specifying data to be saved into Kinetica:
  - Place processors upstream of this one that assign values to user-defined attributes named <field name>, where <field name> is the name of a field in your table
  - Each record written to your table will contain, for each field, either:
    - the value of the attribute named <field name>, or
    - null if no attribute is found with that field name
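To make the Schema format and the attribute-to-field mapping above concrete, here is a minimal sketch. It is not the processor's implementation: the function names `parse_schema` and `record_from_attributes` and the sample attribute values are hypothetical. It splits the schema string from the example into fields, then builds one record, using null (`None`) for any field that has no matching attribute.

```python
def parse_schema(schema):
    """Split a schema string into (field name, data type, subtypes) tuples."""
    fields = []
    for entry in schema.split(","):
        # Each entry is <fieldname>|<data type> followed by optional subtypes.
        name, data_type, *subtypes = entry.strip().split("|")
        fields.append((name, data_type, subtypes))
    return fields


def record_from_attributes(fields, attributes):
    """Build one record: the attribute value if present, otherwise None."""
    return {name: attributes.get(name) for name, _type, _subtypes in fields}


schema = "X|Float|data,Y|Float|data,TIMESTAMP|Long|data,TEXT|String|store_only|text_search"
fields = parse_schema(schema)

# Hypothetical flow file attributes; attributes that do not match a field
# name (such as "filename" here) are ignored.
attributes = {"X": "1.5", "Y": "-2.0", "TEXT": "hello", "filename": "ignored.txt"}
record = record_from_attributes(fields, attributes)
print(record)
# {'X': '1.5', 'Y': '-2.0', 'TIMESTAMP': None, 'TEXT': 'hello'}
```

Because no attribute named TIMESTAMP was supplied, that field is written as null.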
 
 
Saving Data to Kinetica Using Delimited Files
- Drag a new Processor onto the flow:
  - Select the PutKineticaFromFile type
- Settings tab:
  - Under Auto terminate Relationships, check the failure and success options.
- Properties tab:
  - Server URL: The URL of the Kinetica instance you are using. This will be:
    - Format: http://<db.host>:9191
    - Example: http://localhost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000
  - Collection Name: Set this value if you want the table created in a collection.
  - Table Name: The name of the table to write to
  - Schema: A CSV string, where each entry is:
    - Format: <fieldname>|<data type>[|<subtype>]
    - Example: X|Float|data,Y|Float|data,TIMESTAMP|Long|data,TEXT|String|store_only|text_search
    - For more details on schemas, read the Kinetica documentation.
  - Delimiter: The delimiter used in the file (e.g., comma, tab, pipe, etc.); defaults to ,
  - Escape Character: The character used to escape other characters in the data (e.g., \); defaults to "
  - Quote Character: The character used to quote column data in the file (e.g., " or '); defaults to "
  - File Has Header: Whether the first line of the file is a header row or not; defaults to true
  - Batch Size: The size of the batch to compress for efficient loading
  - Error Handling: If true, the processor will skip rows that can't be loaded successfully (due to parse error, etc.); if false, the processor will stop loading as soon as an error occurs; defaults to true
  - Username: Kinetica login username
  - Password: Kinetica login password
  - Update on Existing PK: If a primary key (PK) is defined for a table, then there are two options for handling each new record pending insert that has a PK value matching an existing record in the target table. If set to true, the record in the target table will be updated with the new record's values; if false, the new record will be discarded; defaults to false
  - Replicate Table: If true, the target table will be replicated; if false, the table will be distributed; defaults to false
  - Date Format: The date format to use to parse values in any datetime fields (e.g., dd-MM-yyyy hh:mm:ss)
  - TimeZone: Provide the timezone if the date is not from your local timezone
 
- Create a connection between the data source processor and the PutKineticaFromFile processor:
  - Details tab: check the with coordinates option.
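The two outcomes of the Update on Existing PK property can be illustrated with a small model. This is only a sketch of the described behavior, not Kinetica code: a Python dict keyed by primary key stands in for the target table, and the function name `insert_record` and the `id` field are hypothetical.

```python
def insert_record(table, pk_field, record, update_on_existing_pk=False):
    """Insert into a dict that stands in for a table keyed by primary key."""
    key = record[pk_field]
    if key in table and not update_on_existing_pk:
        return False  # matching PK and updates disabled: discard the new record
    table[key] = record  # new PK, or matching PK with updates enabled
    return True


table = {1: {"id": 1, "TEXT": "old"}}

# Default (false): the incoming record with a matching PK is discarded.
insert_record(table, "id", {"id": 1, "TEXT": "new"})
after_default = dict(table)

# Set to true: the existing record takes the new record's values.
insert_record(table, "id", {"id": 1, "TEXT": "new"}, update_on_existing_pk=True)
after_update = dict(table)
```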
 
The input for the PutKineticaFromFile processor is a delimited file.
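How the Delimiter, Quote Character, File Has Header, Batch Size, and Error Handling properties interact can be sketched with Python's standard `csv` module. This is an illustration under stated assumptions rather than the processor's actual logic: the function name `read_batches` is hypothetical, a "row that can't be loaded" is approximated as a row with the wrong number of fields, the batch size of 2 is chosen only to keep the demo small, and escaping is left at the `csv` module's default handling of doubled quote characters.

```python
import csv
import io


def read_batches(text, field_count, delimiter=",", quote_char='"',
                 has_header=True, batch_size=2, skip_errors=True):
    """Yield lists of rows, grouped into batches of at most batch_size."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter, quotechar=quote_char)
    if has_header:
        next(reader, None)  # File Has Header = true: drop the first row
    batch = []
    for row in reader:
        if len(row) != field_count:
            if skip_errors:
                continue  # Error Handling = true: skip the bad row
            # Error Handling = false: stop loading at the first bad row
            raise ValueError(
                f"line {reader.line_num}: expected {field_count} fields, got {len(row)}"
            )
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly partial, batch


data = 'X,Y,TEXT\n1.0,2.0,"a, quoted"\nbad,row\n3.0,4.0,plain\n5.0,6.0,last\n'
batches = list(read_batches(data, field_count=3))
print(batches)
```

With the defaults, the malformed `bad,row` line is skipped and the remaining three rows arrive as one full batch and one partial batch. Passing `skip_errors=False` raises a `ValueError` at that line instead.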