Multi-Head Egress is a mechanism that allows sharded data to be retrieved directly from cluster nodes, bypassing the overhead of routing the data through the head node. This greatly increases retrieval speed by parallelizing the output of data and spreading the network traffic across multiple nodes.
Operationally, the egress mechanism is a lookup of a given shard key value in a given table or view, filtering out any records that don't match an optional expression.
| Language | Multi-head Egress Mechanism |
|---|---|
| C++ | RecordRetriever |
| C# | RecordRetriever |
| Java | RecordRetriever |
| Javascript | X |
| Node.js | X |
| Python | RecordRetriever |
| REST | X |
| SQL | X |
In order for the cluster nodes to send data directly to an egress client,
the configuration on each node needs to be updated to allow the incoming HTTP
requests for that data. The /opt/gpudb/core/etc/gpudb.conf
file
needs to have the following property set for multi-head egress to work
properly:
# Enable worker HTTP servers...
enable_worker_http_servers = true
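To verify the setting on a given node, the file can simply be inspected; the following is a minimal sketch (the path is the one given above):

# Sanity check (a sketch): confirm the property is enabled on this node
with open("/opt/gpudb/core/etc/gpudb.conf") as conf:
    for line in conf:
        if line.strip().startswith("enable_worker_http_servers"):
            print(line.strip())  # expect: enable_worker_http_servers = true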
Both the HA and HTTPD configurations will be taken into account, as well as any public URLs (which override all other settings) defined in /opt/gpudb/core/etc/gpudb.conf, when returning multi-head URLs for the worker nodes. With the exception of the Python background multi-head process, the multi-head egress object requires a list of worker nodes over which to distribute the data requests, with one entry in the list for each node/process. This list can be auto-populated by using a GPUdb connection object, which can retrieve the list of available cluster nodes from the database itself. Below is a code snippet (in Java) showing an automatically populated worker list:
GPUdb gpudb = new GPUdb("http://localhost:9191");
WorkerList workers = new WorkerList(gpudb);
Note that in some cases, workers may be configured to use more than one IP
address, not all of which may be accessible to the client; the worker list
constructor uses the first IP returned by the server for each worker. In cases
where workers may use more than one IP address and public URLs are not
configured, a regular expression Pattern
or prefix String
can be used to
match the correct worker IP addresses:
// Match 172.X.Y.Z addresses, using a regular expression
WorkerList workers = new WorkerList(gpudb, Pattern.compile("172\\..*"));

// or, using a prefix string
WorkerList workers = new WorkerList(gpudb, "172.");
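For Python clients that construct the multi-head machinery directly (rather than relying on the background process described below), the gpudb package exposes a comparable worker list; the following is a sketch only, assuming the GPUdbWorkerList class and its ip_regex parameter, which should be verified against the installed API version:

import gpudb

db = gpudb.GPUdb(host = "localhost", port = "9191")

# Auto-populate the worker list from the database
# (GPUdbWorkerList and ip_regex are assumptions -- verify for your version)
workers = gpudb.GPUdbWorkerList(db)

# Or restrict matches to 172.X.Y.Z addresses when workers expose several IPs
workers_172 = gpudb.GPUdbWorkerList(db, ip_regex = "172\\..*")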
There are several factors to consider when using multi-head egress:

- The shard key columns and any columns referenced in the filter expression must all be indexed. There is an implicit index for a primary key, so an explicit column index is not required when the primary key is the shard key, or when the primary key is a superset of the shard key and the expression references all of the primary key columns that are not part of the shard key (see the sketch after this list).
- In the Python API, the GPUdbTable object has a use_multihead_io parameter, which allows the GPUdbTable class to handle all RecordRetriever interactions with the associated table in the background.
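To illustrate the primary key case, consider a hypothetical table (the name and columns here are invented for this sketch, and it uses the h_db connection established in the example below) whose primary key (Industry, Symbol) is a superset of its shard key (Symbol); a key lookup on Symbol whose expression references Industry is covered by the implicit primary key index, so no explicit column index is needed:

# Hypothetical table: PRIMARY KEY (Industry, Symbol) is a superset of
# SHARD KEY (Symbol), so the implicit primary key index covers lookups
# on Symbol with expressions referencing Industry
SQL_CREATE_PK_TABLE = """
CREATE TABLE EXAMPLES.stocks_pk_example
(
    Symbol   VARCHAR(8)  NOT NULL,
    Industry VARCHAR(64) NOT NULL,
    Open     DOUBLE,
    PRIMARY KEY (Industry, Symbol),
    SHARD KEY (Symbol)
)
"""
h_db.execute_sql(SQL_CREATE_PK_TABLE, 0, 0)

With these factors in mind, the following Python API example demonstrates the use of the background RecordRetriever for retrieving data.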
Important
This example relies on the stocks data set, which can be imported into Kinetica via GAdmin. That data will be copied to two new tables and have indexes applied, as necessary.
First, connect to the database; here, the host IP address is passed in as a parameter to the Python API call:
h_db = gpudb.GPUdb(encoding = "BINARY", host = args.host, port = "9191")
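The snippets in this example omit their surrounding boilerplate; the following is a minimal setup sketch, assuming argparse supplies the host argument used above, with datetime imported for the output formatting used later:

import argparse
from datetime import datetime

import gpudb

# Assumed boilerplate (a sketch): parse the host argument referenced above
parser = argparse.ArgumentParser()
parser.add_argument("--host", default = "localhost")
args = parser.parse_args()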
Next, prepare the target table used by the single-column multi-head key lookup
examples by copying the stocks
table to a new table:
SQL_CREATE_TABLE = """
CREATE TABLE EXAMPLES.stocks_multihead AS
SELECT *
FROM stocks
"""
h_db.execute_sql(SQL_CREATE_TABLE, 0, 0)
Then add an index on the sharded column, Symbol, which must be the target of any multi-head lookup on the stocks_multihead table:
h_db.alter_table("stocks_multihead", "create_index", "Symbol")
To prepare the target table used by the multi-column multi-head key lookup
examples, copy, reshard, & index the stocks
table; the new shard key will be
on both the Industry
& Symbol
columns:
SQL_CREATE_TABLE = """
CREATE TABLE EXAMPLES.stocks_multihead_multicolumn AS
SELECT *, KI_SHARD_KEY(Industry, Symbol)
FROM stocks
"""
h_db.execute_sql(SQL_CREATE_TABLE, 0, 0)
# Create a column index on each of the sharded columns
h_db.alter_table("stocks_multihead_multicolumn", "create_index", "Industry")
h_db.alter_table("stocks_multihead_multicolumn", "create_index", "Symbol")
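To confirm that the new shard key took effect, the table's column properties can be inspected; a sketch, assuming the /show/table response's properties field maps each column to its list of properties (verify against the API reference):

# Sanity check (a sketch): list the columns carrying the shard_key property
show_resp = h_db.show_table(table_name = "stocks_multihead_multicolumn", options = {})
for column, props in show_resp["properties"][0].items():
    if "shard_key" in props:
        print("Shard key column: {}".format(column))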
Grab a handle to the stocks_multihead
table, passing the
use_multihead_io
option to enable background multi-head key lookups:
stocks_table = gpudb.GPUdbTable(None, name = "stocks_multihead", db = h_db, use_multihead_io = True)
Then perform the multi-head key lookup, searching the shard column for the
value SPGI
:
response = stocks_table.get_records_by_key(["SPGI"])
Lastly, output the results:
for record in response["data"]:
    print("{:10s} {:8.4f} {:8.4f} {:8.4f} {:8.4f}".format(
        datetime.fromtimestamp(record["Date"]/1000.0).strftime("%Y-%m-%d"),
        record["Open"],
        record["Low"],
        record["High"],
        record["Close"]
    ))
Since any column involved in a multi-head key lookup must be indexed, first add
an index to the Date
column, which will be used in the example filter:
h_db.alter_table("stocks_multihead", "create_index", "Date")
Then perform the multi-head key lookup with an extra filtering expression,
searching the shard column for the value SPGI
and filtering out records
prior to 2017:
response = stocks_table.get_records_by_key(["SPGI"], "Date >= '2017-01-01'")
The results can be output in the same manner shown under Key Lookup.
Grab a handle to the stocks_multihead_multicolumn
table, passing the
use_multihead_io
option to enable background multi-head key lookups:
stocks_table = gpudb.GPUdbTable(None, name = "stocks_multihead_multicolumn", db = h_db, use_multihead_io = True)
Then perform the multi-head key lookup, searching the sharded Industry and Symbol columns for the values Financial Exchanges & Data and SPGI, respectively:
response = stocks_table.get_records_by_key(
{"Industry": "'Financial Exchanges & Data'", "Symbol": "'SPGI'"}
)
The results can be output in the same manner shown under Key Lookup.
Since any column involved in a multi-head key lookup must be indexed, first add
an index to the Date
column, which will be used in the example filter:
h_db.alter_table("stocks_multihead_multicolumn", "create_index", "Date")
Then perform the multi-head key lookup with an extra filtering expression, searching the sharded Industry and Symbol columns for the values Financial Exchanges & Data and SPGI, respectively, and filtering out records prior to 2017:
response = stocks_table.get_records_by_key(
{"Industry": "'Financial Exchanges & Data'", "Symbol": "'SPGI'"},
"Date >= '2017-01-01'"
)
The results can be output in the same manner shown under Key Lookup.
The multi-head key lookup should consistently return faster than the same query without using multi-head. The results of a sample test run bear this out:
| Lookup Type | Time (No Multi-Head) | Time (Multi-Head) | Record Count |
|---|---|---|---|
| Single-Column Key Only | 0.0043 | 0.0020 | 222 |
| Single-Column Key + Expression | 0.0031 | 0.0009 | 53 |
| Multi-Column Key | 0.0046 | 0.0024 | 222 |
| Multi-Column Key + Expression | 0.0031 | 0.0010 | 53 |
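As a rough way to reproduce such a comparison, one might time the same lookup both ways; the following is a sketch, assuming the tables built above and that /get/records accepts an expression option (verify against the API reference):

import time

# Multi-head key lookup (data served directly by the workers)
single_table = gpudb.GPUdbTable(None, name = "stocks_multihead", db = h_db, use_multihead_io = True)
start = time.perf_counter()
response = single_table.get_records_by_key(["SPGI"])
print("multi-head: {:.4f}s".format(time.perf_counter() - start))

# Equivalent lookup routed through the head node
start = time.perf_counter()
response = h_db.get_records(
    table_name = "stocks_multihead",
    offset = 0,
    limit = 1000,
    options = {"expression": "Symbol = 'SPGI'"}
)
print("head node:  {:.4f}s".format(time.perf_counter() - start))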