Java UDF Guide

Step-by-step guide to creating UDFs with the Java API

The following guide provides step-by-step instructions to get started writing and running UDFs in Java. This particular example is a simple distributed UDF that copies data from one table to another, using a CSV configuration file to determine which processing nodes to copy data from. Note that copying data from only some processing nodes has little practical application; the exercise is purely meant to demonstrate the many facets of the UDF API.


Prerequisites

The general prerequisites for using UDFs in Kinetica can be found on the User-Defined Function Implementation page.

Data Files

There are six files associated with the Java UDF tutorial. All the files can be found in the Java Tutorial Git Repo, which is cloned in the API Download and Installation section.

  • A management file (written in Java) that creates the schema and the input & output tables, then creates the proc and executes it.
  • A UDF (written using the Java UDF API) that contains a table copying example.
  • A CSV input file
  • Three Project Object Model (POM) files:
    • The main POM file contained at the top-level of the project that is used to compile the two module JARs (the manager and the UDF)
    • The manager POM file contained within the manager/ sub-directory that is used to compile the manager JAR
    • The UDF POM file contained within the udf/ sub-directory that is used to compile the UDF JAR

Software

  • Java 1.7 (or greater)

    Note

    The location of java should be placed in the PATH environment variable, and the JAVA_HOME environment variable should be set. If java is not on the PATH, you'll need to use the full path to the java executables in the relevant instructions below.

  • Maven

  • Python 2.7 (or greater) or pip

    Note

    The locations of python and pip should be placed in the PATH environment variable. If they are not, you'll need to use the full path to the python and pip executables in the relevant instructions below. Also, administrative access will most likely be required when installing the Python packages.

API Download and Installation

The Java UDF tutorial requires local access to the Java UDF tutorial repository and the Java UDF API. The native Python API must also be installed to use the UDF simulator (details found in Development).

  1. In the desired directory, run the following to download the Kinetica Java UDF tutorial repository, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.1:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-tutorial-java-udf-api.git
    
  2. In the same directory, run the following to download the Kinetica Java UDF API repository, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.1:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-udf-api-java.git
    
  3. In the same directory, run the following to download the Kinetica Python native API repository, replacing <kinetica-version> with the name of the installed Kinetica version, e.g., v7.1:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-api-python.git
    
  4. Change directory into the newly downloaded native Python API repository:

    cd kinetica-api-python/
    
  5. In the root directory of the repository, install the Kinetica API:

    sudo python setup.py install
    
  6. Change directory into the Java UDF API directory:

    cd ../kinetica-udf-api-java/proc-api/
    
  7. Install the Java UDF API:

    mvn clean package
    mvn install
    
  8. Change directory into the Java UDF tutorial project directory:

    cd ../../kinetica-tutorial-java-udf-api/table-copy/
    

Development

Refer to the Java UDF API Reference page to begin writing your own UDF(s), or use the UDF already provided with the Java UDF tutorial repository. The steps below outline using the UDF simulator with the UDF included in the Java UDF tutorial repository. The UDF simulator simulates the mechanics of executeProc() without actually calling it in the database; this is useful if you want to develop UDFs piece-by-piece and test incrementally, avoiding any memory impact on the database.

Compile

The proc files must be compiled into a JAR prior to use, and they will need to be re-compiled after any changes to the proc code. Compiling this tutorial using the provided main pom.xml file will actually create two JARs: one for the proc itself and one for the manager.

To compile the Proc & example manager files and create JARs:

mvn clean package

Important

When working on your own UDFs, ensure that the Kinetica Java UDF API is not bundled with your UDF JAR (e.g., by giving the kinetica-proc-api dependency provided scope in Maven); otherwise, there could be a compilation target platform conflict with the UDF API on the Kinetica server.

POM Detail

The main POM file (at the root of the project) details:

  • Metadata and parent information that will be accessed by the child modules:
<artifactId>kinetica-udf-table-copy-example</artifactId>
<groupId>com.kinetica</groupId>
<version>1.0</version>
<packaging>pom</packaging>
<name>Kinetica UDF Table Copy Example</name>
  • The list of modules to compile. Modules are based on POM files within sub-directories specified using the <module> tag:
<modules>
    <module>manager</module>
    <module>udf</module>
</modules>
  • Maven Antrun plugin configuration that copies the CSV file and compiled module JARs to an output/ directory at the top level of the project:
<plugin>
    <artifactId>maven-antrun-plugin</artifactId>
    <version>${maven-antrun-plugin.version}</version>
    <executions>
        <execution>
            <id>copy</id>
            <phase>package</phase>
            <configuration>
                <target>
                    <copy todir="${basedir}/output" overwrite="true" flatten="true">
                        <fileset dir="${basedir}">
                            <include name="rank_tom.csv" />
                            <include name="manager/target/*.jar" />
                            <include name="udf/target/*.jar" />
                        </fileset>
                    </copy>
                </target>
            </configuration>
            <goals>
                <goal>run</goal>
            </goals>
        </execution>
    </executions>
</plugin>

The manager POM file (contained within the manager/ sub-directory of the project) details:

  • References to the parent POM's metadata:
<parent>
    <groupId>com.kinetica</groupId>
    <artifactId>kinetica-udf-table-copy-example</artifactId>
    <version>1.0</version>
</parent>
  • Metadata used for naming the package and build component versions:
<artifactId>kinetica-table-copy-manager</artifactId>
<packaging>jar</packaging>

<properties>
    <gpudb-api.version>[7.1.0.0,7.2.0.0-SNAPSHOT)</gpudb-api.version>
    <kinetica-proc-api.version>[7.1.0.0,7.2.0.0-SNAPSHOT)</kinetica-proc-api.version>
    <java.version>1.7</java.version>
    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
  • The Kinetica Java API dependency, as well as the repositories to search for it in:
<repositories>
    <repository>
        <id>gpudb-releases</id>
        <url>https://nexus.kinetica.com/repository/releases/</url>
    </repository>

    <repository>
        <id>gpudb-snapshots</id>
        <url>https://nexus.kinetica.com/repository/snapshots/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.gpudb</groupId>
        <artifactId>gpudb-api</artifactId>
        <version>${gpudb-api.version}</version>
        <type>jar</type>
    </dependency>
</dependencies>
  • Maven compiler and assembly plugin configuration that determines what to compile and how to compile it:
<build>
    <sourceDirectory>../src/main/java</sourceDirectory>

    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin.version}</version>
            <configuration>
                <includes>
                    <include>**/manager/**</include>
                </includes>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven-assembly-plugin.version}</version>
            <executions>
                <execution>
                    <id>manager</id>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>UdfTcManager</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <finalName>kinetica-udf-table-copy-manager</finalName>
                    </configuration>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

The UDF POM file (contained within the udf/ sub-directory of the project) details:

  • References to the parent POM's metadata:
<parent>
    <groupId>com.kinetica</groupId>
    <artifactId>kinetica-udf-table-copy-example</artifactId>
    <version>1.0</version>
</parent>
  • Metadata used for naming the package and build component versions:
<artifactId>kinetica-table-copy-udf</artifactId>
<packaging>jar</packaging>

<properties>
    <kinetica-proc-api.version>[7.1.0.0,7.2.0.0-SNAPSHOT)</kinetica-proc-api.version>
    <java.version>1.7</java.version>
    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
    <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
  • The Kinetica Java UDF API dependency:
<dependencies>
    <dependency>
        <groupId>com.kinetica</groupId>
        <artifactId>kinetica-proc-api</artifactId>
        <version>${kinetica-proc-api.version}</version>
    </dependency>
</dependencies>
  • Maven compiler and assembly plugin configuration that determines what to compile and how to compile it:
<build>
    <sourceDirectory>../src/main/java</sourceDirectory>

    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven-compiler-plugin.version}</version>
            <configuration>
                <includes>
                    <include>**/udf/**</include>
                </includes>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven-assembly-plugin.version}</version>
            <executions>
                <execution>
                    <id>proc</id>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.kinetica.UdfTcJavaProc</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <finalName>kinetica-udf-table-copy-proc</finalName>
                    </configuration>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Test

A UDF can be tested using the UDF simulator in the native Python API repository without writing anything to the database.

Note

Skip to Deployment if there is no need to test the UDF first.

  1. Run the UDF manager JAR with the init option, optionally specifying the database host and a username & password:

    java -jar output/kinetica-udf-table-copy-manager-jar-with-dependencies.jar "init" [<hostname> [<username> <password>]]
    
  2. In the native Python API directory, run the UDF simulator in execute mode with the following options to simulate running the UDF, where -i is the schema-qualified UDF input table, -o is the schema-qualified UDF output table, and -K is the Kinetica URL (using the appropriate values for your environment). A username (-U) & password (-P) can be specified if your instance requires authentication:

    python ../../kinetica-api-python/examples/udfsim.py execute -d \
        -i [<schema>.]<input-table> -o [<schema>.]<output-table> \
        -K http://<kinetica-host>:<kinetica-port> \
        [-U <kinetica-user> -P <kinetica-pass>]
    

    For instance:

    python ../../kinetica-api-python/examples/udfsim.py execute -d \
        -i tutorial_udf_java.udf_tc_java_in_table -o tutorial_udf_java.udf_tc_java_out_table \
        -K http://127.0.0.1:9191 \
        -U admin -P admin123
    
  3. Copy & execute the export command output by the previous command; this will prepare the execution environment for simulating the UDF:

    export KINETICA_PCF=/tmp/udf-sim-control-files/kinetica-udf-sim-icf-xMGW32
    

    Important

    The export command shown above is an example of what the udfsim.py script will output; it should not be copied to the terminal in which this example is being run. Make sure to copy & execute the actual command output by udfsim.py in the previous step.

  4. Run the UDF Proc JAR:

    java -jar output/kinetica-udf-table-copy-proc-jar-with-dependencies.jar
    
  5. Run the UDF simulator in output mode to output the results to Kinetica (use the dry run flag -d to avoid writing to Kinetica), ensuring you replace the Kinetica URL and port with the appropriate values. The results map will be returned (even if there's nothing in it), as well as the number of records that were (or, in the case of a dry run, would be) added to the given output table:

    python ../../kinetica-api-python/examples/udfsim.py output \
        -K http://<kinetica-host>:<kinetica-port> \
        [-U <kinetica-user> -P <kinetica-pass>]
    

    For instance:

    python ../../kinetica-api-python/examples/udfsim.py output \
        -K http://127.0.0.1:9191 \
        -U admin -P admin123
    

    This should output the following:

    No results
    Output:
    
    tutorial_udf_java.udf_tc_java_out_table: 10000 records
    
  6. Clean the control files output by the UDF simulator:

    python ../../kinetica-api-python/examples/udfsim.py clean
    

    Important

    The clean command is only necessary if data was output to Kinetica; otherwise, the UDF simulator can be re-run as many times as desired without having to clean the output files and enter another export command.

Deployment

If you're satisfied after testing your UDF with the UDF simulator, or if you simply want to see your UDF in action, the UDF can be created and executed in the database using the UDF endpoints /create/proc and /execute/proc, respectively.

  1. Optionally, run the UDF manager JAR with the init option to reset the example tables, optionally specifying the database host and a username & password:

    java -jar output/kinetica-udf-table-copy-manager-jar-with-dependencies.jar "init" [<hostname> [<username> <password>]]
    
  2. Run the UDF manager JAR with the exec option, optionally specifying the database host and a username & password:

    java -jar output/kinetica-udf-table-copy-manager-jar-with-dependencies.jar "exec" [<hostname> [<username> <password>]]
    

Execution Detail

As mentioned previously, this section details a simple distributed UDF that copies data from one table to another. While the table copy UDF can run against multiple tables, the example run will use a single schema-qualified table, tutorial_udf_java.udf_tc_java_in_table, as input and a similar schema-qualified table, tutorial_udf_java.udf_tc_java_out_table, for output.

The input table will contain one int16 column (id) and two float columns (x and y). The id column will be an ordered integer field, with the first row containing 1, the second row containing 2, etc. The two float columns will together hold 10,000 pairs of randomly-generated numbers:

+------+-----------+-----------+
| id   | x         | y         |
+======+===========+===========+
| 1    | 2.57434   | -3.357401 |
+------+-----------+-----------+
| 2    | 0.0996761 | 5.375546  |
+------+-----------+-----------+
| ...  | ...       | ...       |
+------+-----------+-----------+

The output table will also contain one int16 column (id) and two float columns (a and b). No data is inserted:

+------+-----------+-----------+
| id   | a         | b         |
+======+===========+===========+
|      |           |           |
+------+-----------+-----------+

The UDF will first read from a given CSV file to determine from which processing node container and processing node to copy data:

rank_num,tom_num
1,0
2,0

The tom_num column values refer to processing nodes, each of which contains some of the many shards of data inside the database. The rank_num column values refer to processing node containers, which hold some of the processing nodes for the database. For example, the given CSV file specifies that the data from tutorial_udf_java.udf_tc_java_in_table on processing node container 1, processing node 0 and on processing node container 2, processing node 0 will be copied to tutorial_udf_java.udf_tc_java_out_table.

Once the UDF is executed, a UDF instance (OS process) is spun up on each processing node to execute the given code against that node's data. Each UDF instance then determines whether the processing node container/processing node pair it's currently running on matches one of the pairs of values in the CSV file. If there is a match, the UDF will loop through the input tables, match each output table's size to its input table's size, and copy the appropriate data from the input tables to the output tables. If there isn't a match, the code will simply complete.
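
Putting that flow together, the proc's copy logic has roughly the following shape. This is a condensed sketch assembled from the UDF snippets shown later in this section; imports, exception handling, the per-column copy loop, and handling of multiple input/output tables are omitted here:

ProcData procData = ProcData.get();

// Rank/TOM numbers of the processing node this UDF instance is running on
final String procRankNum = procData.getRequestInfo().get("rank_number");
final String procTomNum = procData.getRequestInfo().get("tom_number");

boolean foundMatch = false;
Scanner scanner = new Scanner(new File("rank_tom.csv"));
scanner.nextLine();                       // skip the CSV header row
while (scanner.hasNextLine())
{
    String[] row = scanner.nextLine().split(",", -1);
    if (procRankNum.equals(row[0]) && procTomNum.equals(row[1]))
    {
        foundMatch = true;

        // Size the output table to match the input table, then copy the data
        // (the per-column copy loop is shown later in this section)
        ProcData.InputTable inputTable = procData.getInputData().getTable(0);
        ProcData.OutputTable outputTable = procData.getOutputData().getTable(0);
        outputTable.setSize(inputTable.getSize());
    }
}

if (!foundMatch)
    System.out.println("No rank or tom matches");

procData.complete();                      // signal Kinetica that the proc is finished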

Initialization (UdfTcManager.java init mode)

The init mode calls the init() method of the UdfTcManager.java file. This method will create the schema, an input type and table for the UDF to copy data from, and an output type and table to copy data to. Sample data will also be generated and placed in the input table.

To create tables using the Java API, a Type needs to be defined in the system first. The type is a class, extended from RecordObject, using annotations to describe which class instance variables are fields (i.e. columns), what type they are, and any special handling they should receive. Each field consists of a name and a data type:

public static class InTable extends RecordObject
{
    @RecordObject.Column(order=0, properties = {"int16", "primary_key"})
    public Integer id;
    @RecordObject.Column(order=1)
    public Float x;
    @RecordObject.Column(order=2)
    public Float y;

    public InTable() {}

    public InTable(Integer id, Float x, Float y)
    {
        this.id = id;
        this.x = x;
        this.y = y;
    }
}

public static class OutTable extends RecordObject
{
    @RecordObject.Column(order=0, properties = {"int16", "primary_key"})
    public Integer id;
    @RecordObject.Column(order=1)
    public Float a;
    @RecordObject.Column(order=2)
    public Float b;

    public OutTable() {}

    public OutTable(Integer id, Float a, Float b)
    {
        this.id = id;
        this.a = a;
        this.b = b;
    }
}

To interact with Kinetica, you must first instantiate an object of the GPUdb class, providing the connection URL (which includes the host of the database server) and, optionally, a username and password to use for logging in. This database object is later passed to the init() and exec() methods, each of which takes a database object hDb:

GPUdbBase.Options options = new GPUdbBase.Options();
options.setUsername(username);
options.setPassword(password);
GPUdb kinetica = new GPUdb("http://" + host + ":9191", options);

The schema that will contain the example tables is then created, using the no_error_if_exists option so the call succeeds even if the schema already exists:

Map<String, String> createSchemaOptions = GPUdb.options(
    CreateSchemaRequest.Options.NO_ERROR_IF_EXISTS, CreateSchemaRequest.Options.TRUE
);
hDb.createSchema(SCHEMA, createSchemaOptions);

The InTable type and table are created, but the table is removed first if it already exists. Then the table creation is verified using showTable():

hDb.clearTable(INPUT_TABLE, null, GPUdb.options("no_error_if_not_exists", "true"));
String inTableId = RecordObject.createType(InTable.class, hDb);
hDb.createTable(INPUT_TABLE, inTableId, null);
System.out.println("Input table successfully created:");
ShowTableResponse showInputTable = hDb.showTable(INPUT_TABLE, null);
System.out.println(showInputTable.getTableNames().get(0) + " with type id " + showInputTable.getTypeIds().get(0));

Next, sample data is generated and inserted into the new input table:

ArrayList<InTable> allRecords = new ArrayList<>();
for (int i = 0; i < MAX_RECORDS; i++) {
    InTable singleRecord = new InTable();
    singleRecord.id = i;
    singleRecord.x = (float) rand.nextGaussian() * 1 + 1;
    singleRecord.y = (float) rand.nextGaussian() * 1 + 2;
    allRecords.add(singleRecord);
}
hDb.insertRecords(INPUT_TABLE, allRecords, null);
GetRecordsResponse getRecordsResponse = hDb.getRecords(INPUT_TABLE, 0, GPUdbBase.END_OF_SET, null);
System.out.println("Number of records inserted into the input table: " + getRecordsResponse.getTotalNumberOfRecords());
System.out.println();

Lastly, an OutTable type and table are created, but the table is removed first if it already exists. Then the table creation is verified using showTable():

hDb.clearTable(OUTPUT_TABLE, null, GPUdb.options("no_error_if_not_exists", "true"));
String outTableId = RecordObject.createType(OutTable.class, hDb);
hDb.createTable(OUTPUT_TABLE, outTableId, null);
System.out.println("Output table successfully created:");
ShowTableResponse showOutputTable = hDb.showTable(OUTPUT_TABLE, null);
System.out.println(showOutputTable.getTableNames().get(0) + " with type id " + showOutputTable.getTypeIds().get(0));

UDF (UdfTcJavaProc.java)

First, instantiate a handle to the ProcData class:

ProcData procData = ProcData.get();

Initialize a boolean that will be switched to true if the rank/TOM pair of the current UDF instance matches a value pair in the CSV file:

boolean foundMatch = false;

Retrieve the rank and TOM numbers that uniquely identify the processing node this UDF instance is running on; these will be compared against the CSV file's list of processing nodes whose data should be copied by the UDF:

final String procRankNum = procData.getRequestInfo().get("rank_number");
final String procTomNum = procData.getRequestInfo().get("tom_number");

Then, the CSV file mentioned in Data Files is read (skipping the header):

Scanner scanner = new Scanner(new File("rank_tom.csv"));
scanner.nextLine();
while (scanner.hasNextLine())

Compare the rank and TOM of the current UDF instance's processing node to each rank/TOM pair in the file to determine if the current UDF instance should copy the data on its corresponding processing node:

String[] row = scanner.nextLine().split(",", -1);
final String fileRankNum = row[0];
final String fileTomNum = row[1];
if (procRankNum.equals(fileRankNum) && procTomNum.equals(fileTomNum))

For each input and output table found in the inputData and outputData objects (respectively), set the output tables' size to the input tables' size. This will allocate enough memory to copy all input records to the output table:

ProcData.InputTable inputTable = procData.getInputData().getTable(i);
ProcData.OutputTable outputTable = procData.getOutputData().getTable(i);
outputTable.setSize(inputTable.getSize());

For each input column in the input table(s) and for each output column in the output table(s), copy the input columns' values to the output columns:

for (int j = 0; j < inputTable.getColumnCount(); j++)
{
    ProcData.InputColumn inputColumn = inputTable.getColumn(j);
    ProcData.OutputColumn outputColumn = outputTable.getColumn(j);

    for (long k = 0; k < inputTable.getSize(); k++)
    {
        switch (inputColumn.getType())
        {
            case BYTES: outputColumn.appendVarBytes(inputColumn.getVarBytes(k)); break;
            case CHAR1: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR2: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR4: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR8: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR16: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR32: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR64: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR128: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case CHAR256: outputColumn.appendChar(inputColumn.getChar(k)); break;
            case DATE: outputColumn.appendCalendar(inputColumn.getCalendar(k)); break;
            case DATETIME: outputColumn.appendCalendar(inputColumn.getCalendar(k)); break;
            case DECIMAL: outputColumn.appendBigDecimal(inputColumn.getBigDecimal(k)); break;
            case DOUBLE: outputColumn.appendDouble(inputColumn.getDouble(k)); break;
            case FLOAT: outputColumn.appendFloat(inputColumn.getFloat(k)); break;
            case INT: outputColumn.appendInt(inputColumn.getInt(k)); break;
            case INT8: outputColumn.appendByte(inputColumn.getByte(k)); break;
            case INT16: outputColumn.appendShort(inputColumn.getShort(k)); break;
            case IPV4: outputColumn.appendInet4Address(inputColumn.getInet4Address(k)); break;
            case LONG: outputColumn.appendLong(inputColumn.getLong(k)); break;
            case STRING: outputColumn.appendVarString(inputColumn.getVarString(k)); break;
            case TIME: outputColumn.appendCalendar(inputColumn.getCalendar(k)); break;
            case TIMESTAMP: outputColumn.appendLong(inputColumn.getLong(k)); break;
            default:
                throw new RuntimeException();
        }
    }
}

If no matches were found, finish processing:

if (!foundMatch)
    System.out.println("No rank or tom matches");

Call complete() to tell Kinetica the proc code is finished:

procData.complete();

Execution (UdfTcManager.java exec mode)

The exec mode calls the exec() method of the UdfTcManager.java file. This method will read files in as bytes, create a proc, and upload the files to the proc. The method will then execute the proc.

Note

As noted earlier, a database object is instantiated prior to calling the exec() method, which takes a database object hDb.

To upload the UdfTcManager.jar and rank_tom.csv files to Kinetica, they will first need to be read in as bytes and added to a file data map:

// For the given files, read each into a byte array and put it in a map of
// file name to file bytes
Map<String, ByteBuffer> filesMap = new HashMap<>();

After the files are placed in a data map, the distributed UdfTcJavaProc proc can be created in Kinetica and the files can be associated with it:

for (String fileName : Arrays.asList(CSV_FILE, PROC_JAR_FILE))
{
    byte [] fileAsBytes = Files.readAllBytes(new File(fileName).toPath());
    ByteBuffer fileByteBuffer = ByteBuffer.wrap(fileAsBytes);
    filesMap.put(fileName, fileByteBuffer);
}

if (hDb.hasProc(PROC_NAME, null).getProcExists())
    hDb.deleteProc(PROC_NAME, null);

// Create the proc
System.out.println("Registering distributed proc...");
CreateProcResponse createProcResponse = hDb.createProc(
        PROC_NAME,
        "distributed",
        filesMap,
        "java",
        Arrays.asList("-jar", PROC_JAR_FILE),
        null
);

Note

The proc requires the proper command and args to be executed; in this case, the assembled command line would be:

java -jar output/kinetica-udf-table-copy-proc-jar-with-dependencies.jar

Finally, after the proc is created, it can be executed. The input table and output table created in the Initialization section are passed in here:

System.out.println("Proc created successfully:");
System.out.println(createProcResponse);
System.out.println();

// Execute the proc
System.out.println("Executing proc...");
ExecuteProcResponse executeProcResponse = hDb.executeProc(
        PROC_NAME,
        null,
        null,
        Collections.singletonList(INPUT_TABLE),
        null,
        Collections.singletonList(OUTPUT_TABLE),
        null
);
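
Once the proc run has finished (its status can be checked in GAdmin or via the /show/proc/status endpoint), you can optionally verify the copy using the same calls the manager used earlier. A minimal sketch, assuming hDb and OUTPUT_TABLE are still in scope:

// Optional check (not part of the tutorial code): count the records the UDF
// copied into the output table
GetRecordsResponse copiedRecords = hDb.getRecords(OUTPUT_TABLE, 0, GPUdbBase.END_OF_SET, null);
System.out.println("Records copied to the output table: " + copiedRecords.getTotalNumberOfRecords());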