Solve Graph - Page Rank (NYC Taxi)

The following is a complete example, using the Python API, of solving a graph created with NYC Taxi data for a page rank problem via the /solve/graph endpoint. For more information on Network Graphs & Solvers, see Network Graphs & Solvers Concepts.

Prerequisites

The prerequisites for running the page rank solve graph example are listed below:

Python API Installation

The native Kinetica Python API is accessible through the following means:

  • For development on the Kinetica server: Kinetica RPM
  • For development not on the Kinetica server: Git, PyPI

Kinetica RPM

In default Kinetica installations, the native Python API is located in the /opt/gpudb/api/python directory. The /opt/gpudb/bin/gpudb_python wrapper script is provided, which sets the execution environment appropriately.

Test the installation:

/opt/gpudb/bin/gpudb_python /opt/gpudb/api/python/examples/example.py

Important

When developing on the Kinetica server, use /opt/gpudb/bin/gpudb_python to run Python programs and /opt/gpudb/bin/gpudb_pip to install dependent libraries.

Git

  1. In the desired directory, run the following but be sure to replace <kinetica-version> with the name of the installed Kinetica version, e.g., v7.0:

    git clone -b release/<kinetica-version> --single-branch https://github.com/kineticadb/kinetica-api-python.git
    
  2. Change directory into the newly downloaded repository:

    cd kinetica-api-python
    
  3. In the root directory of the cloned repository, install the Kinetica API:

    sudo python setup.py install
    
  4. Test the installation (Python 2.7 (or greater) is necessary for running the API example):

    python examples/example.py
    

PyPI

The Python package manager, pip, is required to install the API from PyPI.

  1. Install the API:

    pip install gpudb --upgrade
    
  2. Test the installation:

    python -c "import gpudb;print('Import Successful')"
    

    If Import Successful is displayed, the API has been installed and is ready for use.

Data File

The example script references a nyc_neighborhood.csv data file in the current directory. Either run the script with the data file in the current directory, or update the CSV constant to point to a valid path on the host where the file is located.

CSV = "nyc_neighborhood.csv"

Script Detail

This example demonstrates solving for the most popular pickup and dropoff points in New York City by ranking locations according to how frequently passengers are picked up or dropped off there. The more connected a point is in relation to other points, the greater its importance.
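
To make the ranking idea concrete, the following is a minimal pure-Python sketch of page rank by power iteration on a tiny undirected graph. It is not Kinetica's solver: the damping factor of 0.85, the iteration count, and the sample trips are all assumptions chosen for illustration.

```python
def page_rank(edges, damping=0.85, iterations=50):
    """Rank the nodes of an undirected graph by power iteration."""
    # Build an adjacency list; each undirected edge is stored in both directions
    neighbors = {}
    for a, b in edges:
        neighbors.setdefault(a, []).append(b)
        neighbors.setdefault(b, []).append(a)

    count = len(neighbors)
    ranks = {node: 1.0 / count for node in neighbors}

    for _ in range(iterations):
        updated = {}
        for node in neighbors:
            # Each neighbor shares its rank equally among its own connections
            incoming = sum(ranks[n] / len(neighbors[n]) for n in neighbors[node])
            updated[node] = (1.0 - damping) / count + damping * incoming
        ranks = updated
    return ranks


# Hypothetical trips: location "A" is an endpoint of three of the four trips
trips = [("A", "B"), ("A", "C"), ("A", "D"), ("B", "C")]
ranks = page_rank(trips)
busiest = max(ranks, key=ranks.get)
```

In this sketch the best-connected location, "A", receives the highest rank, which mirrors how the example treats frequently visited pickup and dropoff points.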

Constants

Several constants are defined at the beginning of the script:

  • HOST / PORT -- host and port values for the database
  • OPTION_NO_ERROR -- reference to a /clear/table option for ease of use and repeatability
  • TABLE_NYC_N -- the name of the table into which the NYC Neighborhood dataset is loaded. This dataset is joined to the TABLE_TAXI table to create the JOIN_TAXI dataset.
  • TABLE_TAXI -- the name of the table into which the NYC taxi dataset is loaded. This dataset is joined to the TABLE_NYC_N table to create the JOIN_TAXI dataset.
  • TABLE_TAXI_EW -- the name of the projection derived from the JOIN_TAXI dataset that serves as the edges for the GRAPH_T graph.
  • TABLE_TAXI_N -- the name of the union derived from the JOIN_TAXI dataset that serves as the nodes for the GRAPH_T graph.
  • TABLE_TAXI_N_S -- the same as TABLE_TAXI_N but later sharded so it can be joined to the GRAPH_T_PRSOLVED_S table.
  • JOIN_TAXI -- the name of the join view that represents the dataset of all the trips found in the TABLE_TAXI dataset that overlap with the neighborhood boundaries found in the TABLE_NYC_N dataset.
  • JOIN_PR_RESULTS -- the name of the join view that represents the dataset of all the nodes found in TABLE_TAXI_N_S where the ID matches the SOLVERS_NODE_ID found in GRAPH_T_PRSOLVED_S.
  • GRAPH_T -- the NYC taxi graph
  • GRAPH_T_PRSOLVED -- the solved NYC taxi graph using the PAGE_RANK solver type
  • GRAPH_T_PRSOLVED_S -- the same as GRAPH_T_PRSOLVED but later sharded so it can be joined to the nyctaxi_nodes_sharded table.
  • TABLE_GRAPH_T -- the name of the table representing the NYC taxi graph

HOST = "127.0.0.1"
CSV = "nyc_neighborhood.csv"

OPTION_NO_ERROR = {"no_error_if_not_exists": "true"}

TABLE_NYC_N = "nyc_neighborhood"
TABLE_TAXI = "nyctaxi"
TABLE_TAXI_EW = TABLE_TAXI + "_edges_weights_ids"
TABLE_TAXI_N = TABLE_TAXI + "_nodes"
TABLE_TAXI_N_S = TABLE_TAXI_N + "_sharded"

JOIN_TAXI = "taxi_tables_joined"
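
The listing above omits several of the constants described in the list, as well as the kinetica database handle used by the later calls. The following sketch shows one way they might be defined. The names nyctaxi_graph_id and nyctaxi_graph_id_page_rank_solved come from the text of this example; the port value, the remaining names, and the constructor form in connect() are assumptions and may differ from the actual script or from the installed API version.

```python
# Names already shown in the script
TABLE_TAXI = "nyctaxi"
TABLE_TAXI_N = TABLE_TAXI + "_nodes"

# Assumption: 9191 is the port of a default installation
PORT = "9191"

# Graph and solution names as they appear in the text of this example
GRAPH_T = TABLE_TAXI + "_graph_id"
GRAPH_T_PRSOLVED = GRAPH_T + "_page_rank_solved"

# Assumptions: hypothetical names; the script's actual values are not shown
GRAPH_T_PRSOLVED_S = GRAPH_T_PRSOLVED + "_sharded"
JOIN_PR_RESULTS = "page_rank_results_joined"
TABLE_GRAPH_T = GRAPH_T + "_table"


def connect(host="127.0.0.1", port=PORT):
    """Return a database handle like the `kinetica` object used below."""
    # Imported lazily so the constants can be inspected without the API
    import gpudb
    # Assumption: this constructor form matches the installed API version
    return gpudb.GPUdb(encoding="BINARY", host=host, port=port)
```

With these in place, a call such as kinetica = connect() would supply the handle that the remaining snippets rely on.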

Graph Creation

One graph is used for this example: nyctaxi_graph_id, a graph utilizing IDs based on a modified version of the standard NYC Taxi dataset included with Kinetica installations.

To filter out data that could skew graph nyctaxi_graph_id, the NYC Neighborhood dataset must be inserted into Kinetica and joined to the NYC Taxi dataset using STXY_CONTAINS to remove any trip points in the NYC Taxi dataset that are not contained within the geospatial boundaries of the NYC Neighborhood dataset:

print(
    "Joining {} to {} to filter out data that could skew the taxi "
    "graphs.".format(TABLE_TAXI, TABLE_NYC_N)
)
join_taxi_tables_response = kinetica.create_join_table(
    join_table_name=JOIN_TAXI,
    table_names=[TABLE_TAXI + " as t", TABLE_NYC_N + " as n"],
    column_names=[
        "CONCAT(CHAR32(pickup_longitude), CHAR32(pickup_latitude)) as pickup_name",
        "t.pickup_longitude", 
        "t.pickup_latitude",
        "HASH(t.pickup_longitude + t.pickup_latitude) as pickup_id",
        "CONCAT(CHAR32(dropoff_longitude), CHAR32(dropoff_latitude)) as dropoff_name",
        "t.dropoff_longitude",
        "t.dropoff_latitude",
        "HASH(t.dropoff_longitude + t.dropoff_latitude) as dropoff_id",
        "t.total_amount"
    ],
    expressions=[
        "(STXY_CONTAINS(n.geom, t.pickup_longitude, t.pickup_latitude)) AND "
        "(STXY_CONTAINS(n.geom, t.dropoff_longitude, t.dropoff_latitude))"
    ]
)["status_info"]["status"]
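
As a rough illustration of what the containment filter in the join accomplishes, the sketch below keeps only the trips whose pickup and dropoff points both fall inside a boundary polygon. It uses a generic ray-casting test, not Kinetica's STXY_CONTAINS implementation, and the boundary and trip coordinates are hypothetical.

```python
def contains(polygon, x, y):
    """Return True if point (x, y) lies inside the polygon (ray casting)."""
    inside = False
    count = len(polygon)
    for i in range(count):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % count]
        # Does a horizontal ray from the point cross this edge?
        if (y1 > y) != (y2 > y):
            crossing_x = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < crossing_x:
                inside = not inside
    return inside


# Hypothetical neighborhood boundary as (longitude, latitude) vertices
boundary = [(-74.0, 40.7), (-73.9, 40.7), (-73.9, 40.8), (-74.0, 40.8)]

# Hypothetical trips as (pickup_lon, pickup_lat, dropoff_lon, dropoff_lat)
trips = [
    (-73.95, 40.75, -73.98, 40.72),  # both endpoints inside the boundary
    (-73.95, 40.75, -73.50, 40.75),  # dropoff outside the boundary
]

kept = [
    t for t in trips
    if contains(boundary, t[0], t[1]) and contains(boundary, t[2], t[3])
]
```

Only the first trip survives the filter, which is the same effect the join has on trips that start or end outside the neighborhood boundaries.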

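Before nyctaxi_graph_id can be created, its nodes and edges must be derived from the taxi_tables_joined dataset: the pickup and dropoff points are unioned to create the nyctaxi_nodes dataset, and the pickup and dropoff ID pairs are projected to create the nyctaxi_edges_weights_ids dataset: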

# Union the JOIN_TAXI view to itself to collapse pickup & dropoff
# locations into a unified set of endpoint locations to serve as node IDs
print(
    "Unioning {} to itself to contain the graph's nodes.".format(
        JOIN_TAXI
    )
)
sql = "CREATE TABLE " + TABLE_TAXI_N + " AS " \
      "SELECT " \
            "pickup_id as id, " \
            "pickup_longitude as lon, " \
            "pickup_latitude as lat " \
      "FROM " + JOIN_TAXI + " " \
      "UNION " \
      "SELECT " \
            "dropoff_id, " \
            "dropoff_longitude, " \
            "dropoff_latitude " \
      "FROM " + JOIN_TAXI
nodes_response = kinetica.execute_sql(
    statement=sql,
    offset=0,
    limit=gpudb.GPUdb.END_OF_SET,
    encoding="json",
    options={}
)["status_info"]["status"]
# Create a projection to contain the graph edges (based on NODE_ID)
print(
    "Creating a projection from {} to contain the graph's edges.".format(
        JOIN_TAXI
    )
)
edges_id_response = kinetica.create_projection(
    table_name=JOIN_TAXI,
    projection_name=TABLE_TAXI_EW,
    column_names=["pickup_id", "dropoff_id"]
)["status_info"]["status"]
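
The effect of the union and the projection can be pictured with plain Python collections. This is only a sketch on hypothetical rows, assuming the column order shown in the comment; it does not query the database.

```python
# Hypothetical rows from the joined view, in the order:
# (pickup_id, pickup_lon, pickup_lat, dropoff_id, dropoff_lon, dropoff_lat)
trips = [
    (1, -73.98, 40.74, 2, -73.96, 40.77),
    (2, -73.96, 40.77, 3, -73.93, 40.75),
    (1, -73.98, 40.74, 3, -73.93, 40.75),
]

# UNION semantics: pickup and dropoff endpoints merged, duplicates removed
nodes = set()
for p_id, p_lon, p_lat, d_id, d_lon, d_lat in trips:
    nodes.add((p_id, p_lon, p_lat))
    nodes.add((d_id, d_lon, d_lat))

# Projection semantics: one (pickup_id, dropoff_id) edge per trip
edges = [(row[0], row[3]) for row in trips]
```

Three trips over three distinct endpoints yield three nodes and three edges; a location that appears as both a pickup and a dropoff is still a single node.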

Now, nyctaxi_graph_id is created with the following characteristics:

  • It is not directed because direction is irrelevant to this example
  • The nodes in this graph are represented using the IDs created from the union of pickup and dropoff point IDs in the nyctaxi_nodes table (NODE_ID).
  • The edges in this graph use the individual pickup and dropoff point IDs of the nyctaxi_edges_weights_ids table as the edge endpoints (EDGE_NODE1_ID / EDGE_NODE2_ID).
  • It has no weights because nothing needs to influence the ranking.
  • It has no inherent restrictions for any of the nodes or edges in the graph.
  • It will replace any existing graph of the same name (recreate).

print("Creating {}".format(GRAPH_T))
create_t_graph_response = kinetica.create_graph(
    graph_name=GRAPH_T,
    directed_graph=False,
    nodes=[
        TABLE_TAXI_N + ".id AS NODE_ID"
    ],
    edges=[
        TABLE_TAXI_EW + ".pickup_id AS EDGE_NODE1_ID",
        TABLE_TAXI_EW + ".dropoff_id AS EDGE_NODE2_ID"
    ],
    weights=[],
    restrictions=[],
    options={
        "recreate": "true"
    }
)
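
Most calls in this example keep only the ["status_info"]["status"] value of the response. A small helper along the following lines could be used to stop the script when a request fails. The helper name is hypothetical, and it assumes that a successful response reports a status of "OK" and that the error text is available under a "message" key.

```python
def check_status(response, action):
    """Raise if a response dict does not report success; otherwise return it."""
    info = response.get("status_info", {})
    # Assumption: "OK" marks success and "message" carries the error text
    if info.get("status") != "OK":
        raise RuntimeError(
            "{} failed: {}".format(action, info.get("message", "unknown error"))
        )
    return response


# Hypothetical responses standing in for real endpoint results
ok_response = {"status_info": {"status": "OK", "message": ""}}
bad_response = {"status_info": {"status": "ERROR", "message": "graph not found"}}

checked = check_status(ok_response, "create_graph")
try:
    check_status(bad_response, "solve_graph")
    error_text = None
except RuntimeError as err:
    error_text = str(err)
```

Applied to the call above, this might look like check_status(create_t_graph_response, "create_graph").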

Page Rank

The graph is solved:

solve_pr_graph_response = kinetica.solve_graph(
    graph_name=GRAPH_T,
    solver_type="PAGE_RANK",
    source_nodes=["129341667930495514"],
    destination_nodes=[],
    solution_table=GRAPH_T_PRSOLVED,
    options={}
)["status_info"]["status"]

Important

A source node ID was selected at random from the nyctaxi_nodes table. Since page rank ranks each node's connectedness in relation to other nodes, any node can be the source.

A sharded version of the nyctaxi_nodes union is created so it can be joined to the page rank results:

nodes_sharded_response = kinetica.create_projection(
    table_name=TABLE_TAXI_N,
    projection_name=TABLE_TAXI_N_S,
    column_names=["id", "lon", "lat"],
    options={"shard_key": "id"}
)["status_info"]["status"]

A sharded version of the nyctaxi_graph_id_page_rank_solved table is also created so it can be joined to the nyctaxi_nodes union:

graph_sharded_response = kinetica.create_projection(
    table_name=GRAPH_T_PRSOLVED,
    projection_name=GRAPH_T_PRSOLVED_S,
    column_names=["SOLVERS_NODE_ID", "SOLVERS_NODE_COSTS"],
    options={"shard_key": "SOLVERS_NODE_ID"}
)["status_info"]["status"]
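
One way to picture why both tables are sharded on the join key: when rows with the same key always land on the same shard, each shard can be joined locally. The sketch below is conceptual only. A simple modulo stands in for the database's actual shard assignment, which is not described here, and the rows are hypothetical.

```python
def shard_for(key, shard_count):
    """Pick a shard for a key; a simple modulo stands in for the real hash."""
    return key % shard_count


def distribute(rows, key_index, shard_count):
    """Group rows into shards based on the value of their key column."""
    shards = [[] for _ in range(shard_count)]
    for row in rows:
        shards[shard_for(row[key_index], shard_count)].append(row)
    return shards


# Hypothetical node rows (id, lon, lat) and solver rows (node_id, cost)
nodes = [(10, -73.98, 40.74), (11, -73.96, 40.77), (12, -73.93, 40.75)]
costs = [(12, 0.3), (10, 0.5), (11, 0.2)]

node_shards = distribute(nodes, 0, 2)
cost_shards = distribute(costs, 0, 2)

# With matching shard keys, each shard can be joined without its peers
joined = []
for node_shard, cost_shard in zip(node_shards, cost_shards):
    cost_by_id = dict(cost_shard)
    for node_id, lon, lat in node_shard:
        if node_id in cost_by_id:
            joined.append((lon, lat, node_id, cost_by_id[node_id]))
```

Every node finds its cost within its own shard, so no rows need to be compared across shards.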

The union and graph results table are joined on ID:

# Join the TABLE_TAXI_N_S and GRAPH_T_PRSOLVED_S tables to pair the page
# rank results IDs and costs with the longitude/latitude pair of the node
join_pr_response = kinetica.create_join_table(
    join_table_name=JOIN_PR_RESULTS,
    table_names=[
        TABLE_TAXI_N_S + " as n",
        GRAPH_T_PRSOLVED_S + " as s"
    ],
    column_names=[
        "n.lon", 
        "n.lat", 
        "s.SOLVERS_NODE_ID", 
        "s.SOLVERS_NODE_COSTS"
    ],
    expressions=["n.id = s.SOLVERS_NODE_ID"],
    options={}
)["status_info"]["status"]

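The top 10 nodes (sorted by descending cost) are retrieved. The higher the cost, the more frequently the node was visited. Because each ID is a hash of the sum of a point's longitude and latitude, distinct points whose coordinates sum to the same value share an ID and therefore a cost: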

Top 10 nodes sorted by cost (highest to lowest):
Longitude Latitude                   ID            Cost
========= ======== ==================== ===============
-73.98232 40.74015 -5733631352137098805 0.000197256
-74.00276 40.76059 -5733631352137098805 0.000197256
-73.99351 40.75133 -5733631352137098805 0.000197256
 -73.9873 40.74513 -5733631352137098805 0.000197256
-73.97713 40.78787  7374672091319971430 0.000197256
-73.95725 40.76799  7374672091319971430 0.000197256
-73.96481 40.77554  7374672091319971430 0.000197256
 -73.9651 40.77584  7374672091319971430 0.000197256
-73.93757  40.7583  5145699326395297532 0.0001881493
-73.93774 40.75847  5145699326395297532 0.0001881493
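
The retrieval code is not shown above. Assuming the joined records have already been fetched into a list of dictionaries keyed by the join's column names, sorting and formatting them could be sketched as follows. The helper names are hypothetical, and the two sample records reuse values from the output above.

```python
def top_nodes(records, limit=10):
    """Return the records with the highest cost, highest first."""
    ordered = sorted(
        records, key=lambda r: r["SOLVERS_NODE_COSTS"], reverse=True
    )
    return ordered[:limit]


def format_rows(records):
    """Render records as right-aligned text lines."""
    lines = []
    for r in records:
        lines.append("{:>9} {:>8} {:>20} {:>15}".format(
            r["lon"], r["lat"], r["SOLVERS_NODE_ID"], r["SOLVERS_NODE_COSTS"]
        ))
    return lines


# Sample records in arbitrary order
records = [
    {"lon": -73.93757, "lat": 40.7583,
     "SOLVERS_NODE_ID": 5145699326395297532,
     "SOLVERS_NODE_COSTS": 0.0001881493},
    {"lon": -73.98232, "lat": 40.74015,
     "SOLVERS_NODE_ID": -5733631352137098805,
     "SOLVERS_NODE_COSTS": 0.000197256},
]

top = top_nodes(records)
lines = format_rows(top)
```

The record with the larger cost is listed first, matching the descending order of the table above.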

Download & Run

Included below is a complete example containing all the above requests, the data files, and output.

To run the complete sample, ensure that the solve_graph_nyctaxi_page_rank.py and nyc_neighborhood.csv files are in the same directory (assuming the locations were not changed in the script) and that the nyctaxi dataset has been ingested. Then switch to that directory and run the following:

  • If on the Kinetica host:

    /opt/gpudb/bin/gpudb_python solve_graph_nyctaxi_page_rank.py
    
  • If running after using PyPI or GitHub to install the Python API:

    python solve_graph_nyctaxi_page_rank.py