The following guide provides step-by-step instructions to get started integrating Kinetica with Beam.
This project aims to make Kinetica accessible to Beam pipelines, both for ingest and egress, meaning data can be read from a Kinetica table (or Kinetica view) into a Beam pipeline, or written from a Beam pipeline to a Kinetica table.
Important
The Kinetica Beam connector currently only supports Java Beam pipelines.
The two generic classes that integrate Kinetica with Beam are:

- `com.kinetica.beam.io.KineticaIO.Read<T>` -- a class that takes a standard Kinetica table type class to enable reading the data from the given table or view type
- `com.kinetica.beam.io.KineticaIO.Write<T>` -- a class that takes a standard Kinetica table type class to enable writing data to the given table type

Source code for the connector can be found in the Kinetica Beam Project on GitHub.
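Both classes take a standard Kinetica table type class, which is typically a subclass of the Kinetica Java API's `RecordObject`. As a point of reference for the sketches below, here is a minimal, hypothetical `Scientist` type class; the class name, columns, and ordering are illustrative only and are not part of the connector:

```java
import java.io.Serializable;

import com.gpudb.RecordObject;

// Hypothetical table type class backing a "scientists" table; the class
// name, columns, and ordering are illustrative only. Serializable is
// implemented so the class can be used with a serializable Beam coder.
public class Scientist extends RecordObject implements Serializable {
    @RecordObject.Column(order = 0)
    public int id;

    @RecordObject.Column(order = 1)
    public String name;
}
```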
The `Read<T>` class (and its respective method `.<T>read()`) is used to transform data read from a table or view in Kinetica into a Beam pipeline using a Beam `PCollection` class object. A `PCollection` represents the data set the Beam pipeline will operate on. To access the data in Kinetica, a `PCollection` backed by the Kinetica table type class is necessary; once the `PCollection` is established, the `.<T>read()` transform method is applied to it.
The `.<T>read()` transform method is configured using the following methods that are sourced from a `Read<T>` class object:
Method Name | Required | Description
---|---|---
`withHeadNodeURL()` | Y | The URL of the Kinetica head node (host and port)
`withUsername()` | N | Username for authentication
`withPassword()` | N | Password for authentication
`withTable()` | Y | The name of the table or view in the database
`withEntity()` | Y | The name of the table or view class
`withCoder()` | Y | A serializable coder of the table or view class
The `Write<T>` class (and its respective method `.<T>write()`) is used to transform data from a Beam pipeline (backed by a Beam `PCollection` class object) and write it to a table in Kinetica. A `PCollection` represents the data set the Beam pipeline will operate on. To access the data in the pipeline, the table in Kinetica needs to be mapped to the pipeline using the `.<T>write()` transform method.
The `.<T>write()` transform method is configured using the following methods that are sourced from a `Write<T>` class object:
Method Name | Required | Description
---|---|---
`withHeadNodeURL()` | Y | The URL of the Kinetica head node (host and port)
`withUsername()` | N | Username for authentication
`withPassword()` | N | Password for authentication
`withTable()` | Y | The name of the table in the database
`withEntity()` | Y | The name of the table class
Important
Before attempting to use a pipeline to write to a table in Kinetica, the table must be created. It does not matter whether the table is created within the pipeline code (since the Beam connector requires the Java API anyway) or before the pipeline is used.
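Continuing the read sketch above, a corresponding write might look like the following, under the same assumptions; the `scientists_copy` target table name is hypothetical, and per the note above it must already exist:

```java
// Continuing from the read sketch: write the records to another,
// pre-existing table. The "scientists_copy" table name is hypothetical;
// per the note above, it must be created before the pipeline runs.
scientists.apply(
        KineticaIO.<Scientist>write()
                .withHeadNodeURL("http://kinetica-6.2-head:9191") // head node host and port
                .withUsername("admin")                            // optional
                .withPassword("admin")                            // optional
                .withTable("scientists_copy")                     // existing target table
                .withEntity(Scientist.class));                    // table type class (assumed argument type)

p.run().waitUntilFinish();
```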
One JAR file is produced by this project:

- `apache-beam-kineticaio-<ver>.jar` -- complete connector JAR

To install and use the connector, copy the `apache-beam-kineticaio-<ver>.jar` library to the target server.

Important

Because of the Kinetica Beam connector's dependence on the Kinetica table type class, your pipeline code requires access to the Kinetica Java API.
This test will demonstrate using the Kinetica Beam Connector and Docker to connect several containers together and run a Beam pipeline to read data from Kinetica and write it back into Kinetica. The containers used are:

- a Kinetica head node (`kinetica-6.2-head`)
- a Kinetica worker node (`kinetica-6.2-worker`)
- a Spark cluster (`spark-cluster`)
- a Beam builder node (for running the unit tests)
- a Beam edge node (for submitting jobs)
Install Docker on your machine. See the Docker website for more information.
Clone the Kinetica Beam Connector repository:
git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-connector-beam.git
Download the following software and place it in the `kinetica-connector-beam/resources` directory:

- the JDK 8 RPM (`jdk-8u*-linux-x64.rpm`)
- the Apache Maven binary archive (`apache-maven-*-bin.tar.gz`)
- the Apache Spark binary archive built for Hadoop 2.7 (`spark-*-bin-hadoop2.7.tgz`)
Install the Kinetica Beam Connector API locally and copy it into the `kinetica-connector-beam/resources` directory:
cd kinetica-connector-beam/api
mvn clean package
cp target/apache-beam-kineticaio-1.0.jar ../resources
Obtain a Kinetica license key. To receive a license key, contact support at support@kinetica.com.
Allocate an appropriate amount of resources to run all the testing containers simultaneously, ensuring the Docker engine has sufficient CPU and memory for all five containers.
Choose a subnet that works for your environment, e.g., `172.19.0.0/16`.
Important
The selected subnet will be used later, so make note of it.
Create the Docker network bridge:
docker network create beamnet --subnet=<subnet>
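For example, using the subnet chosen above:

```
docker network create beamnet --subnet=172.19.0.0/16
```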
Change into the `kinetica-cluster` directory:
cd ../docker/kinetica-cluster
Update the template `gpudb.conf` file in the `resources` directory for the following:

- `head_ip_address` -- head node address for Kinetica; edit the default value to instead match your chosen subnet
- `rank0.host` / `rank1.host` -- host addresses for ranks 0 and 1 (head node); edit the default values to instead match your chosen subnet
- `rank2.host` -- host address for rank 2 (worker node); edit the default value to match your chosen subnet
- `license_key` -- license key for Kinetica; add the key you've obtained from Prerequisites
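For illustration, with the example subnet `172.19.0.0/16`, the edited `gpudb.conf` entries might look like the following; the specific addresses here are assumptions, so substitute values from your chosen subnet and your actual license key:

```
head_ip_address = 172.19.0.10
rank0.host = 172.19.0.10
rank1.host = 172.19.0.10
rank2.host = 172.19.0.11
license_key = <your-license-key>
```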
Download the desired Intel Kinetica RPM from repo.kinetica.com. We recommend the latest version.
Copy the Kinetica RPM into the `resources` directory:
cp <downloads-dir>/gpudb-intel-license-*.ga-0.el6.x86_64.rpm resources/
Create directories for the Docker container to mount that will contain Kinetica logs:
mkdir -p mount/kinetica-head/gpudb-logs/ && mkdir -p mount/kinetica-worker/gpudb-logs/
Update `runProject.sh` for the IP addresses used for the head and worker nodes in the `gpudb.conf` file and for the absolute paths to the log directories you created:
docker run \
--name kinetica-6.2-head \
-d --privileged --rm \
--network beamnet --ip <head-node-ip-address> \
-p 8080:8080 -p 8088:8088 -p 8181:8181 -p 9001:9001 -p 9002:9002 -p 9191:9191 -p 9292:9292 \
-v kinetica-6.2-head-persist:/opt/gpudb/persist \
-v <absolute-path-to>/kinetica-connector-beam/docker/kinetica-cluster/mount/kinetica-head/gpudb-logs:/opt/gpudb/core/logs/ \
kinetica/kinetica-6.2-node
docker run \
--name kinetica-6.2-worker \
-d --privileged --rm \
--network beamnet --ip <worker-node-ip-address> \
-v kinetica-6.2-worker-persist:/opt/gpudb/persist \
-v <absolute-path-to>/kinetica-connector-beam/docker/kinetica-cluster/mount/kinetica-worker/gpudb-logs:/opt/gpudb/core/logs/ \
kinetica/kinetica-6.2-node
Build the Kinetica Docker image and then create two containers using the image:
./build.sh
./runProject.sh
Once the containers are ready, open a web browser and navigate to `localhost:8080`. Default login information is `admin` / `admin`.
Proceed to the Admin page in GAdmin and start the cluster.
If you want to see the Kinetica Beam Connector work without setting up Spark, you can use the unit tests in the Beam API JAR.
Change into the `builder` directory:
cd ../builder
Create a `resources` directory and copy the JDK RPM and Apache Maven zipped files into it:
mkdir resources
cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/apache-maven-*-bin.tar.gz resources
Update the `Dockerfile` environment variables for the downloaded JDK and Apache Maven versions:
ENV PATH=$PATH:/opt/apache-maven-<version>/bin
ENV JAVA_HOME /usr/java/jdk1.8.0_<version>/
Update `run.sh` for the absolute paths to the Beam API files and the local `resources` directory:
-v <absolute-path-to>/kinetica-connector-beam/api:/usr/local/beam/api \
...
-v <absolute-path-to>/kinetica-connector-beam/docker/builder/resources:/usr/local/beam/resources \
Build the Beam builder image and then create a container using the image:
./build.sh
./run.sh
Note
This will open a bash prompt inside the container.
Verify the container can access the Kinetica head node container:
curl http://kinetica-6.2-head:9191
Note
If the command was successful, the following message will be returned:
Kinetica is running!
Once at the command prompt inside the Beam builder container, change into the `api` directory and run the unit tests:
cd api/
mvn verify
Log into GAdmin and verify the `scientists` table exists.
Exit the container:
exit
Change into the `sparkCluster` directory:
cd ../sparkCluster
Create a `resources` directory and copy the JDK RPM and Apache Spark zipped files into it:
mkdir resources
cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/spark-*-bin-hadoop2.7.tgz resources
Create a directory for the Docker container to mount that will contain Spark logs:
mkdir -p mount/spark-logs
Update `run.sh` for the absolute paths to the log directory you created and for the Spark version you downloaded:
-v <absolute-path-to>/kinetica-connector-beam/docker/sparkCluster/mount/spark-logs:/opt/spark-<version>-bin-hadoop2.7/logs
Build the Spark cluster image and then create a container using the image:
./build.sh
./run.sh
Once the container is ready, open a web browser and navigate to `localhost:8082` to verify the Spark cluster is available.
Change into the `edgeNode` directory:
cd ../edgeNode
Create a `resources` directory and copy the JDK RPM, the Apache Spark and Apache Maven zipped files, and the Beam API JAR into it:
mkdir resources
cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/spark-*-bin-hadoop2.7.tgz resources
cp ../../resources/apache-maven-*-bin.tar.gz resources && cp ../../resources/apache-beam-kineticaio-1.0.jar resources
Update the `Dockerfile` environment variables for the downloaded JDK, Apache Spark, and Apache Maven versions:
ENV PATH=$PATH:/opt/apache-maven-<version>/bin
ENV JAVA_HOME /usr/java/jdk1.8.0_<version>/
Update `run.sh` for the absolute paths to the Beam Connector example files and the local `resources` directory:
-v <absolute-path-to>/kinetica-connector-beam/example/:/usr/local/beam/example-project \
...
-v <absolute-path-to>/kinetica-connector-beam/docker/edgeNode/resources:/usr/local/beam/resources \
Build the Beam edge node image and then create a container using the image:
./build.sh
./run.sh
Note
This will open a bash prompt inside the container.
Once at the command prompt inside the Beam edge node container, ping the other containers to verify connectivity:
ping kinetica-6.2-head
ping kinetica-6.2-worker
ping spark-cluster
Install the Beam API JAR into the Maven repository on the edge node container:
mvn install:install-file -Dfile='resources/apache-beam-kineticaio-1.0.jar' -DgroupId='com.kinetica' -DartifactId='apache-beam-kineticaio' -Dversion='1.0' -Dpackaging=jar
At the command prompt inside the Beam edge node container, change into the `example-project` directory and compile the Beam Connector example JAR:
cd /usr/local/beam/example-project
mvn -Pspark-runner clean package
Back out one directory level and copy the Beam Connector example JAR into the current directory:
cd ..
cp example-project/target/apache-beam-kineticaio-example-1.0-shaded.jar .
Submit the Beam job to Spark, ensuring you replace the `kineticaPassword` if necessary:
/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --master spark://`getent hosts spark-cluster | cut -d ' ' -f1`:7077 \
--conf spark.executor.memory=2G --class com.kinetica.beam.example.Test1 apache-beam-kineticaio-example-1.0-shaded.jar \
--kineticaPassword=admin --kineticaTable=scientists \
--kineticaURL=http://`getent hosts kinetica-6.2-head | cut -d ' ' -f1`:9191 \
--kineticaUsername=admin --runner=SparkRunner
Exit the container:
exit
Verify the job ran via the Spark cluster UI (`localhost:8082` -- there should be two completed applications: one for the read test and one for the write test), via the `scientists` table in GAdmin (`localhost:8080`), or via the application logs:
docker exec -it spark-cluster bash
cd /opt/spark-<version>-bin-hadoop2.7/work/<application-id>
If you exited the container, enter the Beam edge node container again:
kinetica-connector-beam/docker/edgeNode/run.sh
At the command prompt inside the Beam edge node container, change into the `example-project` directory and compile the Beam Connector example JAR:
cd /usr/local/beam/example-project
mvn -Pdirect-runner clean package
Back out one directory level and copy the Beam Connector example JAR into the current directory:
cd ..
cp example-project/target/apache-beam-kineticaio-example-1.0-shaded.jar .
Submit the Beam job to the direct runner (via Java), ensuring you replace the `kineticaPassword` if necessary:
java -cp apache-beam-kineticaio-example-1.0-shaded.jar \
com.kinetica.beam.example.Test1 \
--kineticaURL="http://`getent hosts kinetica-6.2-head | cut -d ' ' -f1`:9191" \
--kineticaUsername=admin \
--kineticaPassword=admin \
--kineticaTable=scientists \
--runner=DirectRunner
Exit the container:
exit
Verify the job ran via the `scientists` table in GAdmin (`localhost:8080`).