Kinetica supports the concept of user-defined functions (UDF) via a mechanism similar to stored procedures, being a user-defined sequence of operations on a specified data set.
- UDF Implementation -- detail about writing & running and for some examples of UDFs
- Python UDF Tutorial -- example UDF workflow in Python
- Java UDF Tutorial -- example UDF workflow in Java
In Kinetica, there are two types of UDFs:
- distributed - the procedure will be invoked within the database, executing in parallel against each data shard of the specified tables. When distributed, there will be one OS process per processing node in Kinetica
- non-distributed - the procedure will be invoked externally to the database, making use of the existing Kinetica APIs. When non-distributed there will only be a single OS process
A distributed UDF can be passed zero or more input tables and write to zero or more output tables. Each invocation will only be passed the subset of data within that shard, and any output will be written to that same shard. This per-shard execution therefore requires that any output tables that are sharded on a particular key match the shard key of the input tables. If the procedure is doing any calculations, it is important that the data be sharded in such a way that the calculations can be meaningfully performed within the sharded data set. There are three important facets of output tables to note when using a distributed UDF:
- If an output table doesn't exist, it will be created to match the schema of its paired input table; e.g., if a UDF is given five input/output table pairs and the 3rd output table doesn't exist, it will be created to match the schema of the 3rd input table; the primary keys & shard keys of the input table will not be transferred to the output table
- Output tables are not cleared of records by the UDF; existing records will remain untouched
- Output tables can only be appended to; existing output table records will not be available to the UDF, unless the output table is also passed in as an input table, and even then, will not be able to be updated by the UDF
A non-distributed UDF can not currently directly access any table data, but it can use the regular Kinetica APIs to make local calls to the database. Due to its running outside of the database, a connection URL and authentication credentials are required to be passed to it at runtime.
Either type of UDF can also be passed a map of parameters with string or binary values and can return a map of results with string or binary values. For distributed UDFs, each invocation returns its own set of results, making the client responsible for merging them together, if required. These maps can be used to pass around limited bits of data or control information that isn’t table-related.
Control information and table data are passed in and out of the UDF executable using memory-mapped files. In theory, a UDF can be written in any language that can access those files. It’s also possible to have a UDF be a shell script or helper that runs another program that can process the files. The UDF is passed (via environment variable) the path to a primary control file that contains all the necessary parameters and references to other memory-mapped files, as required. The UDF C++ API parses all of this information and wraps the files in a way that is easy to use.
When a UDF is executed via the /execute/proc endpoint, any input table data can be cached for use in a subsequent call to /execute/proc. When data is copied out of the source table(s) into memory-mapped files, those files can be preserved when the UDF completes, thereby making them available for reuse. This facet of UDF saves on execution time if multiple UDFs need to be called with the same or similar inputs. There are options passed to /execute/proc that control this caching and the use of cached input tables.
A distributed UDF can read data through local storage shared among all nodes in the cluster or KiFS.