Note
This documentation is for a prior release of Kinetica.
The following guide provides step-by-step instructions for getting started with Spark and Kinetica.
Spark via JDBC
SQL can be issued against Kinetica through the Spark JDBC interface. This allows for data ingest and egress, and also gives queries access to native Kinetica functions, including geospatial operations.
See Connecting via JDBC for instructions on obtaining the JDBC driver.
Note
Unlike queries run through the Egress Processor of the Legacy Spark Connector, JDBC queries are not partitioned.
Spark Egress
The following example shows how to execute queries against Kinetica. It uses JDBC as the read format, which requires the Kinetica JDBC driver to be accessible so that it can be loaded, and runs the specified query. The query result is loaded into a DataFrame, and the schema and result set are output to the console.
This example makes use of the NYC taxi trip table, which can be loaded using GAdmin from the Demo Data page, under Cluster > Demo.
Launch Spark Shell:
$ spark-shell --jars kinetica-jdbc-*-fullshaded.jar
Configure JDBC for the source database and specify the query as the value of the dbtable map key. Be sure to provide an appropriate url value for the target system, as well as a username & password if the database is configured to require authentication.
Note
If connecting over SSL, see JDBC Secure Connections for the modified URL to use.
var url = s"jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1[[;<parameter>=<value>]*]"
val username = ""
val password = ""
val options = Map(
   "url" -> url,
   "driver" -> "com.kinetica.jdbc.Driver",
   "UID" -> username,
   "PWD" -> password,
   "dbtable" -> s"""(
      SELECT
         vendor_id,
         DECIMAL(MIN(geo_miles)) AS min_geo_miles,
         DECIMAL(AVG(geo_miles)) AS avg_geo_miles,
         DECIMAL(MAX(geo_miles)) AS max_geo_miles
      FROM
      (
         SELECT
            vendor_id,
            DECIMAL(GEODIST(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude) * 0.000621371) AS geo_miles
         FROM demo.nyctaxi
      )
      WHERE geo_miles BETWEEN .01 AND 100
      GROUP BY vendor_id
   )"""
)
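Rather than typing credentials into the shell, the UID and PWD entries could be taken from the environment. The following is a minimal sketch of that idea, not part of the Kinetica tooling: the helper name credentialOptions and the variable names KINETICA_USER and KINETICA_PASS are hypothetical, chosen only for illustration.

```scala
// Hypothetical helper: builds the "UID"/"PWD" option entries from an
// environment-style map. The variable names below are assumptions.
def credentialOptions(env: Map[String, String]): Map[String, String] =
  Seq("UID" -> "KINETICA_USER", "PWD" -> "KINETICA_PASS").flatMap {
    case (option, variable) =>
      // Skip the entry entirely when the variable is unset or empty
      env.get(variable).filter(_.nonEmpty).map(value => option -> value)
  }.toMap

// Read from the real process environment
val creds = credentialOptions(sys.env)
```

The resulting map can be merged into the JDBC options with the ++ operator, so that no password appears in the shell history.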
Read queried data from Kinetica into DataFrame:
val df = spark.read.format("jdbc").options(options).load()
Output DataFrame schema for query:
df.printSchema
Verify output:
root
 |-- vendor_id: string (nullable = true)
 |-- min_geo_miles: decimal(18,4) (nullable = true)
 |-- avg_geo_miles: decimal(18,4) (nullable = true)
 |-- max_geo_miles: decimal(18,4) (nullable = true)
Output query result set:
df.orderBy("vendor_id").show
Verify output:
+---------+-------------+-------------+-------------+
|vendor_id|min_geo_miles|avg_geo_miles|max_geo_miles|
+---------+-------------+-------------+-------------+
|      CMT|       0.0100|       2.0952|      80.8669|
|      DDS|       0.0148|       2.7350|      64.2944|
|      NYC|       0.0101|       2.1548|      36.9236|
|      VTS|       0.0100|       2.0584|      94.5213|
|     YCAB|       0.0100|       2.1049|      36.0565|
+---------+-------------+-------------+-------------+
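The mile values in this output come from multiplying the GEODIST result by 0.000621371 and keeping only trips between .01 and 100 miles. The arithmetic can be sketched in plain Scala as below, assuming GEODIST returns a distance in meters (which the conversion factor implies); the function names are hypothetical.

```scala
// Conversion factor taken from the query: meters to miles
val MetersToMiles = 0.000621371

// Hypothetical helper mirroring the "GEODIST(...) * 0.000621371" expression
def metersToMiles(meters: Double): Double = meters * MetersToMiles

// Mirrors "WHERE geo_miles BETWEEN .01 AND 100" (BETWEEN is inclusive)
def inRange(miles: Double): Boolean = miles >= 0.01 && miles <= 100.0

// 1609.344 meters is one statute mile, so this is approximately 1.0
val oneMile = metersToMiles(1609.344)
```

The filter explains why every min_geo_miles value in the result is at least 0.0100 and every max_geo_miles value is below 100.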
Spark Ingest
The following example shows how to ingest data into Kinetica. It uses JDBC as the write format, which requires the Kinetica JDBC driver to be accessible so that it can be loaded, and ingests the given DataFrame into a new table.
This example makes use of the DataFrame populated in the Spark Egress section, referencing it as df.
Run the Spark Egress example.
Update the options map, specifying the table to ingest into for map key dbtable.
val options = Map(
   "url" -> url,
   "driver" -> "com.kinetica.jdbc.Driver",
   "UID" -> username,
   "PWD" -> password,
   "dbtable" -> "demo.nyctaxi_copy"
)
Write data from DataFrame into Kinetica:
df.write.format("jdbc").options(options).mode("append").save()
Verify the ingestion into the demo.nyctaxi_copy table using GAdmin or Workbench.
Spark Logging
Logging of Kinetica JDBC operations can be configured via the JDBC URL, by adding a LogLevel parameter to the end of the URL. Valid log levels can be found under JDBC Client Parameters.
For instance, to enable DEBUG logging:
jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000;LogLevel=5
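When the URL is assembled in code, the LogLevel parameter can be appended programmatically. The following is a minimal sketch; the helper name withLogLevel is hypothetical, and it assumes the URL parameters are separated by semicolons as in the example above.

```scala
// Hypothetical helper: returns the URL with its LogLevel parameter set to
// `level`, replacing any existing LogLevel entry instead of duplicating it.
def withLogLevel(url: String, level: Int): String = {
  val kept = url.split(";").filterNot(_.trim.toLowerCase.startsWith("loglevel="))
  (kept :+ s"LogLevel=$level").mkString(";")
}

val baseUrl = "jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000"

// 5 is the DEBUG level used in the example URL above
val debugUrl = withLogLevel(baseUrl, 5)
```

Replacing rather than blindly appending keeps the URL free of conflicting LogLevel entries when the helper is applied more than once.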
To enable DEBUG logging for both Spark and the JDBC driver, without having to modify the JDBC URL:
sc.setLogLevel("DEBUG")
Mapping Spark to Kinetica
Some Spark data types and functions may need custom mappings to Kinetica.
The following dialect snippet is a custom mapping, which maps:
- Spark's CLOB/VARCHAR types to the Kinetica VARCHAR type
- Spark's BLOB type to the Kinetica BLOB type
- Spark's boolean type to the Kinetica TINYINT type
- The truncate command (which does DROP/CREATE, by default) to Kinetica's TRUNCATE TABLE command
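A dialect implementing the mappings listed above might look like the following minimal sketch. It relies on Spark's JdbcDialect extension point; the object name KineticaDialectSketch, the URL prefix check, and the exact type strings are illustrative assumptions rather than Kinetica's published snippet.

```scala
import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, StringType}

// Hypothetical dialect; names and type strings are assumptions for illustration
object KineticaDialectSketch extends JdbcDialect {

  // Apply this dialect only to Kinetica JDBC URLs
  override def canHandle(url: String): Boolean =
    url.toLowerCase.startsWith("jdbc:kinetica")

  // Map Spark types to the column types used when Spark creates a table
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType  => Some(JdbcType("VARCHAR", Types.VARCHAR))
    case BinaryType  => Some(JdbcType("BLOB", Types.BLOB))
    case BooleanType => Some(JdbcType("TINYINT", Types.TINYINT))
    case _           => None // fall back to Spark's default mapping
  }

  // Some(false) tells Spark that truncation does not cascade, which allows it
  // to issue TRUNCATE TABLE instead of dropping and recreating the table
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}

// Register the dialect before running any JDBC reads or writes
JdbcDialects.registerDialect(KineticaDialectSketch)
```

With the dialect registered, a write that uses mode("overwrite") together with Spark's truncate option set to true should truncate the target table rather than drop and recreate it.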
After running the application code, the dialect can be unregistered as follows:
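One way to make sure a dialect is always unregistered, even if the application code throws, is to wrap registration in try/finally. This is a sketch under that assumption: the helper name withDialect is hypothetical, while registerDialect and unregisterDialect are the calls provided by Spark's JdbcDialects object.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical helper: registers `dialect`, runs `body`, and unregisters the
// dialect afterwards regardless of whether `body` succeeds or throws.
def withDialect[A](dialect: JdbcDialect)(body: => A): A = {
  JdbcDialects.registerDialect(dialect)
  try body
  finally JdbcDialects.unregisterDialect(dialect)
}
```

The application's JDBC reads and writes would go inside the body block, so the custom mapping applies only for the duration of that code.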