Spark Connector Developer Manual

The following guide provides step-by-step instructions to get started using Spark to ingest data into Kinetica. Source code for the connector can be found at: https://github.com/kineticadb/kinetica-connector-spark

Build & Install

The connector jar can be built with Maven as follows:

$ git clone https://github.com/kineticadb/kinetica-connector-spark.git
$ cd kinetica-connector-spark
$ mvn clean package

This should produce the connector jar under the target directory:

  • target/spark-2.2.0-kinetica-6.1.0-connector.jar

This jar should be made available to the Spark cluster. Once this is done, installation is complete.

Architecture

The connector accepts Spark DataFrame and LoaderParams objects. The LoaderParams object holds all necessary configuration to interface with Kinetica.

The connector is case sensitive when it maps dataset column names to Kinetica table column names. This is an existing limitation of Spark's Row.AS. Another option exposed by the connector is to map data to columns based on position; if this is used, the number & order of columns must match between the dataset and the table.

The SparkKineticaTableBuilder API will extract the data types from a Spark DataFrame and construct a table in Kinetica with the corresponding schema. The following Spark datatypes are supported:

  • ByteType
  • ShortType
  • IntegerType
  • LongType
  • FloatType
  • DoubleType
  • DecimalType
  • StringType
  • BooleanType (converted to integer 1 or 0)
  • DateType
  • TimestampType
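
For illustration, a DataFrame schema built from several of these types might look like the following sketch (the column names are invented for this example; SparkKineticaTableBuilder derives the Kinetica column types from the Spark types):

    import org.apache.spark.sql.types._

    // Hypothetical schema exercising a sampling of the supported Spark types
    val schema = StructType(Seq(
      StructField("id",      LongType, nullable = false),
      StructField("code",    ShortType),
      StructField("name",    StringType),
      StructField("price",   DecimalType(10, 2)),
      StructField("score",   DoubleType),
      StructField("active",  BooleanType),   // stored in Kinetica as integer 1 or 0
      StructField("created", DateType),
      StructField("updated", TimestampType)
    ))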

Once the target table has been created, the SparkKineticaLoader can be used to load data into it.

Each Spark DataFrame partition instantiates a Kinetica BulkInserter, which is a native API for MPP ingest.
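
Conceptually, the per-partition ingest resembles the sketch below. This is not the connector's internal code; it only illustrates the pattern using the Kinetica Java API (the class and method names are assumptions based on that API, and the URL, table name, and batch size are placeholders), for a DataFrame df of rows to ingest:

    import com.gpudb.{BulkInserter, GenericRecord, GPUdb, Type}
    import org.apache.spark.sql.Row

    // One BulkInserter per DataFrame partition, each streaming its own batches
    df.foreachPartition { rows: Iterator[Row] =>
      val gpudb = new GPUdb("http://<KineticaHostName>:9191")        // placeholder URL
      val tableType = Type.fromTable(gpudb, "airline")               // placeholder table
      val inserter = new BulkInserter[GenericRecord](
        gpudb, "airline", tableType, 10000, new java.util.HashMap[String, String]())
      rows.foreach { row =>
        val record = new GenericRecord(tableType)
        // ... map Row fields to record columns here (by name or by position) ...
        inserter.insert(record)
      }
      inserter.flush()                                               // push any remaining queued records
    }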

Creating a Table

  1. Create a SparkSession
  2. Create a LoaderParams and initialize with appropriate connection config
  3. Create a DataFrame with the schema that the table should match
  4. Call SparkKineticaTableBuilder.KineticaMapWriter with the DataFrame & LoaderParams
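
Sketched in the Spark shell, reusing the LoaderParams setters shown in the example at the end of this guide (the hostname and table name are placeholders, and schema is the hypothetical schema sketched in the Architecture section):

    import org.apache.spark.sql.{Row, SparkSession}
    import com.kinetica.spark._

    // 1. SparkSession (the shell already provides one as `spark`)
    val spark = SparkSession.builder().appName("kinetica-table-builder").getOrCreate()

    // 2. Connection configuration (placeholder values)
    val lp = new LoaderParams()
    lp.setGPUdbURL("http://<KineticaHostName>:9191")
    lp.setTablename("airline")
    lp.setTableReplicated(false)

    // 3. An (empty) DataFrame carrying the schema the table should have
    val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    // 4. Create the table in Kinetica from the DataFrame's schema
    SparkKineticaTableBuilder.KineticaMapWriter(df, lp)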

Loading Data

  1. Create a SparkSession
  2. Create a LoaderParams and initialize with appropriate connection config
  3. Create a DataFrame with the data to load (the data types should match the schema of the table being loaded)
  4. Call SparkKineticaLoader.KineticaWriter with the DataFrame & LoaderParams
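
A corresponding sketch for the load path, reusing the lp configured above (the trailing boolean mirrors the call shown in the example below):

    import com.kinetica.spark._

    // 3. DataFrame holding the rows to load; column types must match the target table
    val data = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/opt/gpudb/connectors/spark/2008.csv")

    // 4. Write the DataFrame into the Kinetica table configured in lp
    SparkKineticaLoader.KineticaWriter(data, lp, true)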

Usage Considerations

  • The connector does not perform any ETL transformations
  • Data types must match between Spark and Kinetica
  • For row updates, columns not present during update will be set to null
  • Each Dataset partition should handle at least 1-2 million records
  • To load multiple tables, call LoaderParams.setTablename() prior to calling SparkKineticaLoader.KineticaWriter() for each table loaded (see the sketch after this list)
  • If LDAP/SSL is enabled, the connection string must point to the SSL URL and a valid certificate must be used
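
A minimal sketch of that multi-table pattern, reusing a single LoaderParams instance (the table names and the df2007/df2008 DataFrames are hypothetical placeholders assumed to have been prepared beforehand):

    // Hypothetical: load several DataFrames, each into its own table
    val loads = Seq("airline_2007" -> df2007, "airline_2008" -> df2008)

    loads.foreach { case (tableName, frame) =>
      lp.setTablename(tableName)                          // point the loader at the next table
      SparkKineticaLoader.KineticaWriter(frame, lp, true)
    }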

Example

The following example shows how to load data into Kinetica via DataFrame. It will first read airline data from CSV into a DataFrame, and then load the DataFrame into Kinetica. The data file comes from the 2008 airline data set, available here:

This example assumes the 2008.csv & Spark connector jar are located in the /opt/gpudb/connectors/spark directory.

  1. Launch Spark Shell:

    $ spark-shell --jars /opt/gpudb/connectors/spark/spark-2.2.0-kinetica-6.1.0-connector.jar
    
  2. Import libraries:

    scala> import com.kinetica.spark._
    
  3. Create LoaderParams instance & set properties, assigning the hostname, username, & password appropriate for the target database:

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val hostname = "<KineticaHostName>";
    val username = "<KineticaLoginUsername>";
    val password = "<KineticaLoginPassword>";
    val lp = new LoaderParams();
    lp.setGPUdbURL("http://" + hostname + ":9191");
    lp.setTablename("airline");
    lp.setTableReplicated(false);
    lp.setGpudbIpRegex("");
    lp.setInsertSize(10000);
    lp.setUpdateOnExistingPk(true);
    lp.setKauth(true);
    lp.setKusername(username);
    lp.setKpassword(password);
    lp.setThreads(4);
    
    // Exiting paste mode, now interpreting.
    

    Note: If authentication is not enabled, Kauth can be set to false and the username & password not specified.
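
    In that case the only change to the configuration above is (sketch):

    scala> lp.setKauth(false);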

  4. Create Dataset:

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val df = spark.read
             .format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .option("delimiter", ",")
             .csv("/opt/gpudb/connectors/spark/2008.csv");
    
    // Exiting paste mode, now interpreting.
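
    Optionally, verify the inferred schema and column types before loading (standard Spark, not part of the connector):

    scala> df.printSchema()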
    
  5. Load data into Kinetica:

    scala> SparkKineticaLoader.KineticaWriter(df, lp, true);