Multi-Head Egress

Multi-Head Egress (Distributed Key Lookup) is a mechanism that allows sharded data to be retrieved directly from cluster nodes, bypassing the overhead of pulling the data through the head node from the cluster nodes. This greatly increases the speed of retrieval by spreading the network traffic across multiple nodes.

Operationally, the egress mechanism is a lookup of a given shard key value in a given table or view, filtering out any records that don't match an optional expression.

API Support

LanguageMulti-head Egress Mechanism
C++RecordRetriever
C#RecordRetriever
JavaRecordRetriever
JavascriptX
Node.jsX
PythonRecordRetriever
RESTX
SQLUsing KI_HINT_KEY_LOOKUP hint in a query; see SQL below for details

Configuration

Server-Side

In order for the cluster nodes to transmit data directly to an egress client, the configuration on each node needs to be set to allow the incoming HTTP requests for that data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set for multi-head egress to work properly:

# Enable worker HTTP servers...
enable_worker_http_servers = true

Each node can be configured with a public URL for the API to use to connect to it, in this section of the gpudb.conf:

# Optionally, specify a public URL for each worker HTTP server that clients
# should use to connect for multi-head operations.
#
# NOTE: If specified for any ranks, a public URL must be specified for all
# ranks. Cannot be used in conjunction with nplus1.

rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194

If a public_url is not defined, each node can be connected to on any of its available interfaces, taking HA and HTTPD configurations, as well as any general network security restrictions into account.

Client-Side

With the exception of the Python background multi-head process, the multi-head egress object of each API requires a list of worker nodes to use to distribute the retrieval requests, with one entry in the list for each node/process. This list can be auto-populated simply by using a GPUdb connection object, which can retrieve the list of available cluster nodes from the database itself. Below is a code snippet showing an automatically populated worker list and subsequent creation of a multi-head egress object with it:

Java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import com.gpudb.GPUdb;
import com.gpudb.WorkerList;
import com.gpudb.GenericRecord;
import com.gpudb.RecordRetriever;

GPUdb db = new GPUdb("http://<db.host>:9191");
WorkerList workers = new WorkerList(db);

//Multi-head Egress Object Instantiation
RecordRetriever<GenericRecord> recordRetriever =
    new RecordRetriever<>(db, tableName, recordType, workers);
Python
1
2
3
4
5
6
7
8
from gpudb import GPUdb
from gpudb_multihead_io import GPUdbWorkerList, RecordRetriever

db = GPUdb(host=["http://<db.host>:9191"])
workers = GPUdbWorkerList(db)

# Multi-head Egress Object Instantiation
bulk_retriever = RecordRetriever( db, table_name, record_type, workers )

Note that in cases where no public_url was configured on each server node, workers may have more than one IP address, not all of which may be accessible to the client. The API worker list constructor uses the first IP in the list returned by the server for each worker, by default. To override this behavior, a regular expression Pattern or prefix String can be used to match the correct worker IP addresses:

Java
1
2
3
4
//Match 172.X.Y.Z addresses
WorkerList(db, Pattern.compile("172\\..*"));
//or
WorkerList(db, "172.");
Python
1
GPUdbWorkerList(db, "172.")

Considerations

There are several factors to consider when using multi-head egress:

  • Only primary key/shard key value lookups are allowed for sharded tables; arbitrary queries are not supported.
  • Null lookup values are not supported.
  • Lookup values cannot be expression functions.
  • The shard key and any columns used in the expression must all be indexed.
  • There is an implicit index for a primary key, but queries that make use of this index for the lookup cannot have any other columns in the expression.
  • If column indexes exist on all columns being filtered, additional indexed columns can be added to the lookup expression using either equality or inequality relations.

SQL

SQL has wide support for key lookups. The following configurations will result in successful lookups:

  • Specifying only the primary key columns/values of any table with a primary key
  • Specifying only the shard key columns/values of a sharded table with attribute indexes on each of the shard key columns
  • Specifying any indexed columns of a replicated table

Note

As noted under Considerations, if all key lookup columns have column indexes, additional indexed columns can be added to the filter. See below for an example.

The KI_HINT_KEY_LOOKUP hint is used to request a fast key lookup. If the table configuration and query given aren't a match for the key lookup, the query will be executed as any other query would--without the fast key lookup. A warning will be returned that the key lookup was not possible, but the query will still be executed and results returned.

Syntax

The following simple query syntax is supported for key lookups:

SQL Key Lookup Syntax
1
2
3
4
SELECT * /* KI_HINT_KEY_LOOKUP */
FROM [<schema name>.]<table_name>
WHERE <key_column_1> = <key_value_1> [... AND <key_column_N> = <key_value_N>]
[AND <indexed_column_1> [=] <indexed_value_1> [... AND <indexed_column_N> = <indexed_value_N>]]

Examples

To look up a particular record in a sharded table with an explicit shard key on column id:

Sharded Table with Shard Key Lookup Example
1
2
3
SELECT *  /* KI_HINT_KEY_LOOKUP */
FROM product_sk
WHERE id = 1;

