The following guide provides step-by-step instructions for getting started with
Spark and Kinetica.
Spark via JDBC
SQL can be issued against Kinetica through the Spark JDBC interface. This
not only allows for data ingest and egress, but also provides access to native
Kinetica functions in queries, including geospatial operations.
See Connecting via JDBC for instructions on obtaining the JDBC driver.
Unlike queries run through the Egress Processor of the Legacy Spark Connector,
JDBC queries are not partitioned.
Spark Egress
The following example shows how to execute queries against Kinetica. It uses
JDBC as the read format, loads the Kinetica JDBC driver (which must be
accessible to Spark), and runs the specified query. The query result is loaded
into a DataFrame, and its schema and result set are printed to the console.
This example makes use of the NYC taxi trip table, which can be loaded using
GAdmin from the Demo Data page, under Cluster > Demo.
- Launch Spark Shell:
$ spark-shell --jars kinetica-jdbc-*-fullshaded.jar
- Configure JDBC for the source database and specify the query under the map
key dbtable. Be sure to provide an appropriate value for url for the target
system, as well as a username and password if the database is configured to
require authentication:
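// Replace <db.host> with the Kinetica host; optional ;<parameter>=<value> pairs may follow
val url = "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1[[;<parameter>=<value>]*]"
// Leave empty if the database does not require authentication
val username = ""
val password = ""
// Spark JDBC options; dbtable holds a parenthesized query rather than a table name.
// geo_miles converts the GEODIST distance (meters) to miles: 1 m = 0.000621371 mi
val options = Map(
  "url" -> url,
  "driver" -> "com.kinetica.jdbc.Driver",
  "UID" -> username,
  "PWD" -> password,
  "dbtable" -> """(
    SELECT
      vendor_id,
      DECIMAL(MIN(geo_miles)) AS min_geo_miles,
      DECIMAL(AVG(geo_miles)) AS avg_geo_miles,
      DECIMAL(MAX(geo_miles)) AS max_geo_miles
    FROM
    (
      SELECT
        vendor_id,
        DECIMAL(GEODIST(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude) * 0.000621371) AS geo_miles
      FROM demo.nyctaxi
    )
    WHERE geo_miles BETWEEN .01 AND 100
    GROUP BY vendor_id
  )"""
)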
- Read the query results from Kinetica into a DataFrame:
val df = spark.read.format("jdbc").options(options).load()
- Output the DataFrame schema for the query:
df.printSchema()
- Verify output:
root
|-- vendor_id: string (nullable = true)
|-- min_geo_miles: decimal(18,4) (nullable = true)
|-- avg_geo_miles: decimal(18,4) (nullable = true)
|-- max_geo_miles: decimal(18,4) (nullable = true)
- Output the query result set:
df.orderBy("vendor_id").show()
- Verify output:
+---------+-------------+-------------+-------------+
|vendor_id|min_geo_miles|avg_geo_miles|max_geo_miles|
+---------+-------------+-------------+-------------+
| CMT| 0.0100| 2.0952| 80.8669|
| DDS| 0.0148| 2.7350| 64.2944|
| NYC| 0.0101| 2.1548| 36.9236|
| VTS| 0.0100| 2.0584| 94.5213|
| YCAB| 0.0100| 2.1049| 36.0565|
+---------+-------------+-------------+-------------+
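The egress query converts the GEODIST result from meters to miles by multiplying by 0.000621371, keeps trips between 0.01 and 100 miles (BETWEEN is inclusive), and then takes the minimum, average, and maximum per vendor. That per-row arithmetic can be checked locally with the plain Scala sketch below. It is an approximation under stated assumptions, not Kinetica's implementation: the haversine formula on a sphere with an assumed radius of 6,371 km stands in for GEODIST, whose exact Earth model may differ, DECIMAL rounding is not reproduced, and Trip and GeoMilesSketch are hypothetical names.

```scala
// Hypothetical stand-in for a row of demo.nyctaxi (coordinates in degrees).
final case class Trip(vendorId: String,
                      pickupLon: Double, pickupLat: Double,
                      dropoffLon: Double, dropoffLat: Double)

object GeoMilesSketch {
  // Assumed mean Earth radius; Kinetica's GEODIST may use a different model.
  val EarthRadiusMeters = 6371000.0
  // Same meters-to-miles factor used in the egress query.
  val MilesPerMeter = 0.000621371

  // Haversine great-circle distance in meters, standing in for GEODIST.
  def haversineMeters(lon1: Double, lat1: Double, lon2: Double, lat2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    // Clamp guards against rounding pushing a slightly above 1
    2 * EarthRadiusMeters * math.asin(math.sqrt(math.min(1.0, a)))
  }

  // Counterpart of the inner SELECT: distance per trip, converted to miles.
  def tripMiles(t: Trip): Double =
    haversineMeters(t.pickupLon, t.pickupLat, t.dropoffLon, t.dropoffLat) * MilesPerMeter

  // Counterpart of the outer SELECT: BETWEEN .01 AND 100 (inclusive),
  // then (min, avg, max) miles per vendor.
  def summarize(trips: Seq[Trip]): Map[String, (Double, Double, Double)] =
    trips
      .map(t => (t.vendorId, tripMiles(t)))
      .filter { case (_, miles) => miles >= 0.01 && miles <= 100 }
      .groupBy(_._1)
      .map { case (vendor, rows) =>
        val miles = rows.map(_._2)
        (vendor, (miles.min, miles.sum / miles.size, miles.max))
      }
}
```

Because the sketch runs on an in-memory Seq, it is only useful for sanity-checking the unit conversion and filter bounds on a handful of rows; the real aggregation still runs inside Kinetica.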
Spark Ingest
The following example shows how to ingest data into Kinetica. It uses JDBC as
the write format, loads the Kinetica JDBC driver (which must be accessible to
Spark), and ingests the given DataFrame into a new table.
This example reuses the DataFrame df populated in the Spark Egress section.
- Run the Spark Egress example.
- Update the options map, specifying the table to ingest into under the map key
dbtable:
// Same connection settings as the egress example; dbtable now names the target table
val options = Map(
  "url" -> url,
  "driver" -> "com.kinetica.jdbc.Driver",
  "UID" -> username,
  "PWD" -> password,
  "dbtable" -> "demo.nyctaxi_copy"
)
- Write the data from the DataFrame into Kinetica:
df.write.format("jdbc").options(options).mode("append").save()
- Verify the ingestion into the demo.nyctaxi_copy table using GAdmin or
Workbench.
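Because Scala's Map is immutable, the options map from the egress example does not have to be re-declared in full; Map.updated returns a copy with only dbtable replaced. A minimal sketch, where the url and credential values are placeholders standing in for the egress settings and the query is abbreviated:

```scala
// Placeholder settings standing in for those defined in the egress example.
val url = "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1"
val egressOptions = Map(
  "url" -> url,
  "driver" -> "com.kinetica.jdbc.Driver",
  "UID" -> "",
  "PWD" -> "",
  "dbtable" -> "(SELECT vendor_id FROM demo.nyctaxi)" // abbreviated query
)

// updated() returns a new Map with dbtable replaced; egressOptions is unchanged,
// so it can still be used for further reads.
val ingestOptions = egressOptions.updated("dbtable", "demo.nyctaxi_copy")
```

The resulting map can then be passed to the write step in place of options, e.g. df.write.format("jdbc").options(ingestOptions).mode("append").save().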
Spark Logging
Logging of Kinetica JDBC operations can be configured via the JDBC URL by
adding a LogLevel parameter to the end of the URL. Valid log levels are listed
under JDBC Client Parameters.
For instance, to enable DEBUG logging:
jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000;LogLevel=5
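When several client parameters are combined, assembling the URL in code can avoid separator mistakes. The sketch below is a hypothetical helper (KineticaUrl, build and withLogLevel are illustrative names, not part of the Kinetica driver) that follows the ;<parameter>=<value> pattern used in the URLs above, with port 9191 taken from this guide's examples. It does not validate parameter names or values.

```scala
// Hypothetical helper, not part of the Kinetica JDBC driver.
object KineticaUrl {
  // Builds jdbc:kinetica:URL=http://<host>:<port>, followed by
  // ;<parameter>=<value> for each entry in params, in the order given.
  def build(host: String, port: Int = 9191, params: Seq[(String, String)] = Seq.empty): String = {
    val base = s"jdbc:kinetica:URL=http://$host:$port"
    base + params.map { case (name, value) => s";$name=$value" }.mkString
  }

  // Appends a LogLevel parameter to the end of an existing URL.
  def withLogLevel(url: String, level: Int): String = s"$url;LogLevel=$level"
}

// Reproduces the DEBUG-logging URL shown above.
val debugUrl = KineticaUrl.build(
  "kineticahost",
  params = Seq(
    "CombinePrepareAndExecute" -> "1",
    "RowsPerFetch" -> "20000",
    "LogLevel" -> "5"
  )
)
// debugUrl == "jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000;LogLevel=5"
```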
To enable both Scala and JDBC driver DEBUG logging, without having to modify
the JDBC URL:
Mapping Spark to Kinetica
Some Spark data types and functions may need custom mappings to Kinetica.
The following custom dialect maps:
- Spark’s StringType (CLOB/VARCHAR columns) to the Kinetica VARCHAR type
- Spark’s BinaryType (BLOB columns) to the Kinetica BLOB type
- Spark’s BooleanType to the Kinetica TINYINT type
- The truncate operation (performed as DROP/CREATE by default) to Kinetica’s
TRUNCATE TABLE command
import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

val KineticaDialect = new JdbcDialect {
  // Apply this dialect only to Kinetica JDBC URLs (case-insensitive match)
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:kinetica")

  // Map Spark types to Kinetica column types; None falls back to Spark's defaults
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType  => Some(JdbcType("TEXT", Types.VARCHAR))
    case BinaryType  => Some(JdbcType("VARBINARY", Types.VARBINARY))
    case BooleanType => Some(JdbcType("TINYINT", Types.TINYINT))
    case _           => None
  }

  // Some(false) tells Spark that TRUNCATE does not cascade, so Spark may
  // truncate the table instead of dropping and re-creating it
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

  // Issue Kinetica's TRUNCATE TABLE statement
  override def getTruncateQuery(
      table: String,
      cascade: Option[Boolean] = isCascadingTruncateTable
  ): String = s"TRUNCATE TABLE $table"
}

// Make the dialect available to subsequent JDBC reads and writes
JdbcDialects.registerDialect(KineticaDialect)
After running the application code, the dialect can be unregistered as follows:
JdbcDialects.unregisterDialect(KineticaDialect)