Distributed Key Lookup

Distributed Key Lookup (Multi-Head Egress) is a mechanism that retrieves sharded data directly from the cluster nodes that hold it, rather than pulling it through the head node. Spreading the network traffic across multiple nodes in this way greatly increases retrieval speed.

Operationally, a key lookup retrieves the records that have a given shard key value in a given table or view, filtering out any records that don't match an optional expression.
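
As a purely conceptual illustration of this idea, the sketch below models a cluster as a few in-memory shards, routes a key straight to the shard that owns it, and applies an optional filter there. The shard count, the CRC32-based routing, the sample records, and every function name are assumptions made up for the illustration; they do not reflect how Kinetica actually hashes, stores, or serves data.

```python
import zlib

NUM_SHARDS = 4  # hypothetical cluster size


def shard_for(key):
    """Map a shard key value to a shard number (toy CRC32 routing)."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_SHARDS


def insert(shards, key, record):
    """Store a record on the shard that owns its key."""
    shards[shard_for(key)].setdefault(key, []).append(record)


def key_lookup(shards, key, predicate=None):
    """Read one key from its owning shard only, keeping records that pass
    the optional predicate."""
    candidates = shards[shard_for(key)].get(key, [])
    if predicate is None:
        return list(candidates)
    return [record for record in candidates if predicate(record)]


# Made-up sample data
shards = [dict() for _ in range(NUM_SHARDS)]
insert(shards, "SPGI", {"Symbol": "SPGI", "Close": 110.5})
insert(shards, "SPGI", {"Symbol": "SPGI", "Close": 95.0})
insert(shards, "IBM", {"Symbol": "IBM", "Close": 150.0})

matches = key_lookup(shards, "SPGI", lambda record: record["Close"] > 100)
print(matches)
```

Only one shard is consulted per lookup, which is the property that lets a client skip the head node.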

API Support

Language      Distributed Key Lookup Mechanism
C++           RecordRetriever
C#            RecordRetriever
Java          RecordRetriever
JavaScript    X
Node.js       X
Python        RecordRetriever
REST          X
SQL           Using KI_HINT_KEY_LOOKUP hint in a query; see SQL below for details

Configuration

Distributed operations are enabled through configuration:

  • Distributed operations need to be enabled on the server (default)
  • Client-accessible URLs need to be set for each node in the cluster

Server-Side

In order for the cluster nodes to transmit data directly to a key lookup client, the configuration on each node needs to be set to allow the incoming HTTP requests for that data. The /opt/gpudb/core/etc/gpudb.conf file needs to have the following property set for distributed key lookup to work properly:

# Enable worker HTTP servers...
enable_worker_http_servers = true

Each node can be configured with a public URL for the API to use to connect to it, in this section of the gpudb.conf:

# Optionally, specify a public URL for each worker HTTP server that clients
# should use to connect for multi-head operations.
#
# NOTE: If specified for any ranks, a public URL must be specified for all
# ranks. Cannot be used in conjunction with nplus1.

rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194

If a public_url is not defined, each node can be connected to on any of its available interfaces, taking HA and HTTPD configurations, as well as any general network security restrictions, into account.
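
The all-or-none rule in the configuration comment can be checked before a server restart. The following is a minimal sketch, not part of any Kinetica tooling: it parses rankN.public_url lines out of configuration text and verifies that either no rank or every rank has one. The function names, the rank_count parameter, and the assumption that ranks are numbered from 0 are all hypothetical.

```python
import re

# Sample text mirroring the public_url settings shown above
SAMPLE_CONF = """\
# rank9.public_url = http://commented.out:9999
rank0.public_url = http://192.168.0.10:9191
rank1.public_url = http://192.168.0.10:9192
rank2.public_url = http://192.168.0.11:9193
rank3.public_url = http://192.168.0.11:9194
"""

_PUBLIC_URL = re.compile(r"^\s*rank(\d+)\.public_url\s*=\s*(\S+)\s*$")


def parse_public_urls(conf_text):
    """Return {rank number: URL} for each uncommented rankN.public_url line."""
    urls = {}
    for line in conf_text.splitlines():
        match = _PUBLIC_URL.match(line)
        if match:
            urls[int(match.group(1))] = match.group(2)
    return urls


def public_urls_consistent(urls, rank_count):
    """True if no rank has a public URL, or if ranks 0..rank_count-1 all do."""
    if not urls:
        return True
    return set(urls) == set(range(rank_count))


urls = parse_public_urls(SAMPLE_CONF)
print(urls)
print(public_urls_consistent(urls, rank_count=4))
```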

Client-Side

The list of URLs for connecting to each worker node is automatically created when using the following:

  • Java API default RecordRetriever
  • Python API GPUdbTable configured for background distributed operations
Java Default RecordRetriever
import com.gpudb.GPUdb;
import com.gpudb.RecordRetriever;
import com.gpudb.GenericRecord;
import com.gpudb.Type;

GPUdb db = new GPUdb("http://<db.host>:9191", new GPUdb.Options());

// Distributed Key Lookup Object Instantiation
RecordRetriever<GenericRecord> recordRetriever =
    new RecordRetriever<>(db, tableName, Type.fromTable(db, tableName));
Python GPUdbTable
from gpudb import GPUdb, GPUdbTable

db = GPUdb("http://<db.host>:9191", username = "<user>", password = "<pass>")

t = GPUdbTable(name = table_name, db = db, use_multihead_io = True)

In all other cases, the worker list must be created explicitly, using the connection object to retrieve the list of available cluster nodes from the database itself. The snippets below create a worker list populated from the database and then use it to create a distributed key lookup object:

Java
import com.gpudb.GPUdb;
import com.gpudb.RecordRetriever;
import com.gpudb.GenericRecord;
import com.gpudb.Type;
import com.gpudb.WorkerList;

GPUdb db = new GPUdb("http://<db.host>:9191", new GPUdb.Options());
WorkerList workers = new WorkerList(db);

// Distributed Key Lookup Object Instantiation
RecordRetriever<GenericRecord> recordRetriever =
    new RecordRetriever<>(db, tableName, Type.fromTable(db, tableName), workers);
Python
from gpudb import GPUdb, GPUdbTable
from gpudb_multihead_io import GPUdbWorkerList, RecordRetriever

db = GPUdb("http://<db.host>:9191", username = "<user>", password = "<pass>")
workers = GPUdbWorkerList(db)

# Distributed Key Lookup Object Instantiation
table_type = GPUdbTable(name = table_name, db = db).get_table_type()
bulk_retriever = RecordRetriever( db, table_name, table_type, workers )

Note that in cases where no public_url was configured on each server node, workers may have more than one IP address, not all of which may be accessible to the client. The API worker list constructor uses the first IP in the list returned by the server for each worker, by default. To override this behavior, a regular expression Pattern or prefix String can be used to match the correct worker IP addresses:

Java
// Match 172.X.Y.Z addresses
new WorkerList(db, Pattern.compile("172\\..*"));
// or
new WorkerList(db, "172.");
Python
GPUdbWorkerList(db, "172.")
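
To make the selection rule concrete, here is a small standalone sketch of the behavior described above: take the first address by default, or the first one matching a prefix or regular expression. It is an illustration only, not the worker list implementation; pick_worker_ip, the sample addresses, and the choice to require a full regular expression match are assumptions.

```python
import re


def pick_worker_ip(candidate_ips, matcher=None):
    """Choose one address for a worker.

    matcher=None      -> first address in the list (the described default)
    matcher is a str  -> first address starting with that prefix
    matcher is regex  -> first address fully matched by the compiled pattern
    """
    if matcher is None:
        return candidate_ips[0]
    for ip in candidate_ips:
        if isinstance(matcher, str):
            if ip.startswith(matcher):
                return ip
        elif matcher.fullmatch(ip):
            return ip
    raise ValueError("no address matches {!r}".format(matcher))


# Hypothetical addresses reported for one worker
worker_ips = ["10.0.0.5", "172.16.0.5"]

print(pick_worker_ip(worker_ips))
print(pick_worker_ip(worker_ips, "172."))
print(pick_worker_ip(worker_ips, re.compile(r"172\..*")))
```

Raising an error when nothing matches is a design choice of this sketch; it surfaces an unreachable worker early instead of silently falling back to an address the client cannot use.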

Considerations

There are several factors to consider when using distributed key lookup:

  • Only primary key/shard key value lookups are allowed for sharded tables; arbitrary queries are not supported.
  • Lookup values cannot be expression functions.
  • The shard key and any columns used in the expression must all be indexed.
  • There is an implicit index for a primary key, but queries that make use of this index for the lookup cannot have any other columns in the expression.
  • If column indexes exist on all columns being filtered, additional indexed columns can be added to the lookup expression using either equality or inequality relations.
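
The indexing rules above can be read as a simple eligibility check. The sketch below encodes one interpretation of them for illustration; it is not how the database decides, and the function name, its parameters, and the sample column names are all hypothetical.

```python
def key_lookup_eligible(lookup_columns, filter_columns, shard_key,
                        attribute_indexes, primary_key=()):
    """Return True if a lookup appears to satisfy the considerations above.

    lookup_columns    -- columns matched against the key values
    filter_columns    -- extra columns used in the optional expression
    shard_key         -- the table's shard key columns
    attribute_indexes -- columns that have an explicit column index
    primary_key       -- the table's primary key columns, if any
    """
    lookup = set(lookup_columns)
    filters = set(filter_columns)
    indexed = set(attribute_indexes)
    pk = set(primary_key)

    if not lookup:
        return False

    # Only shard key or primary key lookups are allowed.
    on_primary_key = bool(pk) and lookup == pk
    if lookup != set(shard_key) and not on_primary_key:
        return False

    # Explicit indexes on the key and on every filtered column.
    if (lookup | filters) <= indexed:
        return True

    # Implicit primary key index: no other columns in the expression.
    return on_primary_key and not filters


print(key_lookup_eligible(["id"], ["stock"], ["id"], ["id", "stock"]))
print(key_lookup_eligible(["id"], ["stock"], ["id"], ["id"]))
```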

SQL

SQL has wide support for distributed key lookups. The following configurations will result in successful lookups:

  • Specifying only the primary key columns/values of any table with a primary key
  • Specifying only the shard key columns/values of a sharded table with attribute indexes on each of the shard key columns
  • Specifying any indexed columns of a replicated table

Note

As noted under Considerations, if all key lookup columns have column indexes, additional indexed columns can be added to the filter. See below for an example.

The KI_HINT_KEY_LOOKUP hint is used to request a fast key lookup. If the table configuration and the given query don't qualify for a key lookup, the query is executed as any other query would be, without the fast key lookup. The results are still returned, along with a warning that the key lookup was not possible.

Syntax

The following simple query syntax is supported for key lookups:

SQL Key Lookup Syntax
SELECT * /* KI_HINT_KEY_LOOKUP */
FROM [<schema name>.]<table_name>
WHERE <key_column_1> = <key_value_1> [... AND <key_column_N> = <key_value_N>]
[AND <indexed_column_1> [=] <indexed_value_1> [... AND <indexed_column_N> = <indexed_value_N>]]
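
For clients that assemble such statements programmatically, the syntax above can be generated with a small helper. This is a hypothetical sketch: build_key_lookup and sql_literal are not part of any Kinetica API, table and column names are inserted unescaped, and only string and numeric values are handled.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings are single-quoted)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_key_lookup(table, key_values, extra_filters=()):
    """Build a key lookup query.

    key_values    -- dict of key column -> value, combined with equality
    extra_filters -- iterable of (column, operator, value) on indexed columns
    """
    if not key_values:
        raise ValueError("at least one key column is required")
    conditions = [
        "{} = {}".format(column, sql_literal(value))
        for column, value in key_values.items()
    ]
    conditions += [
        "{} {} {}".format(column, operator, sql_literal(value))
        for column, operator, value in extra_filters
    ]
    return (
        "SELECT * /* KI_HINT_KEY_LOOKUP */\n"
        "FROM {}\n".format(table) +
        "WHERE " + " AND ".join(conditions)
    )


# Table and column names here are illustrative
query = build_key_lookup("product_sk", {"id": 2}, [("stock", ">", 0)])
print(query)
```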

Examples

To look up a particular record in a sharded table with an explicit shard key on column id:

Sharded Table with Shard Key Lookup Example
SELECT *  /* KI_HINT_KEY_LOOKUP */
FROM product_sk
WHERE id = 1;

The sharded example above can have an equality or inequality expression added to it to further filter the results, as long as the additional column, stock, has an attribute index.

Sharded Table with Shard Key Lookup and Additional Filter Example
SELECT * /* KI_HINT_KEY_LOOKUP */
FROM product_sk
WHERE id = 2 AND stock > 0;

To look up records matching a given category in a replicated table with attribute indexes on columns id & category:

Replicated Table with Indexed Columns Lookup Example
SELECT *  /* KI_HINT_KEY_LOOKUP */
FROM product_replicated
WHERE id = 2 AND category = 'Furniture';

Python Distributed Key Lookup

In the Python API, the GPUdbTable constructor has a use_multihead_io parameter, which allows a GPUdbTable object to handle all RecordRetriever interactions with the associated table in the background. The following is a Python API example that demonstrates the use of the background RecordRetriever for retrieving data.

Important

This example relies on tables mirroring the stocks data set, which can be imported into Kinetica via GAdmin.

Key Lookup

Ensure the stocks_mh table, which has a one-column shard key on Symbol, is indexed on that column:

kinetica.alter_table("example.stocks_mh", "create_index", "Symbol")

Grab a handle to the stocks_mh table, passing the use_multihead_io option to enable background distributed key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_mh", db = kinetica, use_multihead_io = True)

Then perform the distributed key lookup, searching the shard column for the value SPGI:

response = stocks_table.get_records_by_key(["SPGI"])

Key Lookup with Expression

Ensure the stocks_mh table, which has a one-column shard key on Symbol, is indexed on that column:

kinetica.alter_table("example.stocks_mh", "create_index", "Symbol")

Since any column involved in a distributed key lookup must be indexed, also add an index to the Date column, which will be used in the example filter:

kinetica.alter_table("example.stocks_mh", "create_index", "Date")

Grab a handle to the stocks_mh table, passing the use_multihead_io option to enable background distributed key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_mh", db = kinetica, use_multihead_io = True)

Then perform the distributed key lookup with an extra filtering expression, searching the shard column for the value SPGI and filtering out records prior to 2017:

response = stocks_table.get_records_by_key(["SPGI"], "Date >= '2017-01-01'")

Multi-Column Key Lookup

Ensure the stocks_mh_mc table, which has a two-column shard key on Symbol & Industry, is indexed on those columns:

kinetica.alter_table("example.stocks_mh_mc", "create_index", "Industry")
kinetica.alter_table("example.stocks_mh_mc", "create_index", "Symbol")

Grab a handle to the stocks_mh_mc table, passing the use_multihead_io option to enable background distributed key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_mh_mc", db = kinetica, use_multihead_io = True)

Then perform the distributed key lookup, searching the sharded Industry and Symbol columns for the values "Financial Exchanges & Data" and "SPGI", respectively:

response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"}
)

Multi-Column Key Lookup with Expression

Ensure the stocks_mh_mc table, which has a two-column shard key on Symbol & Industry, is indexed on those columns:

kinetica.alter_table("example.stocks_mh_mc", "create_index", "Industry")
kinetica.alter_table("example.stocks_mh_mc", "create_index", "Symbol")

Since any column involved in a distributed key lookup must be indexed, also add an index to the Date column, which will be used in the example filter:

kinetica.alter_table("example.stocks_mh_mc", "create_index", "Date")

Grab a handle to the stocks_mh_mc table, passing the use_multihead_io option to enable background distributed key lookups:

stocks_table = gpudb.GPUdbTable(None, name = "example.stocks_mh_mc", db = kinetica, use_multihead_io = True)

Then perform the distributed key lookup with an extra filtering expression, searching the sharded Industry and Symbol columns for the values "Financial Exchanges & Data" and "SPGI", respectively, and filtering out records prior to 2017:

response = stocks_table.get_records_by_key(
        {"Industry": "Financial Exchanges & Data", "Symbol": "SPGI"},
        "Date >= '2017-01-01'"
)

Result

The results of a distributed key lookup call can be output from response as follows:

from datetime import datetime

for record in response["data"]:
    print("{:10s} {:8.4f} {:8.4f} {:8.4f} {:8.4f}".format(
            datetime.fromtimestamp(record["Date"]/1000.0).strftime("%Y-%m-%d"),
            record["Open"],
            record["Low"],
            record["High"],
            record["Close"]
    ))
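
The loop above treats Date as milliseconds since the epoch. The standalone sketch below applies the same formatting to a hand-made response so it can be run without a database. The sample record and its values are invented for illustration, records are assumed to behave like dictionaries keyed by column name, and the conversion is pinned to UTC (unlike the loop above, which uses local time) so the output does not depend on the machine's time zone.

```python
from datetime import datetime, timezone


def format_record(record):
    """Format one stock record as a fixed-width text line."""
    day = datetime.fromtimestamp(record["Date"] / 1000.0, tz=timezone.utc)
    return "{:10s} {:8.4f} {:8.4f} {:8.4f} {:8.4f}".format(
        day.strftime("%Y-%m-%d"),
        record["Open"],
        record["Low"],
        record["High"],
        record["Close"],
    )


# Invented stand-in for a key lookup response
sample_response = {
    "data": [
        {"Date": 1483401600000, "Open": 108.5, "Low": 107.25,
         "High": 109.75, "Close": 109.0},
    ]
}

lines = [format_record(record) for record in sample_response["data"]]
for line in lines:
    print(line)
```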

The distributed key lookup should consistently return faster than the same query not using it. The results of a sample test run bear this out:

Lookup Type                       Time (Without DKL)   Time (With DKL)   Record Count
Single-Column Key Only            0.0043               0.0020            222
Single-Column Key + Expression    0.0031               0.0009            53
Multi-Column Key                  0.0046               0.0024            222
Multi-Column Key + Expression     0.0031               0.0010            53
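
The relative gain is easier to compare as a ratio. The short calculation below divides the time without distributed key lookup by the time with it for each row of the sample run; the variable names are illustrative, and the figures are taken directly from the table.

```python
# (time without DKL, time with DKL) from the sample test run
timings = {
    "Single-Column Key Only": (0.0043, 0.0020),
    "Single-Column Key + Expression": (0.0031, 0.0009),
    "Multi-Column Key": (0.0046, 0.0024),
    "Multi-Column Key + Expression": (0.0031, 0.0010),
}

# Speedup factor = time without DKL / time with DKL
speedups = {
    name: without / with_dkl
    for name, (without, with_dkl) in timings.items()
}

for name, ratio in speedups.items():
    print("{:32s} {:.2f}x".format(name, ratio))
```

In this single sample run the ratios range from roughly 1.9x to 3.4x, with the larger gains on the lookups that include a filter expression.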

Cautions

If using the Java API and MapReduce, there is a conflict between the version of Avro used by Kinetica and the one used by MapReduce. This conflict can be overcome by using the Maven shade plug-in with the relocation tag:

<relocation>
  <pattern>org.apache.avro</pattern>
  <shadedPattern>org.shaded.apache.avro</shadedPattern>
</relocation>