The following guide provides step-by-step instructions to get started using Spark to ingest data into Kinetica. Source code for the connector can be found at https://github.com/kineticadb/kinetica-connector-spark.
The connector jar can be built with Maven as follows:
$ git clone https://github.com/kineticadb/kinetica-connector-spark.git
$ cd kinetica-connector-spark
$ mvn clean package
This should produce the connector jar under the target directory:
target/spark-2.2.0-kinetica-6.1.0-connector.jar

This jar should be made available to the Spark cluster. Once this is done, installation is complete.
The connector accepts Spark DataFrame and LoaderParams objects. The
LoaderParams object holds all necessary configuration to interface with
Kinetica.
The connector is case sensitive when it maps dataset column names to Kinetica
table column names. This is an existing limitation of Spark row.AS.
Another option exposed via the connector is to map data to columns based on
position. If this is used, the number & order of columns must match between the
dataset and the table.
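When mapping by position, explicitly selecting the DataFrame columns in the same order as the target table's columns is one way to make the ordering unambiguous. A rough sketch, with hypothetical column names and order:

// Select columns in the same order as the target table's columns so a
// positional mapping lines up (names and order are hypothetical).
val ordered = df.select("Year", "Month", "DayofMonth", "DepTime")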
The SparkKineticaTableBuilder API will extract the data types from a Spark
DataFrame and construct a table in Kinetica with the corresponding schema.
The following Spark datatypes are supported:
- ByteType
- ShortType
- IntegerType
- LongType
- FloatType
- DoubleType
- DecimalType
- StringType
- BooleanType (converted to 1 or 0, integer)
- DateType
- TimestampType
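As an illustration, a DataFrame schema built only from supported types might look like the following; the field names are hypothetical and not required by the connector:

import org.apache.spark.sql.types._

// Hypothetical schema composed solely of Spark types the connector supports.
val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("amount", DoubleType),
    StructField("name", StringType),
    StructField("active", BooleanType),    // loaded into Kinetica as 1 or 0
    StructField("created", TimestampType)
))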
Once the target table has been created, the SparkKineticaLoader can be used to load data into it.
Each Spark DataFrame partition instantiates a Kinetica BulkInserter,
which is a native API for MPP ingest.
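Because each partition gets its own BulkInserter, the number of DataFrame partitions determines how many parallel ingest streams are created and how much data each one handles. The snippet below is a minimal sketch of adjusting the partition count before loading; the target of 8 partitions is purely illustrative and should be sized so each partition carries roughly 1-2 million records (see the notes below):

// Repartition the DataFrame so each partition (and therefore each
// BulkInserter) handles a reasonable share of the records; the count
// of 8 is illustrative only.
val repartitioned = df.repartition(8)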
To create the target table:

1. Create a SparkSession.
2. Create a LoaderParams object and initialize it with the appropriate connection config.
3. Create a DataFrame with the schema that the table should match.
4. Call SparkKineticaTableBuilder.KineticaMapWriter with the DataFrame & LoaderParams.

To load data into the table:

1. Create a SparkSession.
2. Create a LoaderParams object and initialize it with the appropriate connection config.
3. Create a DataFrame with the data to load (the data types should match the schema of the table being loaded).
4. Call SparkKineticaLoader.KineticaWriter with the DataFrame & LoaderParams.

A condensed sketch of both procedures appears after the notes below.

Notes:

- Each Dataset partition should handle no fewer than 1-2 million records.
- Set LoaderParams.setTablename() prior to calling SparkKineticaLoader.KineticaWriter() for each table loaded.
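Putting the two procedures together, a condensed sketch might look like the following. It assumes an existing SparkSession named spark and a configured LoaderParams named lp, and that SparkKineticaTableBuilder.KineticaMapWriter takes the DataFrame & LoaderParams described above; the source path is hypothetical, and the exact method signatures should be confirmed against the connector version in use:

import com.kinetica.spark._

// DataFrame whose schema the new table should match (hypothetical path)
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/path/to/source.csv")

// Create the target table in Kinetica from the DataFrame schema
SparkKineticaTableBuilder.KineticaMapWriter(df, lp)

// Load the DataFrame into the newly created table
SparkKineticaLoader.KineticaWriter(df, lp, true)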
The following example shows how to load data into Kinetica via DataFrame.
It will first read airline data from CSV into a DataFrame, and then load the
DataFrame into Kinetica. The data file comes from the 2008 airline data
set, available here:
This example assumes the 2008.csv & Spark connector jar are located in the
/opt/gpudb/connectors/spark directory.
Launch Spark Shell:
$ spark-shell --jars /opt/gpudb/connectors/spark/spark-2.2.0-kinetica-6.1.0-connector.jar
Import libraries:
scala> import com.kinetica.spark._
Create LoaderParams instance & set properties, assigning the
hostname, username, & password appropriate for the target
database:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val hostname = "<KineticaHostName>";
val username = "<KineticaLoginUsername>";
val password = "<KineticaLoginPassword>";
val lp = new LoaderParams();
lp.setGPUdbURL("http://" + hostname + ":9191");
lp.setTablename("airline");
lp.setTableReplicated(false);
lp.setGpudbIpRegex("");
lp.setInsertSize(10000);
lp.setUpdateOnExistingPk(true);
lp.setKauth(true);
lp.setKusername(username);
lp.setKpassword(password);
lp.setThreads(4);
// Exiting paste mode, now interpreting.
Note: If authentication is not enabled, Kauth can be set to false,
and the username & password can be omitted.
Create Dataset:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv("/opt/gpudb/connectors/spark/2008.csv");
// Exiting paste mode, now interpreting.
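Optionally, the inferred schema can be printed to confirm that the column types line up with the target table before loading (standard Spark API):

scala> df.printSchema()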
Load data into Kinetica:
scala> SparkKineticaLoader.KineticaWriter(df, lp, true);