The following guide provides step-by-step instructions to get started using Spark to ingest data into Kinetica. Source code for the connector can be found at:
https://github.com/kineticadb/kinetica-connector-spark
The connector jar can be built with Maven as follows:
$ git clone https://github.com/kineticadb/kinetica-connector-spark.git
$ cd kinetica-connector-spark
$ mvn clean package
This should produce the connector jar under the target directory:
target/spark-2.2.0-kinetica-6.1.0-connector.jar
This jar should be made available to the Spark cluster. Once this is done, installation is complete.
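For example, when submitting an ingest application with spark-submit, the connector jar can be supplied via the --jars option; the application class & jar below are placeholders:
$ spark-submit \
    --jars /opt/gpudb/connectors/spark/spark-2.2.0-kinetica-6.1.0-connector.jar \
    --class com.example.AirlineIngest \
    airline-ingest.jar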
The connector accepts Spark DataFrame and LoaderParams objects. The LoaderParams object holds all necessary configuration to interface with Kinetica.
The connector is case sensitive when it maps dataset column names to Kinetica table column names. This is an existing limitation with Spark row.AS.
Another option exposed via the connector is to map data to columns based on position. If this is used, the number & order of columns must match between the dataset and table.
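Because the name-based mapping is case sensitive, one way to avoid mismatches is to rename DataFrame columns to the exact table column names before loading. A minimal sketch, assuming an existing DataFrame df and illustrative column names:
// Rename DataFrame columns to match the Kinetica table's column names exactly
// (df and the column names here are illustrative)
val renamed = df
    .withColumnRenamed("FlightNum", "flightnum")
    .withColumnRenamed("DepDelay", "depdelay")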
The SparkKineticaTableBuilder API will extract the data types from a Spark DataFrame and construct a table in Kinetica with the corresponding schema. The following Spark datatypes are supported:
ByteType
ShortType
IntegerType
LongType
FloatType
DoubleType
DecimalType
StringType
BooleanType (converted to 1 or 0, integer)
DateType
TimestampType
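As an illustration, a DataFrame schema built from these types (column names are hypothetical) would map to a Kinetica table with corresponding columns:
import org.apache.spark.sql.types._

// Hypothetical schema using supported Spark types; the table builder would
// create a Kinetica table with corresponding columns
val schema = StructType(Seq(
    StructField("flight_id",   LongType,    nullable = false),
    StructField("carrier",     StringType,  nullable = true),
    StructField("dep_delay",   IntegerType, nullable = true),
    StructField("distance",    DoubleType,  nullable = true),
    StructField("cancelled",   BooleanType, nullable = true),  // stored as 1 or 0
    StructField("flight_date", DateType,    nullable = true)
))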
Once the target table has been created, the SparkKineticaLoader can be used to load data into it. Each Spark DataFrame partition instantiates a Kinetica BulkInserter, which is a native API for MPP ingest.
To create the target table via the connector:
1. Create a SparkSession
2. Create a LoaderParams and initialize it with the appropriate connection config
3. Create a DataFrame with the schema that the table should match
4. Call SparkKineticaTableBuilder.KineticaMapWriter with the DataFrame & LoaderParams
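A minimal sketch of those steps (connection values are placeholders; the KineticaMapWriter call follows the step description above and should be checked against the connector source for the exact signature):
import org.apache.spark.sql.SparkSession
import com.kinetica.spark._

// 1. Create a SparkSession
val spark = SparkSession.builder().appName("KineticaTableBuild").getOrCreate()

// 2. Create a LoaderParams & set connection properties (placeholder values)
val lp = new LoaderParams()
lp.setGPUdbURL("http://<KineticaHostName>:9191")
lp.setTablename("airline")

// 3. Create a DataFrame whose schema the table should match
val df = spark.read.option("header", "true").option("inferSchema", "true")
    .csv("/opt/gpudb/connectors/spark/2008.csv")

// 4. Create the table in Kinetica from the DataFrame's schema
SparkKineticaTableBuilder.KineticaMapWriter(df, lp)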
To load data into the table:
1. Create a SparkSession
2. Create a LoaderParams and initialize it with the appropriate connection config
3. Create a DataFrame with the data to load (the data types should match the schema of the table being loaded)
4. Call SparkKineticaLoader.KineticaWriter with the DataFrame & LoaderParams
The following should be considered when loading data:
- Each Dataset partition should handle no fewer than 1-2 million records (see the repartitioning sketch below)
- Set LoaderParams.setTablename() prior to calling SparkKineticaLoader.KineticaWriter() for each table loaded
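For example, to keep each partition in the suggested range, the DataFrame can be repartitioned before loading; a sketch assuming an existing DataFrame df, with the 2 million records-per-partition target purely illustrative:
// Size partitions so each one (and its BulkInserter) handles roughly 1-2 million records
val targetRecordsPerPartition = 2000000L
val numPartitions = math.max(1L, df.count() / targetRecordsPerPartition).toInt
val sized = df.repartition(numPartitions)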
The following example shows how to load data into Kinetica via a DataFrame. It will first read airline data from CSV into a DataFrame, and then load the DataFrame into Kinetica. The data file comes from the 2008 airline data set, available here:
This example assumes the 2008.csv & Spark connector jar are located in the /opt/gpudb/connectors/spark directory.
Launch Spark Shell:
$ spark-shell --jars /opt/gpudb/connectors/spark/spark-2.2.0-kinetica-6.1.0-connector.jar
Import libraries:
scala> import com.kinetica.spark._
Create a LoaderParams instance & set properties, assigning the hostname, username, & password appropriate for the target database:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val hostname = "<KineticaHostName>";
val username = "<KineticaLoginUsername>";
val password = "<KineticaLoginPassword>";
val lp = new LoaderParams();
lp.setGPUdbURL("http://" + hostname + ":9191");
lp.setTablename("airline");
lp.setTableReplicated(false);
lp.setGpudbIpRegex("");
lp.setInsertSize(10000);
lp.setUpdateOnExistingPk(true);
lp.setKauth(true);
lp.setKusername(username);
lp.setKpassword(password);
lp.setThreads(4);
// Exiting paste mode, now interpreting.
Note: If authentication is not enabled, Kauth can be set to false and the username & password can be omitted.
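For a database without authentication enabled, the corresponding settings reduce to:
lp.setKauth(false);
// the username & password setters may be omitted when authentication is disabled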
Create Dataset:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ",")
    .csv("/opt/gpudb/connectors/spark/2008.csv");
// Exiting paste mode, now interpreting.
Load data into Kinetica:
scala> SparkKineticaLoader.KineticaWriter(df, lp, true);
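As a simple sanity check, the number of records read from the CSV can be compared against the row count of the airline table in Kinetica after the load:
scala> df.count()   // number of records read from the CSV, for comparison with the table's row count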