# Kinetica Spark Guide

This guide provides step-by-step instructions for getting started with
*Spark* and *Kinetica*.

<a id="spark-connector" />

<a id="spark-data-loader" />

<a id="spark-ingest-egress-processor" />

<a id="spark-streaming-processor" />

<a id="spark-sql" />

## Spark via JDBC

SQL can be issued against *Kinetica* through the *Spark* JDBC interface.  This
not only allows for data ingest/egress, but provides access to native *Kinetica*
functions in queries, including geospatial operations.

See [Connecting via JDBC](/content/connectors/sql_guide#jdbc) to obtain the JDBC
driver.

<Info>
  Unlike queries run through the *Egress Processor* of the
  *Legacy Spark Connector*, JDBC queries are not partitioned.
</Info>

### Spark Egress

The following example shows how to run a query against *Kinetica* from *Spark*.
It uses JDBC as the read format, which requires the *Kinetica* JDBC driver to be
accessible to *Spark* so that it can be loaded.  The result of the query is
loaded into a `DataFrame`, and both the schema and the result set are output to
the console.

This example makes use of the NYC taxi trip table, which can be loaded using
*GAdmin* from the *Demo Data* page, under *Cluster* > *Demo*.

1. Launch *Spark Shell*:

   ```bash theme={null}
   $ spark-shell --jars kinetica-jdbc-*-fullshaded.jar
   ```

2. Configure JDBC for the source database and specify the query under the map
   key `dbtable`.  Be sure to provide an appropriate `url` for the target
   system, as well as a `username` & `password` if the database is configured
   to require authentication.

   <Info>
     If connecting over SSL, see
     [JDBC Secure Connections](/content/connectors/sql_guide#jdbc-connecting-secure) for the
     modified URL to use.
   </Info>

   ```scala theme={null}
   val url = "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1[[;<parameter>=<value>]*]"
   val username = ""
   val password = ""
   val options = Map(
      "url" -> url,
      "driver" -> "com.kinetica.jdbc.Driver",
      "UID" -> username,
      "PWD" -> password,
      "dbtable" -> s"""(
         SELECT
            vendor_id,
            DECIMAL(MIN(geo_miles)) AS min_geo_miles,
            DECIMAL(AVG(geo_miles)) AS avg_geo_miles,
            DECIMAL(MAX(geo_miles)) AS max_geo_miles
         FROM
         (
            SELECT
               vendor_id,
               DECIMAL(GEODIST(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude) * 0.000621371) AS geo_miles
            FROM demo.nyctaxi
         )
         WHERE geo_miles BETWEEN .01 AND 100
         GROUP BY vendor_id
      )"""
   )
   ```

3. Read queried data from *Kinetica* into `DataFrame`:

   ```scala theme={null}
   val df = spark.read.format("jdbc").options(options).load()
   ```

4. Output `DataFrame` schema for query:

   ```scala theme={null}
   df.printSchema
   ```

5. Verify output:

   ```
   root
    |-- vendor_id: string (nullable = true)
    |-- min_geo_miles: decimal(18,4) (nullable = true)
    |-- avg_geo_miles: decimal(18,4) (nullable = true)
    |-- max_geo_miles: decimal(18,4) (nullable = true)
   ```

6. Output query result set:

   ```scala theme={null}
   df.orderBy("vendor_id").show
   ```

7. Verify output:

   ```
   +---------+-------------+-------------+-------------+
   |vendor_id|min_geo_miles|avg_geo_miles|max_geo_miles|
   +---------+-------------+-------------+-------------+
   |      CMT|       0.0100|       2.0952|      80.8669|
   |      DDS|       0.0148|       2.7350|      64.2944|
   |      NYC|       0.0101|       2.1548|      36.9236|
   |      VTS|       0.0100|       2.0584|      94.5213|
   |     YCAB|       0.0100|       2.1049|      36.0565|
   +---------+-------------+-------------+-------------+
   ```
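
*Spark's* JDBC source accepts either a table name or a parenthesized subquery as the `dbtable` value, which is why the query in step 2 is wrapped in `( ... )`. The sketch below shows one way to do that wrapping consistently. The helper name `asDbtable` is hypothetical and is not part of *Spark* or the *Kinetica* driver.

```scala
// Hypothetical helper: wraps a SQL query in parentheses so that Spark's
// JDBC source treats it as a derived table in the "dbtable" option.
def asDbtable(sql: String): String = {
  // Drop surrounding whitespace and a single trailing semicolon, if present
  val trimmed = sql.trim.stripSuffix(";").trim
  s"($trimmed)"
}

// Example query against the demo table used in this guide
val dbtable = asDbtable(
  "SELECT vendor_id, COUNT(*) AS trips FROM demo.nyctaxi GROUP BY vendor_id;"
)
println(dbtable)
```

The resulting string can be supplied as the `dbtable` entry of the options map in place of a hand-written triple-quoted literal.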

### Spark Ingest

The following example shows how to ingest data into *Kinetica*.  It uses JDBC
as the write format, which requires the *Kinetica* JDBC driver to be accessible
to *Spark* so that it can be loaded, and writes the given `DataFrame` into a
new table.

This example makes use of the `DataFrame` populated in the [Spark Egress](#spark-egress)
section, referencing it as `df`.

1. Run the [Spark Egress](#spark-egress) example.

2. Update the options map, specifying the table to ingest into for map key
   `dbtable`.

   ```scala theme={null}
   val options = Map(
      "url" -> url,
      "driver" -> "com.kinetica.jdbc.Driver",
      "UID" -> username,
      "PWD" -> password,
      "dbtable" -> "demo.nyctaxi_copy"
   )
   ```

3. Write data from `DataFrame` into *Kinetica*:

   ```scala theme={null}
   df.write.format("jdbc").options(options).mode("append").save()
   ```

4. Verify the ingestion into the `demo.nyctaxi_copy` table using *GAdmin* or
   *Workbench*.
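
The egress and ingest examples build the same options map and differ only in the `dbtable` value. As a minimal sketch, that map could be produced by a small function so the connection settings are defined once. The name `kineticaOptions` and the sample URL are assumptions for illustration; the option keys are the ones used in the steps above.

```scala
// Hypothetical helper: builds the Spark JDBC options map used in this guide,
// varying only the table (or parenthesized subquery) passed as "dbtable".
def kineticaOptions(
    url: String,
    username: String,
    password: String,
    dbtable: String
): Map[String, String] = Map(
  "url" -> url,
  "driver" -> "com.kinetica.jdbc.Driver",
  "UID" -> username,
  "PWD" -> password,
  "dbtable" -> dbtable
)

// Placeholder URL; substitute the host of the target system
val sampleUrl = "jdbc:kinetica:URL=http://<db.host>:9191;CombinePrepareAndExecute=1"
val ingestOptions = kineticaOptions(sampleUrl, "", "", "demo.nyctaxi_copy")
println(ingestOptions("dbtable"))
```

The returned map can be passed to `.options(...)` on either `spark.read` or `df.write`.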

### Spark Logging

Logging of *Kinetica* JDBC operations can be configured via the JDBC URL, by
adding a `LogLevel` parameter to the end of the URL.  Valid log levels can be
found under [JDBC Client Parameters](/content/connectors/sql_guide#jdbc-config-client).

For instance, to enable `DEBUG` logging:

```
jdbc:kinetica:URL=http://kineticahost:9191;CombinePrepareAndExecute=1;RowsPerFetch=20000;LogLevel=5
```
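
Because driver parameters such as `LogLevel` are appended to the URL as `;<parameter>=<value>` pairs, the URL can also be assembled from a list of parameters rather than edited by hand. The following is a sketch only; `kineticaUrl` is a hypothetical helper, and the host and parameter values are taken from the example above.

```scala
// Hypothetical helper: joins a base endpoint and driver parameters into a
// Kinetica JDBC URL of the form jdbc:kinetica:URL=<endpoint>;<key>=<value>...
def kineticaUrl(endpoint: String, params: Seq[(String, String)]): String =
  (s"jdbc:kinetica:URL=$endpoint" +: params.map { case (k, v) => s"$k=$v" })
    .mkString(";")

val debugUrl = kineticaUrl(
  "http://kineticahost:9191",
  Seq(
    "CombinePrepareAndExecute" -> "1",
    "RowsPerFetch" -> "20000",
    "LogLevel" -> "5" // DEBUG, per the example above
  )
)
println(debugUrl)
```

Keeping the parameters in a sequence preserves their order and makes it easy to add or remove `LogLevel` without touching the rest of the URL.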

To enable `DEBUG` logging for both *Spark* and the JDBC driver, without having
to modify the JDBC URL:

```scala theme={null}
sc.setLogLevel("DEBUG")
```

## Mapping Spark to Kinetica

Some *Spark* data types and functions may need custom mappings to *Kinetica*.

The following dialect snippet defines a custom mapping, which maps:

* *Spark's* `StringType` to the *Kinetica* `TEXT` type (JDBC `VARCHAR`)
* *Spark's* `BinaryType` to the *Kinetica* `VARBINARY` type
* *Spark's* `BooleanType` to the *Kinetica* `TINYINT` type
* The truncate command (which does `DROP`/`CREATE`, by default) to
  *Kinetica's* `TRUNCATE TABLE` command

```scala theme={null}
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect, JdbcType}
import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.types._

val KineticaDialect = new JdbcDialect {
    // Apply this dialect to any Kinetica JDBC URL
    override def canHandle(url: String): Boolean =
        url.toLowerCase(Locale.ROOT).startsWith("jdbc:kinetica")
    // Map Spark types to Kinetica column types; None falls back to Spark's defaults
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("TEXT", java.sql.Types.VARCHAR))
        case BinaryType => Some(JdbcType("VARBINARY", java.sql.Types.VARBINARY))
        case BooleanType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
        case _ => None
    }
    // TRUNCATE TABLE does not cascade to other tables
    override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
    // Issue TRUNCATE TABLE when a truncate is requested
    override def getTruncateQuery(
        table: String,
        cascade: Option[Boolean] = isCascadingTruncateTable()
    ): String = { s"TRUNCATE TABLE $table" }
}
// Make the dialect available to Spark's JDBC data source
JdbcDialects.registerDialect(KineticaDialect)
```
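
Once the dialect has been registered, a quick check in the same session can confirm that *Spark* resolves it for *Kinetica* URLs and applies the mappings listed above. The sketch below uses `JdbcDialects.get`, which returns the dialect *Spark* would select for a given URL. The sample URL is a placeholder, since only its `jdbc:kinetica` prefix matters for the lookup.

```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{BinaryType, BooleanType, StringType}

// Placeholder URL (assumption); no connection is made by this lookup
val sampleUrl = "jdbc:kinetica:URL=http://localhost:9191"
val dialect = JdbcDialects.get(sampleUrl)

// Print the database type chosen for each Spark type handled by the dialect;
// types the dialect does not map are reported as falling back to Spark defaults
Seq(StringType, BinaryType, BooleanType).foreach { dt =>
  val mapped = dialect.getJDBCType(dt)
    .map(_.databaseTypeDefinition)
    .getOrElse("(Spark default)")
  println(s"$dt -> $mapped")
}

// Print the statement the dialect would issue for a truncate
println(dialect.getTruncateQuery("demo.nyctaxi_copy"))
```

If the custom dialect was not registered in the current session, the type lines report the *Spark* default instead of the *Kinetica* types.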

After running the application code, the dialect can be unregistered as follows:

```scala theme={null}
JdbcDialects.unregisterDialect(KineticaDialect)
```
