Beam Developer Manual

The following guide provides step-by-step instructions to get started integrating Kinetica with Beam.

This project aims to make Kinetica accessible to Beam pipelines for both ingest and egress, meaning data can be read from a Kinetica table (or view) into a Beam pipeline, or written from a Beam pipeline into a Kinetica table.

Important

The Kinetica Beam connector currently only supports Java Beam pipelines.

The two generic classes that integrate Kinetica with Beam are:

  • com.kinetica.beam.io.KineticaIO.Read<T> -- A class that takes a standard Kinetica table type class to enable reading data from a table or view of the given type
  • com.kinetica.beam.io.KineticaIO.Write<T> -- A class that takes a standard Kinetica table type class to enable writing data to a table of the given type

Source code for the connector can be found in the Kinetica Beam Project on GitHub.
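
Both classes operate on a Kinetica table type class. As a rough illustration only, such a class for a hypothetical scientists table might look like the following sketch, which assumes the class follows the Kinetica Java API's RecordObject conventions; the Scientist class name, its columns, and the Serializable marker are illustrative assumptions, not part of the connector:

    import java.io.Serializable;

    import com.gpudb.RecordObject;

    // Hypothetical table type class describing a "scientists" table.
    // Column names, types, and ordering are illustrative assumptions.
    public class Scientist extends RecordObject implements Serializable {
        @RecordObject.Column(order = 0)
        public long id;

        @RecordObject.Column(order = 1)
        public String name;

        // Public no-argument constructor so records can be instantiated
        public Scientist() {
        }
    }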

Transforming Data from Kinetica into a Pipeline

The Read<T> class (and its respective method .<T>read()) is used to transform data read from a table or view in Kinetica into a Beam pipeline using a Beam PCollection class object. A PCollection represents the data set the Beam pipeline will operate on. To access the data in Kinetica, a PCollection backed by the Kinetica table type class is necessary; once the PCollection is established, the .<T>read() transform method is applied to the PCollection.

The .<T>read() transform method is configured using the following methods that are sourced from a Read<T> class object:

Method Name         Required   Description
-----------------   --------   -------------------------------------------------
withHeadNodeURL()   Y          The URL of the Kinetica head node (host and port)
withUsername()      N          Username for authentication
withPassword()      N          Password for authentication
withTable()         Y          The name of the table or view in the database
withEntity()        Y          The name of the table or view class
withCoder()         Y          A serializable coder of the table or view class
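
As a rough sketch only (not taken verbatim from the connector's own examples), a read of the hypothetical Scientist type from above might be configured as follows; the head node URL, credentials, table name, the use of Beam's SerializableCoder, and passing the Class object to withEntity() are all placeholder assumptions:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    import com.kinetica.beam.io.KineticaIO;

    public class ReadExample {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            // Establish a PCollection backed by the Scientist table type class
            PCollection<Scientist> scientists = p.apply(
                KineticaIO.<Scientist>read()
                    .withHeadNodeURL("http://<kinetica-head-node>:9191") // head node host & port
                    .withUsername("admin")                               // optional
                    .withPassword("admin")                               // optional
                    .withTable("scientists")                             // table or view name
                    .withEntity(Scientist.class)                         // table type class
                    .withCoder(SerializableCoder.of(Scientist.class)));  // coder for the type

            // Further transforms would be applied to the scientists PCollection here

            p.run().waitUntilFinish();
        }
    }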

Transforming Data from a Pipeline into Kinetica

The Write<T> class (and its respective method .<T>write()) is used to transform data from a Beam pipeline (backed by a Beam PCollection class object) and write it to a table in Kinetica. A PCollection represents the data set the Beam pipeline will operate on. To access the data in the pipeline, the table in Kinetica needs to be mapped to the pipeline using the .<T>write() transform method.

The .<T>write() transform method is configured using the following methods that are sourced from a Write<T> class object:

Method Name         Required   Description
-----------------   --------   -------------------------------------------------
withHeadNodeURL()   Y          The URL of the Kinetica head node (host and port)
withUsername()      N          Username for authentication
withPassword()      N          Password for authentication
withTable()         Y          The name of the table in the database
withEntity()        Y          The name of the table class

Important

Before attempting to use a pipeline to write to a table in Kinetica, the table must be created. The table can be created either within the pipeline code (since the Beam connector requires the Kinetica Java API anyway) or before the pipeline is run.
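
As a rough sketch under the same assumptions as the read example (placeholder URL, credentials, and table name; Class object passed to withEntity(); and a pre-existing scientists table with a matching type), a write might look like the following; the Create transform here simply stands in for whatever upstream transforms produce the data:

    import java.util.Arrays;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    import com.kinetica.beam.io.KineticaIO;

    public class WriteExample {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            // Build a small in-memory PCollection of the table type class;
            // in a real pipeline this would come from upstream transforms
            Scientist s = new Scientist();
            s.id = 1L;
            s.name = "Ada";
            PCollection<Scientist> scientists = p.apply(
                Create.of(Arrays.asList(s))
                      .withCoder(SerializableCoder.of(Scientist.class)));

            // Write the PCollection to the pre-existing "scientists" table
            scientists.apply(
                KineticaIO.<Scientist>write()
                    .withHeadNodeURL("http://<kinetica-head-node>:9191") // head node host & port
                    .withUsername("admin")                               // optional
                    .withPassword("admin")                               // optional
                    .withTable("scientists")                             // existing table name
                    .withEntity(Scientist.class));                       // table type class

            p.run().waitUntilFinish();
        }
    }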

Installation

One JAR file is produced by this project:

  • apache-beam-kineticaio-<ver>.jar -- complete connector JAR

To install and use the connector:

  1. Copy the apache-beam-kineticaio-<ver>.jar library to the target server
  2. Build your pipeline project against the JAR

Important

Because the Kinetica Beam connector depends on the Kinetica table type class, your pipeline code requires access to the Kinetica Java API.

Testing

This test demonstrates using the Kinetica Beam connector and Docker to connect several containers together and run a Beam pipeline that reads data from Kinetica and writes it back into Kinetica. The containers used are:

  • Two Kinetica containers: one head node and one worker node
  • One Spark container, containing a small cluster with one worker
  • One edge node container, running the Beam example JAR

Prerequisites

  1. Install Docker on your machine. See the Docker website for more information.

  2. Clone the Kinetica Beam Connector repository:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-connector-beam.git
    
  3. Download the following software and place it in the kinetica-connector-beam/resources directory:

    • Java 8 JDK, update 162 or later (8u162+)
    • Apache Maven 3.5.3+
    • Apache Spark 2.3.1+
  4. Install the Kinetica Beam Connector API locally and copy it into the kinetica-connector-beam/resources directory:

    cd kinetica-connector-beam/api
    mvn clean package
    cp target/apache-beam-kineticaio-1.0.jar ../resources
    
  5. Obtain a Kinetica license key. To receive a license key, contact support at support@kinetica.com.

Docker Environment Setup

  1. Allocate an appropriate amount of resources to run all the testing containers simultaneously. We recommend that the following be allocated for the Docker engine:

    • >= 4 CPUs
    • >= 14GB of Memory
    • >= 1GB of Swap Memory
  2. Choose a subnet that works for your environment. For example, 172.19.0.0/16.

    Important

    The selected subnet will be used later, so make note of it.

  3. Create the Docker network bridge:

    docker network create beamnet --subnet=<subnet>
    

Kinetica Containers Setup

  1. Change into the kinetica-cluster directory:

    cd ../docker/kinetica-cluster
    
  2. Update the template gpudb.conf file in the resources directory for the following:

    • head_ip_address -- head node address for Kinetica; edit the default value to instead match your chosen subnet
    • rank0.host / rank1.host -- host address for rank 0 and 1 (head node); edit the default value to instead match your chosen subnet
    • rank2.host -- host address for rank 2 (worker node); edit the default value to match your chosen subnet
    • license_key -- license key for Kinetica; add the key you've obtained from Prerequisites
  3. Download the desired Intel Kinetica RPM from repo.kinetica.com. We recommend the latest version.

  4. Copy the Kinetica RPM into the resources directory:

    cp <downloads-dir>/gpudb-intel-license-*.ga-0.el6.x86_64.rpm resources/
    
  5. Create directories for the Docker container to mount that will contain Kinetica logs:

    mkdir -p mount/kinetica-head/gpudb-logs/ && mkdir -p mount/kinetica-worker/gpudb-logs/
    
  6. Update runProject.sh for the IP addresses used for the head and worker nodes in the gpudb.conf file and for the absolute paths to the log directories you created:

    docker run  \
      --name kinetica-6.2-head \
      -d --privileged --rm \
      --network beamnet --ip <head-node-ip-address> \
      -p 8080:8080 -p 8088:8088 -p 8181:8181 -p 9001:9001 -p 9002:9002 -p 9191:9191 -p 9292:9292 \
      -v kinetica-6.2-head-persist:/opt/gpudb/persist \
      -v <absolute-path-to>/kinetica-connector-beam/docker/kinetica-cluster/mount/kinetica-head/gpudb-logs:/opt/gpudb/core/logs/ \
      kinetica/kinetica-6.2-node
    
    docker run \
      --name kinetica-6.2-worker \
      -d --privileged --rm \
      --network beamnet --ip <worker-node-ip-address> \
      -v kinetica-6.2-worker-persist:/opt/gpudb/persist \
      -v <absolute-path-to>/kinetica-connector-beam/docker/kinetica-cluster/mount/kinetica-worker/gpudb-logs:/opt/gpudb/core/logs/ \
      kinetica/kinetica-6.2-node
    
  7. Build the Kinetica Docker image and then create two containers using the image:

    ./build.sh
    ./runProject.sh
    
  8. Once the containers are ready, open a web browser and navigate to localhost:8080. Default login information is admin / admin. Proceed to the Admin page in GAdmin and start the cluster.

Testing the API (Optional)

If you want to see the Kinetica Beam Connector work without setting up Spark, you can use the unit tests in the Beam API JAR.

  1. Change into the builder directory:

    cd ../builder
    
  2. Create a resources directory and copy the JDK RPM and Apache Maven zipped files into it:

    mkdir resources
    cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/apache-maven-*-bin.tar.gz resources
    
  3. Update the Dockerfile environment variables for the downloaded JDK and Apache Maven versions:

    ENV PATH=$PATH:/opt/apache-maven-<version>/bin
    ENV JAVA_HOME /usr/java/jdk1.8.0_<version>/
    
  4. Update run.sh for the absolute paths to the Beam API files and the local resources directory:

    -v <absolute-path-to>/kinetica-connector-beam/api:/usr/local/beam/api \
    ...
    -v <absolute-path-to>/kinetica-connector-beam/docker/builder/resources:/usr/local/beam/resources \
    
  5. Build the Beam builder image and then create a container using the image:

    ./build.sh
    ./run.sh
    

    Note

    This will open a bash prompt inside the container.

  6. Verify the container can access the Kinetica head node container:

    curl http://kinetica-6.2-head:9191
    

    Note

    If the command was successful, the following message will be returned:

    Kinetica is running!
    
  7. Once at the command prompt inside the Beam builder container, change into the api directory and run the unit tests:

    cd api/
    mvn verify
    
  8. Log in to GAdmin and verify the scientists table exists.

  9. Exit the container:

    exit
    

Spark Container Setup

  1. Change into the sparkCluster directory:

    cd ../sparkCluster
    
  2. Create a resources directory and copy the JDK RPM and Apache Spark zipped files into it:

    mkdir resources
    cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/spark-*-bin-hadoop2.7.tgz resources
    
  3. Create a directory for the Docker container to mount that will contain Spark logs:

    mkdir -p mount/spark-logs
    
  4. Update run.sh for the absolute paths to the log directory you created and for the Spark version you downloaded:

    -v <absolute-path-to>/kinetica-connector-beam/docker/sparkCluster/mount/spark-logs:/opt/spark-<version>-bin-hadoop2.7/logs
    
  5. Build the Spark cluster image and then create a container using the image:

    ./build.sh
    ./run.sh
    
  6. Once the container is ready, open a web browser and navigate to localhost:8082 to verify the Spark cluster is available.

Beam Edge Node Container Setup

  1. Change into the edgeNode directory:

    cd ../edgeNode
    
  2. Create a resources directory and copy the JDK RPM, Apache Spark and Apache Maven zipped files, and the Beam API JAR into it:

    mkdir resources
    cp ../../resources/jdk-8u*-linux-x64.rpm resources && cp ../../resources/spark-*-bin-hadoop2.7.tgz resources
    cp ../../resources/apache-maven-*-bin.tar.gz resources && cp ../../resources/apache-beam-kineticaio-1.0.jar resources
    
  3. Update the Dockerfile environment variables for the downloaded JDK, Apache Spark, and Apache Maven versions:

    ENV PATH=$PATH:/opt/apache-maven-<version>/bin
    ENV JAVA_HOME /usr/java/jdk1.8.0_<version>/
    
  4. Update run.sh for the absolute paths to the Beam Connector example files and the local resources directory:

    -v <absolute-path-to>/kinetica-connector-beam/example/:/usr/local/beam/example-project \
    ...
    -v <absolute-path-to>/kinetica-connector-beam/docker/edgeNode/resources:/usr/local/beam/resources \
    
  5. Build the Beam edge node image and then create a container using the image:

    ./build.sh
    ./run.sh
    

    Note

    This will open a bash prompt inside the container.

  6. Once at the command prompt inside the Beam edge node container, ping the other containers to verify connectivity:

    ping kinetica-6.2-head
    ping kinetica-6.2-worker
    ping spark-cluster
    
  7. Install the Beam API JAR into the Maven repository on the edge node container:

    mvn install:install-file -Dfile='resources/apache-beam-kineticaio-1.0.jar' -DgroupId='com.kinetica' -DartifactId='apache-beam-kineticaio' -Dversion='1.0' -Dpackaging=jar
    

Running the Beam Job using Spark

  1. At the command prompt inside the Beam edge node container, change into the example-project directory and compile the Beam Connector example JAR:

    cd /usr/local/beam/example-project
    mvn -Pspark-runner clean package
    
  2. Back out one directory level and copy the Beam Connector example JAR into the current directory:

    cd ..
    cp example-project/target/apache-beam-kineticaio-example-1.0-shaded.jar .
    
  3. Submit the Beam job to Spark, ensuring you replace the kineticaPassword if necessary:

    /opt/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --master spark://`getent hosts spark-cluster | cut -d ' ' -f1`:7077 \
    --conf spark.executor.memory=2G --class com.kinetica.beam.example.Test1 apache-beam-kineticaio-example-1.0-shaded.jar \
    --kineticaPassword=admin --kineticaTable=scientists \
    --kineticaURL=http://`getent hosts kinetica-6.2-head | cut -d ' ' -f1`:9191 \
    --kineticaUsername=admin --runner=SparkRunner
    
  4. Exit the container:

    exit
    
  5. Verify the job ran via the Spark cluster UI (localhost:8082 -- there should be two completed applications: one for the read test and one for the write test), via the scientists table in GAdmin (localhost:8080), or via application logs:

    docker exec -it spark-cluster bash
    cd /opt/spark-<version>-bin-hadoop2.7/work/<application-id>
    

Running the Beam Job using the Direct Runner

  1. If you exited the container, enter the Beam edge node container again:

    kinetica-connector-beam/docker/edgeNode/run.sh
    
  2. At the command prompt inside the Beam edge node container, change into the example-project directory and compile the Beam Connector example JAR:

    cd /usr/local/beam/example-project
    mvn -Pdirect-runner clean package
    
  3. Back out one directory level and copy the Beam Connector example JAR into the current directory:

    cd ..
    cp example-project/target/apache-beam-kineticaio-example-1.0-shaded.jar .
    
  4. Submit the Beam job to the direct runner (via Java), ensuring you replace the kineticaPassword if necessary:

    java -cp apache-beam-kineticaio-example-1.0-shaded.jar \
    com.kinetica.beam.example.Test1 \
    --kineticaURL="http://`getent hosts kinetica-6.2-head | cut -d ' ' -f1`:9191" \
    --kineticaUsername=admin \
    --kineticaPassword=admin \
    --kineticaTable=scientists \
    --runner=DirectRunner
    
  5. Exit the container:

    exit
    
  6. Verify the job ran via the scientists table in GAdmin (localhost:8080).