The sharded example above can have an equality or inequality expression added to it to further filter the results, as long as the additional column, stock, has an attribute index.

Sharded Table with Shard Key Lookup and Additional Filter Example
1
2
3
SELECT * /* KI_HINT_KEY_LOOKUP */
FROM product_sk
WHERE id = 2 AND stock > 0;

To look up records matching a given category in a replicated table with attribute indexes on columns id & category:

Replicated Table with Indexed Columns Lookup Example
1
2
3
SELECT *  /* KI_HINT_KEY_LOOKUP */
FROM product_replicated
WHERE id = 2 AND category = 'Furniture';

Python

In the Python API, the GPUdbTable constructor has a use_multihead_io parameter, which allows a GPUdbTable object to handle all RecordRetriever interactions with the associated table in the background. The following is a Python API example that demonstrates the use of the background RecordRetriever for retrieving data.

Important

This example relies on the stocks data set, which can be imported into Kinetica via GAdmin. That data will be copied to two new tables and have indexes applied, as necessary.

Setup

First, connect to the database; here, the cluster head node URL is passed in as a parameter to the Python API call:

1
kinetica = gpudb.GPUdb(host = [args.url], username = args.username, password = args.password)

Next, prepare the target table used by the single-column multi-head key lookup examples by copying the stocks table to a new table:

1
2
3
4
5
6
7
SQL_CREATE_TABLE = """
    CREATE TABLE example.stocks_multihead AS
    SELECT *
    FROM demo.stocks
"""

kinetica.execute_sql(SQL_CREATE_TABLE, 0, 0)

Then add an index on the sharded column, Symbol, which must be the target of any multi-head lookup on the stocks table:

1
kinetica.alter_table("example.stocks_multihead", "create_index", "Symbol")

To prepare the target table used by the multi-column multi-head key lookup examples, copy, reshard, & index the stocks table; the new shard key will be on both the Industry & Symbol columns:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
SQL_CREATE_TABLE = """
    CREATE TABLE example.stocks_multihead_multicolumn AS
    SELECT *, KI_SHARD_KEY(Industry, Symbol) 
    FROM demo.stocks
"""

kinetica.execute_sql(SQL_CREATE_TABLE, 0, 0)

# Create a column index on each of the sharded columns
kinetica.alter_table("example.stocks_multihead_multicolumn", "create_index", "Industry")
kinetica.alter_table("example.stocks_multihead_multicolumn", "create_index", "Symbol")

Key Lookup

Grab a handle to the stocks_multihead table, passing the use_multihead_io option to enable background multi-head key lookups:

1
stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_multihead", db = kinetica, use_multihead_io = True)

Then perform the multi-head key lookup, searching the shard column for the value SPGI:

1
response = stocks_table.get_records_by_key(["SPGI"])

Lastly, output the results:

1
2
3
4
5
6
7
8
9
for record in response["data"]:
    
    print("{:10s} {:8.4f} {:8.4f} {:8.4f} {:8.4f}".format(
            datetime.fromtimestamp(record["Date"]/1000.0).strftime("%Y-%m-%d"),
            record["Open"],
            record["Low"],
            record["High"],
            record["Close"]
    ))

Key Lookup with Expression

Since any column involved in a multi-head key lookup must be indexed, first add an index to the Date column, which will be used in the example filter:

1
kinetica.alter_table("example.stocks_multihead", "create_index", "Date")

Then perform the multi-head key lookup with an extra filtering expression, searching the shard column for the value SPGI and filtering out records prior to 2017:

1
response = stocks_table.get_records_by_key(["SPGI"], "Date >= '2017-01-01'")

The results can be output in the same manner shown under Key Lookup.

Multi-Column Key Lookup

Grab a handle to the stocks_multihead_multicolumn table, passing the use_multihead_io option to enable background multi-head key lookups:

1
stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_multihead_multicolumn", db = kinetica, use_multihead_io = True)

Then perform the multi-head key lookup, searching the sharded Industry & Symbol columns for the values Financial Exchanges & Data & SPGI, respectively:

1
2
3
response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"}
)

The results can be output in the same manner shown under Key Lookup.

Multi-Column Key Lookup with Expression

Since any column involved in a multi-head key lookup must be indexed, first add an index to the Date column, which will be used in the example filter:

1
kinetica.alter_table("example.stocks_multihead_multicolumn", "create_index", "Date")

Then perform the multi-head key lookup with an extra filtering expression, searching the sharded Industry & Symbol columns for the values Financial Exchanges & Data & SPGI, respectively, and filtering out records prior to 2017:

1
2
3
4
response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"},
        "Date >= '2017-01-01'"
)

The results can be output in the same manner shown under Key Lookup.

Result

The multi-head key lookup should consistently return faster than the same query without using multi-head. The results of a sample test run bear this out:

Lookup TypeTime (No Multi-Head)Time (Multi-Head)Record Count
Single-Column Key Only0.00430.0020222
Single-Column Key + Expression0.00310.000953
Multi-Column Key0.00460.0024222
Multi-Column Key + Expression0.00310.001053

Cautions

If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:

1
2
3
4
<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>