Kinetica Dialect for SQLAlchemy

Kinetica provides a SQL interface through its SQLAlchemy dialect, which uses the DBAPI implementation provided by Kinetica's Python API.


Installation

The Kinetica Dialect for SQLAlchemy can be installed from PyPI using pip:

pip3 install sqlalchemy_kinetica

This also installs a compatible version of the Kinetica Python API.

The package is also available on GitHub:

https://github.com/kineticadb/sqlalchemy-kinetica

Connecting

The standard SQLAlchemy create_engine call can be used to connect to a Kinetica database instance.

For HTTPS-based installations, the bypass_ssl_cert_check option can be set to True to skip server certificate validation.

Standard
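from sqlalchemy import create_engine

# url, username, and password are placeholders for your instance's values
engine = create_engine(
    "kinetica://",
    connect_args = {
        "url": url,
        "username": username,
        "password": password
    }
)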
SSL Cert Bypass
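from sqlalchemy import create_engine

# url, username, and password are placeholders for your instance's values
engine = create_engine(
    "kinetica://",
    connect_args = {
        "url": url,
        "username": username,
        "password": password,
        "bypass_ssl_cert_check": True
    }
)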
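
One way to keep credentials out of source code is to assemble connect_args from environment variables. The following is a minimal sketch: the environment variable names (KINETICA_URL, KINETICA_USER, KINETICA_PASS, KINETICA_BYPASS_SSL_CERT_CHECK) and the default URL are assumptions made for illustration; only the connect_args keys come from the examples above.

```python
import os


def kinetica_connect_args(env=None):
    """Build the connect_args dict passed to create_engine("kinetica://", ...).

    The environment variable names used here are hypothetical.
    """
    env = os.environ if env is None else env
    args = {
        "url": env.get("KINETICA_URL", "http://localhost:9191"),  # placeholder default
        "username": env.get("KINETICA_USER", ""),
        "password": env.get("KINETICA_PASS", ""),
    }
    # Only add the bypass flag when it is explicitly requested
    if env.get("KINETICA_BYPASS_SSL_CERT_CHECK", "").lower() in ("1", "true", "yes"):
        args["bypass_ssl_cert_check"] = True
    return args


def make_engine(env=None):
    """Create the engine; SQLAlchemy is imported lazily, only when an engine is needed."""
    from sqlalchemy import create_engine
    return create_engine("kinetica://", connect_args=kinetica_connect_args(env))
```

Calling make_engine() then yields an engine configured the same way as in the Standard and SSL Cert Bypass examples.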

Managing Connections

SQLAlchemy connections can be managed either automatically or manually. When manually managed, connections must be explicitly closed after use.

Automatic
from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("SELECT NOW()"))
Manual
from sqlalchemy import text

conn = engine.connect()
try:
    conn.execute(text("SELECT NOW()"))
finally:
    conn.close()
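
The two forms above behave the same way when a statement fails: the connection is still closed. The sketch below shows this with a stand-in class rather than a real SQLAlchemy connection; StubConnection is hypothetical and exists only so the behavior can be observed without a database.

```python
class StubConnection:
    """Stand-in for a connection; records whether close() was called."""

    def __init__(self):
        self.closed = False

    def execute(self, statement):
        # Simulate a statement that fails on the server
        raise RuntimeError("simulated query failure")

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # let the exception propagate to the caller


def run_automatic(conn):
    # Automatic management: leaving the with-block closes the connection
    with conn as c:
        c.execute("SELECT NOW()")


def run_manual(conn):
    # Manual management: the finally clause guarantees the close
    try:
        conn.execute("SELECT NOW()")
    finally:
        conn.close()
```

In both functions the error reaches the caller and the connection ends up closed; omitting the try/finally in the manual form would leave it open after a failure.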

Data Types

All Kinetica data types can be used in SQLAlchemy through either native SQLAlchemy types or the Kinetica dialect's custom types.

Category    Kinetica Data Type  SQLAlchemy Type Class
----------  ------------------  ---------------------------------------------------------
Number      BOOLEAN             sqlalchemy.Boolean
            TINYINT             sqlalchemy_kinetica.kinetica_types.TINYINT
            SMALLINT            sqlalchemy_kinetica.kinetica_types.SMALLINT
            INTEGER             sqlalchemy.Integer
            BIGINT              sqlalchemy.BIGINT
            UNSIGNED BIGINT     sqlalchemy_kinetica.kinetica_types.UnsignedBigInteger
            REAL                sqlalchemy.REAL
            DOUBLE              sqlalchemy.DOUBLE
            DECIMAL(P,S)        sqlalchemy_kinetica.kinetica_types.DECIMAL
String      VARCHAR             sqlalchemy.VARCHAR
            IPV4                sqlalchemy_kinetica.kinetica_types.IPV4
            UUID                sqlalchemy.UUID
            JSON                sqlalchemy_kinetica.kinetica_types.JSON
Date/Time   DATE                sqlalchemy.DATE
            DATETIME            sqlalchemy.DATETIME
            TIME                sqlalchemy.TIME
            TIMESTAMP           sqlalchemy.TIMESTAMP
Binary      BLOB                sqlalchemy_kinetica.kinetica_types.BLOB
Geospatial  GEOMETRY            sqlalchemy_kinetica.kinetica_types.GEOMETRY
            BLOB(WKT)           sqlalchemy_kinetica.kinetica_types.BlobWKT
Composite   BOOLEAN[N]          sqlalchemy.ARRAY, sqlalchemy.Boolean
            INTEGER[N]          sqlalchemy.ARRAY, sqlalchemy.Integer
            BIGINT[N]           sqlalchemy.ARRAY, sqlalchemy.BIGINT
            VECTOR(N)           sqlalchemy_kinetica.kinetica_types.VECTOR
            REAL[N]             sqlalchemy.ARRAY, sqlalchemy.REAL
            DOUBLE[N]           sqlalchemy.ARRAY, sqlalchemy.DOUBLE
            VARCHAR[N]          sqlalchemy.ARRAY, sqlalchemy.VARCHAR
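
The mapping above can also be kept as a small lookup, for instance when translating an existing Kinetica DDL column type into the class to import. This is a sketch only: the function name sqlalchemy_types_for and its parsing rules are assumptions for illustration and are not part of the dialect; the dotted paths are copied from the table.

```python
import re

# Scalar types, copied from the table above
SCALAR_TYPES = {
    "BOOLEAN": "sqlalchemy.Boolean",
    "TINYINT": "sqlalchemy_kinetica.kinetica_types.TINYINT",
    "SMALLINT": "sqlalchemy_kinetica.kinetica_types.SMALLINT",
    "INTEGER": "sqlalchemy.Integer",
    "BIGINT": "sqlalchemy.BIGINT",
    "UNSIGNED BIGINT": "sqlalchemy_kinetica.kinetica_types.UnsignedBigInteger",
    "REAL": "sqlalchemy.REAL",
    "DOUBLE": "sqlalchemy.DOUBLE",
    "DECIMAL": "sqlalchemy_kinetica.kinetica_types.DECIMAL",
    "VARCHAR": "sqlalchemy.VARCHAR",
    "IPV4": "sqlalchemy_kinetica.kinetica_types.IPV4",
    "UUID": "sqlalchemy.UUID",
    "JSON": "sqlalchemy_kinetica.kinetica_types.JSON",
    "DATE": "sqlalchemy.DATE",
    "DATETIME": "sqlalchemy.DATETIME",
    "TIME": "sqlalchemy.TIME",
    "TIMESTAMP": "sqlalchemy.TIMESTAMP",
    "BLOB": "sqlalchemy_kinetica.kinetica_types.BLOB",
    "GEOMETRY": "sqlalchemy_kinetica.kinetica_types.GEOMETRY",
    "BLOB(WKT)": "sqlalchemy_kinetica.kinetica_types.BlobWKT",
    "VECTOR": "sqlalchemy_kinetica.kinetica_types.VECTOR",
}

# Element types the table lists for array columns
ARRAY_ELEMENT_TYPES = {"BOOLEAN", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR"}


def sqlalchemy_types_for(kinetica_type):
    """Return the dotted path(s) of the type class(es) for a Kinetica column type."""
    name = " ".join(kinetica_type.upper().split())  # normalize case and whitespace
    if name.replace(" ", "") == "BLOB(WKT)":
        return [SCALAR_TYPES["BLOB(WKT)"]]
    array = re.fullmatch(r"(.+?)\s*\[\s*\d+\s*\]", name)
    if array:
        element = array.group(1)
        if element not in ARRAY_ELEMENT_TYPES:
            raise KeyError(f"no array mapping listed for {element}")
        return ["sqlalchemy.ARRAY", SCALAR_TYPES[element]]
    base = re.sub(r"\s*\(.*\)$", "", name)  # drop (P,S), (N), and similar suffixes
    return [SCALAR_TYPES[base]]
```

For example, sqlalchemy_types_for("INTEGER[10]") returns the two entries listed in the Composite rows, since array columns combine sqlalchemy.ARRAY with an element type.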

Creating Tables

The full set of Kinetica table operations, table options, and column types is available through the execution of Kinetica SQL as string literals. A large portion of the Kinetica table feature set is also available via SQLAlchemy objects and the Kinetica dialect.

Column Types

The following demonstrates creating a table with various column types using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_CREATE_TABLE = """
    CREATE TABLE sqlalchemy.column_types
    (
        i    INTEGER NOT NULL,                                 /* non-nullable native integer, part of primary key (defined at end)                  */
        ti   TINYINT,                                          /* native int8                                                                        */
        si   SMALLINT,                                         /* native int16                                                                       */
        bi   BIGINT NOT NULL,                                  /* non-nullable native long, part of primary key (defined at end)                     */
        ubi  UNSIGNED BIGINT,                                  /* native unsigned long                                                               */
        b    BOOLEAN,                                          /* 0s and 1s only                                                                     */
        r    REAL,                                             /* native float                                                                       */
        db   DOUBLE,                                           /* native double                                                                      */
        dc   DECIMAL(10, 4),                                   /* native decimal                                                                     */
        v    VARCHAR(TEXT_SEARCH),                             /* string, searchable, only limited in size by system-configured value                */
        cd   VARCHAR(30, DICT),                                /* char32 using dictionary-encoding of values                                         */
        ct   VARCHAR(256, TEXT_SEARCH),                        /* char256, searchable                                                                */
        ip   IPV4,                                             /* IP address                                                                         */
        u    UUID(INIT_WITH_UUID),                             /* UUID                                                                               */
        d    DATE,                                             /* simple date                                                                        */
        t    TIME,                                             /* simple time                                                                        */
        dt   DATETIME(INIT_WITH_NOW),                          /* date/time                                                                          */
        ts   TIMESTAMP,                                        /* timestamp                                                                          */
        j    JSON,                                             /* JSON string                                                                        */
        bl   BLOB,                                             /* native bytes                                                                       */
        wkt  BLOB(WKT),                                        /* geospatial column for WKT binary data                                              */
        geo  GEOMETRY,                                         /* geospatial column for WKT string data                                              */
        vec  VECTOR(10),                                       /* vector column holding 10 floating point values                                     */
        ab   BOOLEAN[10],                                      /* array column holding 10 BOOLEAN values                                             */
        ai   INTEGER[10],                                      /* array column holding 10 INTEGER values                                             */
        abi  BIGINT[10],                                       /* array column holding 10 BIGINT values                                              */
        ar   REAL[10],                                         /* array column holding 10 REAL values                                                */
        adb  DOUBLE[10],                                       /* array column holding 10 DOUBLE values                                              */
        av   VARCHAR[10],                                      /* array column holding 10 VARCHAR values                                             */
        PRIMARY KEY (i, bi)                                    /* composite primary key on i & bi columns, which must be NOT NULL                    */
    )
"""

conn.execute(text(SQL_CREATE_TABLE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import (
    MetaData, Table, Column,
    Boolean, Integer, BIGINT, REAL, DOUBLE,
    UUID, DATE, TIME, DATETIME, TIMESTAMP, VARCHAR, ARRAY
)
from sqlalchemy_kinetica.kinetica_types import (
    TINYINT, SMALLINT, UnsignedBigInteger, DECIMAL,
    IPV4, GEOMETRY, BlobWKT, JSON, BLOB, VECTOR
)

metadata = MetaData()

various_types = Table(
    "column_types",
    metadata,
    Column("i", Integer, primary_key = True),
    Column("ti", TINYINT),
    Column("si", SMALLINT),
    Column("bi", BIGINT, primary_key = True),
    Column("ubi", UnsignedBigInteger),
    Column("b", Boolean),
    Column("r", REAL),
    Column("db", DOUBLE),
    Column("dc", DECIMAL(10, 4)),
    Column("v", VARCHAR, info = {"text_search": True}),
    Column("cd", VARCHAR(30), info = {"dict": True}),
    Column("ct", VARCHAR(256), info = {"text_search": True}),
    Column("ip", IPV4),
    Column("u", UUID, info = {"init_with_uuid": True}),
    Column("d", DATE),
    Column("t", TIME),
    Column("dt", DATETIME, info = {"init_with_now": True}),
    Column("ts", TIMESTAMP),
    Column("j", JSON),
    Column("bl", BLOB),
    Column("wkt", BlobWKT),
    Column("geo", GEOMETRY),
    Column("vec", VECTOR(10)),
    Column("ab", ARRAY(Boolean, dimensions = 10)),
    Column("ai", ARRAY(Integer, dimensions = 10)),
    Column("abi", ARRAY(BIGINT, dimensions = 10)),
    Column("ar", ARRAY(REAL, dimensions = 10)),
    Column("adb", ARRAY(DOUBLE, dimensions = 10)),
    Column("av", ARRAY(VARCHAR, dimensions = 10)),
    schema = "sqlalchemy"
)

metadata.create_all(conn.engine)
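
Comparing the two versions, each column property in the SQL literal corresponds to a key in the Column's info dictionary. The sketch below spells out that correspondence for the four properties used above. It is illustrative only: column_type_sql is a hypothetical helper, not the dialect's compiler, and the mapping covers just the keys that appear in this example.

```python
# info keys from the Column definitions above, paired with the
# column properties used in the SQL literal
INFO_TO_PROPERTY = {
    "text_search": "TEXT_SEARCH",
    "dict": "DICT",
    "init_with_uuid": "INIT_WITH_UUID",
    "init_with_now": "INIT_WITH_NOW",
}


def column_type_sql(base_type, size=None, info=None):
    """Render a column type such as VARCHAR(30, DICT) from its parts."""
    parts = [] if size is None else [str(size)]
    for key, enabled in (info or {}).items():
        if enabled:  # only flags set to a true value are rendered
            parts.append(INFO_TO_PROPERTY[key])
    return f"{base_type}({', '.join(parts)})" if parts else base_type
```

For instance, column_type_sql("VARCHAR", 30, {"dict": True}) gives the type used for the cd column in the SQL literal.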

Replicated Tables

The following demonstrates creating a replicated table using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_CREATE_TABLE = """
    CREATE REPLICATED TABLE sqlalchemy.employee
    (
        id INTEGER NOT NULL,
        dept_id INTEGER NOT NULL,
        manager_id INTEGER,
        first_name VARCHAR(30),
        last_name VARCHAR(30),
        sal DECIMAL(18,4),
        hire_date DATE,
        work_district WKT,
        office_longitude REAL,
        office_latitude REAL,
        profile VECTOR(10),
        PRIMARY KEY (id)
    )
"""

conn.execute(text(SQL_CREATE_TABLE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, Column, Integer, REAL, DATE, VARCHAR
from sqlalchemy_kinetica.kinetica_types import DECIMAL, BlobWKT, VECTOR

metadata = MetaData()

employee = Table(
    "employee",
    metadata,
    Column("id", Integer, nullable = False, primary_key = True),
    Column("dept_id", Integer, nullable = False),    # not part of the primary key, matching PRIMARY KEY (id) in the SQL literal
    Column("manager_id", Integer),
    Column("first_name", VARCHAR(30)),
    Column("last_name", VARCHAR(30)),
    Column("sal", DECIMAL(18, 4)),
    Column("hire_date", DATE),
    Column("work_district", BlobWKT),
    Column("office_longitude", REAL),
    Column("office_latitude", REAL),
    Column("profile", VECTOR(10)),
    schema = "sqlalchemy",
    prefixes = ["REPLICATED"]
)

metadata.create_all(conn.engine)

Sharded Tables with Options

The following demonstrates creating a sharded table with a variety of table options applied, using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_CREATE_TABLE = """
    CREATE OR REPLACE TABLE sqlalchemy.employee
    (
        id INTEGER NOT NULL,
        dept_id INTEGER NOT NULL,
        manager_id INTEGER,
        first_name VARCHAR(30),
        last_name VARCHAR(30),
        sal DECIMAL(18,4),
        hire_date DATE,
        work_district WKT,
        office_longitude REAL,
        office_latitude REAL,
        profile VECTOR(10),
        PRIMARY KEY (id, dept_id),
        SHARD KEY (dept_id)
    )
    PARTITION BY RANGE (YEAR(hire_date))
    PARTITIONS
    (
        order_2018_2020 MIN(2018) MAX(2021),
        order_2021                MAX(2022),
        order_2022                MAX(2023),
        order_2023                MAX(2024)
    )
    TIER STRATEGY
    (
        ( ( VRAM 1, RAM 7, PERSIST 5 ) )
    )
    INDEX (dept_id)
    CHUNK SKIP INDEX (id)
    GEOSPATIAL INDEX (work_district)
    GEOSPATIAL INDEX (office_longitude, office_latitude)
    CAGRA INDEX (profile)
    USING TABLE PROPERTIES (CHUNK SIZE = 1000000, NO_ERROR_IF_EXISTS = true, TTL = 120)
"""

conn.execute(text(SQL_CREATE_TABLE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, Column, Integer, REAL, DATE, VARCHAR
from sqlalchemy_kinetica.kinetica_types import DECIMAL, BlobWKT, VECTOR

metadata = MetaData()

partition_clause = """
    PARTITION BY RANGE (YEAR(hire_date))
    PARTITIONS
    (
        order_2018_2020 MIN(2018) MAX(2021),
        order_2021                MAX(2022),
        order_2022                MAX(2023),
        order_2023                MAX(2024)
    )
"""

employee = Table(
    "employee",
    metadata,
    Column("id", Integer, nullable = False, primary_key = True),
    Column("dept_id", Integer, nullable = False, primary_key = True, info = {"shard_key": True}),
    Column("manager_id", Integer),
    Column("first_name", VARCHAR(30)),
    Column("last_name", VARCHAR(30)),
    Column("sal", DECIMAL(18, 4)),
    Column("hire_date", DATE),
    Column("work_district", BlobWKT),
    Column("office_longitude", REAL),
    Column("office_latitude", REAL),
    Column("profile", VECTOR(10)),
    schema = "sqlalchemy",
    prefixes = ["OR REPLACE"],
    info = {
        "CHUNK SIZE": 1000000,
        "NO_ERROR_IF_EXISTS": "TRUE",
        "TTL": 120
    },
    kinetica_index = [["dept_id"]],
    kinetica_chunk_skip_index = "id",
    kinetica_geospatial_index = [["work_district"], ["office_longitude", "office_latitude"]],
    kinetica_cagra_index = ["profile"],
    kinetica_tier_strategy = "( ( VRAM 1, RAM 7, PERSIST 5 ) )",
    kinetica_partition_clause = partition_clause
)

metadata.create_all(conn.engine)
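
Because kinetica_partition_clause takes the partitioning SQL as a plain string, the clause can be generated rather than written by hand. The following sketch builds the same range partitions as above; range_partition_clause is a hypothetical helper, and its output differs from the hand-written clause only in whitespace.

```python
def range_partition_clause(expression, partitions):
    """Build a PARTITION BY RANGE clause.

    partitions is a list of (name, lower, upper) tuples; lower may be None,
    in which case only the MAX bound is emitted.
    """
    lines = []
    for name, lower, upper in partitions:
        bounds = f"MAX({upper})" if lower is None else f"MIN({lower}) MAX({upper})"
        lines.append(f"    {name} {bounds}")
    body = ",\n".join(lines)
    return f"PARTITION BY RANGE ({expression})\nPARTITIONS\n(\n{body}\n)"


partition_clause = range_partition_clause(
    "YEAR(hire_date)",
    [
        ("order_2018_2020", 2018, 2021),
        ("order_2021", None, 2022),
        ("order_2022", None, 2023),
        ("order_2023", None, 2024),
    ],
)
```

The resulting string can be passed as kinetica_partition_clause in the Table definition.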

CREATE TABLE...AS

CREATE TABLE ... AS is supported in both native SQL and the Kinetica Dialect for SQLAlchemy, as the following examples show:

SQL Literal
from sqlalchemy import text

SQL_CREATE_TABLE = """
    CREATE OR REPLACE REPLICATED TEMP TABLE sqlalchemy.new_temp_employee AS
    (
        SELECT *
        FROM sqlalchemy.employee
    )
"""

conn.execute(text(SQL_CREATE_TABLE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select
from sqlalchemy_kinetica.custom_commands import CreateTableAs

metadata = MetaData()

schema_name = "sqlalchemy"
source_table_name = "employee"
target_table_name = "new_temp_employee"

source_table = Table(source_table_name, metadata, autoload_with = conn, schema = schema_name)

create_stmt = CreateTableAs(
        f"{schema_name}.{target_table_name}",
        select(source_table),
        prefixes = ["OR REPLACE", "REPLICATED", "TEMP"]
).compile(conn)

conn.execute(create_stmt)

External Tables

CREATE EXTERNAL TABLE is supported in both native SQL and the Kinetica Dialect for SQLAlchemy, as the following examples show:

SQL Literal
from sqlalchemy import text

SQL_CREATE_TABLE = """
    CREATE EXTERNAL TABLE sqlalchemy.remote_employee
    REMOTE QUERY 'SELECT * EXCLUDE(profile) FROM sqlalchemy.employee'
    WITH OPTIONS
    (
        DATA SOURCE = 'sqlalchemy.jdbc_ds',
        SUBSCRIBE = TRUE,
        REMOTE_QUERY_INCREASING_COLUMN = 'id'
    )
    USING TABLE PROPERTIES (CHUNK SIZE = 1000000, NO_ERROR_IF_EXISTS = true, TTL = 120)
"""

conn.execute(text(SQL_CREATE_TABLE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table
from sqlalchemy.sql.ddl import CreateTable
from sqlalchemy_kinetica.dialect import KineticaDialect

metadata = MetaData()

external_table = Table(
    "remote_employee",
    metadata,
    schema = "sqlalchemy",
    prefixes = ["EXTERNAL"],
    info = {
        "CHUNK SIZE": 1000000,
        "NO_ERROR_IF_EXISTS": "TRUE",
        "TTL": 120
    },
    kinetica_external_table_remote_query = "SELECT id, dept_id, manager_id, first_name, last_name, sal, hire_date FROM sqlalchemy.employee",
    kinetica_external_table_option = {
        'DATA SOURCE': "sqlalchemy.jdbc_ds",
        'SUBSCRIBE': 'TRUE',
        'REMOTE_QUERY_INCREASING_COLUMN': 'id'
    }
)

create_stmt = CreateTable(external_table).compile(dialect = KineticaDialect())

conn.execute(create_stmt)

Managing Data

The full set of Kinetica data manipulation operations is available both through the execution of Kinetica SQL as string literals and via SQLAlchemy objects and the Kinetica dialect.

Inserting Data as Values

The following demonstrates inserting data as raw values using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_INSERT = """
    INSERT INTO sqlalchemy.employee (id, dept_id, manager_id, first_name, last_name, sal, hire_date)
    VALUES /* KI_HINT_UPDATE_ON_EXISTING_PK */
        (1, 1, null, 'Anne',     'Arbor',   200000,      '2000-01-01'),
        (2, 2,    1, 'Brooklyn', 'Bridges', 100000,      '2000-02-01'),
        (3, 3,    1, 'Cal',      'Cutta',   100000,      '2000-03-01'),
        (4, 2,    2, 'Dover',    'Della',   150000,      '2000-04-01'),
        (5, 2,    2, 'Elba',     'Eisle',    50000,      '2000-05-01'),
        (6, 4,    1, 'Frank',    'Furt',     12345.6789, '2000-06-01')
"""

conn.execute(text(SQL_INSERT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table
from sqlalchemy_kinetica.custom_commands import ki_insert

metadata = MetaData()

employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

records = [
    {"id": 1, "dept_id": 1, "manager_id": None, "first_name": "Anne",     "last_name": "Arbor",   "sal": 200000,      "hire_date": "2000-01-01"},
    {"id": 2, "dept_id": 2, "manager_id":    1, "first_name": "Brooklyn", "last_name": "Bridges", "sal": 100000,      "hire_date": "2000-02-01"},
    {"id": 3, "dept_id": 3, "manager_id":    1, "first_name": "Cal",      "last_name": "Cutta",   "sal": 100000,      "hire_date": "2000-03-01"},
    {"id": 4, "dept_id": 2, "manager_id":    2, "first_name": "Dover",    "last_name": "Della",   "sal": 150000,      "hire_date": "2000-04-01"},
    {"id": 5, "dept_id": 2, "manager_id":    2, "first_name": "Elba",     "last_name": "Eisle",   "sal":  50000,      "hire_date": "2000-05-01"},
    {"id": 6, "dept_id": 4, "manager_id":    1, "first_name": "Frank",    "last_name": "Furt",    "sal":  12345.6789, "hire_date": "2000-06-01"},
]

# Use the UPDATE_ON_EXISTING_PK hint to invoke upsert mode,
# which will overwrite an existing record with a new record if the PKs match
insert_stmt = ki_insert(employee, insert_hint = "KI_HINT_UPDATE_ON_EXISTING_PK")

conn.execute(insert_stmt, records)
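
When the record list is large, it may be preferable to send it in several smaller executions rather than one. The helper below is a generic sketch for splitting any iterable of records into fixed-size lists; the function name and the idea of batching are assumptions for illustration, not a requirement of the dialect.

```python
from itertools import islice


def batches(records, size):
    """Yield successive lists of at most `size` records."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Usage with the statement above (the batch size is an arbitrary choice):
#
#     for batch in batches(records, 10000):
#         conn.execute(insert_stmt, batch)
```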

Inserting Data from a Query

The following demonstrates inserting data from a query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_INSERT = """
    INSERT INTO sqlalchemy.employee_backup (id, dept_id, manager_id, first_name, last_name, sal)
    SELECT id, dept_id, manager_id, first_name, last_name, sal
    FROM sqlalchemy.employee
    WHERE hire_date >= '2000-04-01'
"""

conn.execute(text(SQL_INSERT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, insert, select

metadata = MetaData()

# Create handles to the source & target tables
employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")
employee_backup = Table("employee_backup", metadata, autoload_with = conn, schema = "sqlalchemy")

# Prepare the insert statement with a select clause
insert_stmt = insert(employee_backup).from_select(
    employee_backup.c["id", "dept_id", "manager_id", "first_name", "last_name", "sal"],      # The columns to insert into
    (                                                                                        # The select statement providing the data
        select(employee.c["id", "dept_id", "manager_id", "first_name", "last_name", "sal"])
        .where(employee.c["hire_date"] >= '2000-04-01')
    )
).compile(conn, compile_kwargs={"literal_binds": True})

conn.execute(insert_stmt)

Inserting Data using a CTE

The following demonstrates inserting data with a CTE using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_INSERT = """
    INSERT INTO sqlalchemy.dept2_emp_mgr_roster (emp_first_name, emp_last_name, mgr_first_name, mgr_last_name)
    WITH
        dept2_emp AS
        (
            SELECT first_name, last_name, manager_id
            FROM sqlalchemy.employee
            WHERE dept_id = 2
        ),
        dept2_mgr AS
        (
            SELECT first_name, last_name, id
            FROM sqlalchemy.employee
            WHERE dept_id = 2
        )
    SELECT d2emp.first_name, d2emp.last_name, d2mgr.first_name, d2mgr.last_name
    FROM
        dept2_emp as d2emp
        JOIN dept2_mgr as d2mgr ON d2emp.manager_id = d2mgr.id
"""

conn.execute(text(SQL_INSERT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select

metadata = MetaData()

# Create handles to the source & target tables
employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")
dept2_roster = Table("dept2_emp_mgr_roster", metadata, autoload_with = conn, schema = "sqlalchemy")

# Define the first CTE for dept2_emp
dept2_emp = (
    select(employee.c.first_name, employee.c.last_name, employee.c.manager_id)
    .where(employee.c.dept_id == 2)
    .cte(name = "dept2_emp")
)

# Define the second CTE for dept2_mgr
dept2_mgr = (
    select(employee.c.first_name, employee.c.last_name, employee.c.id)
    .where(employee.c.dept_id == 2)
    .cte(name = "dept2_mgr")
)

# Prepare the insert statement with a select clause
insert_stmt = dept2_roster.insert().from_select(
    [
        "emp_first_name",
        "emp_last_name",
        "mgr_first_name",
        "mgr_last_name"
    ],
    (
        select(
            dept2_emp.c.first_name.label("emp_first_name"),
            dept2_emp.c.last_name.label("emp_last_name"),
            dept2_mgr.c.first_name.label("mgr_first_name"),
            dept2_mgr.c.last_name.label("mgr_last_name")
        )
        .select_from(
            dept2_emp.join(dept2_mgr, dept2_emp.c.manager_id == dept2_mgr.c.id)
        )
    )
).compile(conn, compile_kwargs = {"literal_binds": True})

conn.execute(insert_stmt)

Updating Data with Constants

The following demonstrates updating data with raw values using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_UPDATE = """
    UPDATE sqlalchemy.employee
    SET sal = sal * 1.05
"""

conn.execute(text(SQL_UPDATE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table

metadata = MetaData()

employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

update_stmt = employee.update().values(sal = employee.c.sal * 1.05)

conn.execute(update_stmt)

Updating Data with Subquery Filter

The following demonstrates updating data with a subquery filter (subquery in the WHERE clause) using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_UPDATE = """
    UPDATE sqlalchemy.employee b
    SET sal = sal * 1.05
    WHERE sal =
    (
        SELECT MIN(sal)
        FROM sqlalchemy.employee l
        WHERE b.dept_id = l.dept_id
    )
"""

conn.execute(text(SQL_UPDATE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, update, alias, func

metadata = MetaData()

# Create a handle to the lookup table
e_lookup = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

# Create an alias for the base table being updated
e_base = alias(e_lookup, name = "b")

# Subquery to find the MIN(sal) for each department
min_sal_in_dept = (
    select(func.min(e_lookup.c.sal))
    .where(e_base.c.dept_id == e_lookup.c.dept_id)
    .scalar_subquery()
)

# Update statement
update_stmt = (
    update(e_base)
    .where(e_base.c.sal == min_sal_in_dept)
    .values(sal = e_base.c.sal * 1.05)
).compile(conn, compile_kwargs = {"literal_binds": True})

conn.execute(update_stmt)

Updating Data with Subquery Assignment

The following demonstrates updating data with a subquery assignment (subquery in the SET clause) using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_UPDATE = """
    UPDATE  sqlalchemy.employee b
    SET     sal =
            (
                SELECT MAX(sal)
                FROM sqlalchemy.employee l
                WHERE l.dept_id = b.dept_id
            ) * .1 + sal * .9
"""

conn.execute(text(SQL_UPDATE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, update, alias, func

metadata = MetaData()

# Create a handle to the lookup table
e_lookup = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

# Create an alias for the base table being updated
e_base = alias(e_lookup, name = "b")

# Subquery to find the MAX(sal) for each department
max_sal_in_dept = (
    select(func.max(e_lookup.c.sal))
    .where(e_base.c.dept_id == e_lookup.c.dept_id)
    .scalar_subquery()
)

# Update statement
update_stmt = (
    update(e_base)
    .values(sal = max_sal_in_dept * .1 + e_base.c.sal * .9)
).compile(conn, compile_kwargs = {"literal_binds": True})

conn.execute(update_stmt)
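
To make the effect of this update concrete, the sketch below reproduces the arithmetic in plain Python: each salary becomes 10% of the highest salary in the employee's department plus 90% of the employee's own salary. It uses the six sample rows with their salaries as originally inserted (ignoring any earlier updates) and does not touch the database.

```python
from decimal import Decimal

# (id, dept_id, sal) for the sample employees, as originally inserted
employees = [
    (1, 1, Decimal("200000")),
    (2, 2, Decimal("100000")),
    (3, 3, Decimal("100000")),
    (4, 2, Decimal("150000")),
    (5, 2, Decimal("50000")),
    (6, 4, Decimal("12345.6789")),
]

# The correlated subquery: MAX(sal) within each employee's department
max_sal_by_dept = {}
for _, dept_id, sal in employees:
    max_sal_by_dept[dept_id] = max(sal, max_sal_by_dept.get(dept_id, sal))

# SET sal = (MAX(sal) in department) * .1 + sal * .9
new_sal = {
    emp_id: max_sal_by_dept[dept_id] * Decimal("0.1") + sal * Decimal("0.9")
    for emp_id, dept_id, sal in employees
}
```

In department 2 the highest salary is 150000, so salaries of 100000 and 50000 move to 105000 and 60000 respectively, while the top earner in each department is unchanged.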

Updating Data with Join

The following demonstrates updating data with a join using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_UPDATE = """
    UPDATE eb
    SET sal = e.sal, manager_id = e.manager_id
    FROM sqlalchemy.employee_backup eb
    JOIN sqlalchemy.employee e ON eb.id = e.id
"""

conn.execute(text(SQL_UPDATE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table
from sqlalchemy_kinetica.custom_commands import KiUpdate

metadata = MetaData()

# Create handles to the source & target tables
employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")
employee_backup = Table("employee_backup", metadata, autoload_with = conn, schema = "sqlalchemy")

update_stmt = KiUpdate(
    employee_backup,
    from_table = employee,
    join_condition = employee_backup.c.id == employee.c.id,
).values(
    sal = employee.c.sal,
    manager_id = employee.c.manager_id
).compile(conn)

conn.execute(update_stmt)

Deleting Data with Constants

The following demonstrates deleting data with raw values using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_DELETE = """
    DELETE
    FROM sqlalchemy.employee
    WHERE id = 6
"""

conn.execute(text(SQL_DELETE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table

metadata = MetaData()

employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

delete_stmt = employee.delete().where(employee.c.id == 6)

conn.execute(delete_stmt)

Deleting Data with Subquery Filter

The following demonstrates deleting data with a subquery filter (subquery in the WHERE clause) using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_DELETE = """
    DELETE
    FROM sqlalchemy.employee b
    WHERE id =
        (
            SELECT MAX(l.id)
            FROM sqlalchemy.employee l
            WHERE b.dept_id = l.dept_id
        )
"""

conn.execute(text(SQL_DELETE))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, delete, alias, func

metadata = MetaData()

# Create handles with aliases to the lookup & base tables
e_base = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("b")
e_lookup = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("l")

max_id_in_dept = (
    select(func.max(e_lookup.c.id))
    .where(e_base.c.dept_id == e_lookup.c.dept_id)
).scalar_subquery()

# Create the DELETE statement with the subquery in the WHERE clause
delete_stmt = delete(e_base).where(e_base.c.id == max_id_in_dept).compile(conn)

conn.execute(delete_stmt)
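
The correlated subquery removes, for every department, the employee with the highest id. The sketch below works through that logic in plain Python on the six sample rows as originally inserted (that is, assuming no earlier deletes have been applied); it is an illustration of the statement's semantics and does not run against the database.

```python
# (id, dept_id) for the sample employees, as originally inserted
rows = [(1, 1), (2, 2), (3, 3), (4, 2), (5, 2), (6, 4)]

# The correlated subquery: MAX(id) within each department
max_id_by_dept = {}
for emp_id, dept_id in rows:
    max_id_by_dept[dept_id] = max(emp_id, max_id_by_dept.get(dept_id, emp_id))

# WHERE id = (MAX(id) in the same department)
deleted = sorted(emp_id for emp_id, dept_id in rows if emp_id == max_id_by_dept[dept_id])
remaining = sorted(emp_id for emp_id, dept_id in rows if emp_id != max_id_by_dept[dept_id])
```

Departments with a single employee lose that employee, since the only id is also the maximum; only department 2 keeps any rows.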

Querying Data

The full Kinetica query capability is available through the execution of Kinetica SQL as string literals. Much of the Kinetica query feature set is available via SQLAlchemy objects and the Kinetica dialect.

CTE

The following demonstrates a CTE query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    WITH
        dept2_emp_sal_by_mgr (manager_id, salary) AS
        (
            SELECT manager_id, sal
            FROM sqlalchemy.employee
            WHERE dept_id = 2
        )
    SELECT
        manager_id dept2_mgr_id,
        MAX(salary) dept2_highest_emp_sal_per_mgr,
        COUNT(*) as dept2_total_emp_per_mgr
    FROM dept2_emp_sal_by_mgr
    GROUP BY manager_id
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, func

metadata = MetaData()

# Create a handle to the source table
employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

# Define the CTE (Common Table Expression)
dept2_emp_sal_by_mgr = (
    select(employee.c.manager_id, employee.c.sal.label("salary"))
    .where(employee.c.dept_id == 2)
    .cte(name = "dept2_emp_sal_by_mgr")
)

# Define the main query using the CTE
query = (
    select(
        dept2_emp_sal_by_mgr.c.manager_id.label("dept2_mgr_id"),
        func.max(dept2_emp_sal_by_mgr.c.salary).label("dept2_highest_emp_sal_per_mgr"),
        func.count().label("dept2_total_emp_per_mgr")
    )
    .group_by(dept2_emp_sal_by_mgr.c.manager_id)
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)
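
The result object returned by conn.execute can be read in several ways. One option, sketched below, is to convert each row into a dictionary keyed by column label using SQLAlchemy's Result.mappings() method; the helper name rows_as_dicts is an assumption for illustration.

```python
def rows_as_dicts(result):
    """Convert a SQLAlchemy result into a list of {column label: value} dicts."""
    return [dict(row) for row in result.mappings()]


# Usage with the query above:
#
#     for row in rows_as_dicts(result):
#         print(row["dept2_mgr_id"], row["dept2_highest_emp_sal_per_mgr"], row["dept2_total_emp_per_mgr"])
```

The keys are the labels assigned in the query, so the same access pattern works for both the SQL literal and the SQLAlchemy object versions.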

Join

The following demonstrates a join query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        e.last_name || ', ' || e.first_name AS "Employee_Name",
        m.last_name || ', ' || m.first_name AS "Manager_Name"
    FROM
        sqlalchemy.employee e
        LEFT JOIN sqlalchemy.employee m ON e.manager_id = m.id
    WHERE
        e.dept_id IN (1, 2, 3)
    ORDER BY
        m.id ASC NULLS FIRST,
        e.hire_date
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, nullsfirst, asc

metadata = MetaData()

# Create a handle to the source table
employee = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy")

# Aliases for self-join
e = employee.alias("e")
m = employee.alias("m")

# Construct the query
query = select(
    (e.c.last_name + ", " + e.c.first_name).label("Employee_Name"),
    (m.c.last_name + ", " + m.c.first_name).label("Manager_Name")
).select_from(
    e.outerjoin(m, e.c.manager_id == m.c.id)
).where(
    e.c.dept_id.in_([1, 2, 3])
).order_by(
    nullsfirst(asc(m.c.id)),
    e.c.hire_date
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)

ASOF Join

The following demonstrates an ASOF join query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        t.id,
        t.dt AS execution_dt,
        q.open_dt AS quote_dt,
        t.price AS execution_price,
        q.open_price
    FROM
        sqlalchemy.trades t
        LEFT JOIN sqlalchemy.quotes q ON
            t.ticker = q.symbol AND
            ASOF(t.dt, q.open_dt, INTERVAL '-1' DAY, INTERVAL '0' DAY, MAX)
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, text
from sqlalchemy_kinetica.custom_commands import Asof

metadata = MetaData()

# Create handles to the source tables
q = Table("quotes", metadata, autoload_with = conn, schema = "sqlalchemy").alias("q")
t = Table("trades", metadata, autoload_with = conn, schema = "sqlalchemy").alias("t")

# Define the ASOF function in the query
asof_condition = Asof(
    t.c.dt,
    q.c.open_dt,
    text("INTERVAL '-1' DAY"),
    text("INTERVAL '0' DAY"),
    text("MAX")
)

# Construct the SELECT statement
query = select(
    t.c.id,
    t.c.dt.label("execution_dt"),
    q.c.open_dt.label("quote_dt"),
    t.c.price.label("execution_price"),
    q.c.open_price
).select_from(
    t.outerjoin(
        q, (t.c.ticker == q.c.symbol) & asof_condition
    )
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)
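
As a rough illustration of what the ASOF condition above selects, the following pure-Python sketch mimics it for a single symbol. It assumes the five arguments mean: left column, right column, lower and upper offsets relative to the left value, and MAX to pick the latest qualifying right value. The function name asof_max and the sample timestamps are hypothetical, and the Kinetica SQL documentation remains the authority on exact boundary behavior.

```python
from bisect import bisect_right
from datetime import datetime, timedelta

def asof_max(left_dt, right_dts, range_begin, range_end):
    """Return the latest value in sorted right_dts that lies within
    [left_dt + range_begin, left_dt + range_end], or None if there is none."""
    upper = left_dt + range_end
    idx = bisect_right(right_dts, upper)  # number of values <= upper
    if idx == 0:
        return None
    candidate = right_dts[idx - 1]
    return candidate if candidate >= left_dt + range_begin else None

# Hypothetical quote timestamps for one symbol (must be sorted ascending)
quote_dts = [datetime(2024, 1, 1, 9, 30), datetime(2024, 1, 2, 9, 30)]

# A trade with a quote in the preceding day gets the most recent such quote
match = asof_max(datetime(2024, 1, 2, 15, 0), quote_dts,
                 timedelta(days=-1), timedelta(days=0))

# A trade with no quote in the preceding day gets no match
no_match = asof_max(datetime(2024, 1, 5, 12, 0), quote_dts,
                    timedelta(days=-1), timedelta(days=0))

print(match, no_match)
```

Because the example query uses a LEFT JOIN, a trade without a qualifying quote would still be returned, with the quote columns left null.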

Aggregation (GROUP BY)

The following demonstrates an aggregation query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT dept_id AS "DepartmentID", COUNT(*) AS "TotalEmployees", SUM(sal) AS "TotalSalary"
    FROM sqlalchemy.employee
    GROUP BY dept_id
    ORDER BY dept_id
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, func

metadata = MetaData()

# Create a handle to the source table
e = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")

# Construct the SELECT statement
query = select(
    e.c.dept_id.label("DepartmentID"),
    func.count().label("TotalEmployees"),
    func.sum(e.c.sal).label("TotalSalary")
).group_by(
    e.c.dept_id
).order_by(
    e.c.dept_id.asc()
).compile(conn)

result = conn.execute(query)

Aggregation (ROLLUP)

The following demonstrates an aggregated roll-up query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        CASE
            WHEN (GROUPING(dept_id) = 1) THEN '<Total>'
            ELSE NVL(STRING(dept_id), '<No Department>')
        END AS "DepartmentID",
        COUNT(*) AS "TotalEmployees",
        SUM(sal) AS "TotalSalary"
    FROM sqlalchemy.employee
    GROUP BY ROLLUP(dept_id)
    ORDER BY "DepartmentID"
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, case, func, VARCHAR

metadata = MetaData()

# Create a handle to the source table
e = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")

# Define the grouping column
dept_group = case(
    (func.grouping(e.c.dept_id) == 1, "<Total>"),
    else_ = func.nvl(func.cast(e.c.dept_id, VARCHAR), "<No Department>")
).label("DepartmentID")

# Construct the SELECT statement
query = select(
    dept_group,
    func.count().label("TotalEmployees"),
    func.sum(e.c.sal).label("TotalSalary")
).group_by(
    func.rollup(e.c.dept_id)
).order_by(
    dept_group.asc()
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)

Aggregation (GROUPING SETS)

The following demonstrates an aggregated grouping sets query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        CASE
            WHEN (GROUPING(dept_id) = 1) THEN '<Total>'
            ELSE NVL(STRING(dept_id), '<No Department>')
        END AS "DepartmentID",
        CASE
            WHEN (GROUPING(manager_id) = 1) THEN '<Total>'
            ELSE NVL(STRING(manager_id), '<No Manager>')
        END AS "ManagerID",
        COUNT(*) AS "TotalEmployees",
        SUM(sal) AS "TotalSalary"
    FROM sqlalchemy.employee
    GROUP BY GROUPING SETS ((dept_id), (manager_id), ())
    ORDER BY "DepartmentID", "ManagerID"
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, text, select, case, func, VARCHAR

metadata = MetaData()

# Create a handle to the source table
e = Table("employee", metadata, autoload_with = conn, schema = "sqlalchemy").alias("e")

# Define the department grouping column
dept_group = case(
    (func.grouping(e.c.dept_id) == 1, "<Total>"),
    else_ = func.nvl(func.cast(e.c.dept_id, VARCHAR), "<No Department>")
).label("DepartmentID")

# Define the manager grouping column
manager_group = case(
    (func.grouping(e.c.manager_id) == 1, "<Total>"),
    else_ = func.nvl(func.cast(e.c.manager_id, VARCHAR), "<No Manager>")
).label("ManagerID")

# Use raw SQL for GROUPING SETS.
# There is no direct construct in SQLAlchemy to support this,
# so it is best to model it as raw SQL text
grouping_sets_clause = text("GROUPING SETS((dept_id), (manager_id), ())")

# Construct the SELECT statement
query = select(
    dept_group,
    manager_group,
    func.count().label("TotalEmployees"),
    func.sum(e.c.sal).label("TotalSalary")
).group_by(
    grouping_sets_clause
).order_by(
    dept_group.asc(),
    manager_group.asc()
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)
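
To make the shape of the result easier to picture, here is a small pure-Python sketch of what GROUPING SETS ((dept_id), (manager_id), ()) computes: one aggregation per grouping set, with the rows of all sets returned together. The sample rows and the aggregate helper are hypothetical and only mirror the column names used above.

```python
from collections import defaultdict

# Hypothetical rows: (dept_id, manager_id, sal)
employees = [
    (1, 10, 100.0),
    (1, 11, 150.0),
    (2, 10, 200.0),
]

def aggregate(rows, key_columns):
    """Group rows by the given column indexes; return {key: (count, total_sal)}."""
    groups = defaultdict(lambda: [0, 0.0])
    for row in rows:
        key = tuple(row[i] for i in key_columns)
        groups[key][0] += 1       # COUNT(*)
        groups[key][1] += row[2]  # SUM(sal)
    return {key: tuple(value) for key, value in groups.items()}

by_dept = aggregate(employees, [0])     # grouping set (dept_id)
by_manager = aggregate(employees, [1])  # grouping set (manager_id)
grand_total = aggregate(employees, [])  # grouping set ()

print(by_dept, by_manager, grand_total)
```

In standard SQL, ROLLUP(dept_id) is shorthand for the grouping sets (dept_id) and (), which corresponds to by_dept plus grand_total in this sketch.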

Window Functions (Rolling Sum)

The following demonstrates a rolling sum window function query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        ticker AS "Stock",
        dt AS "TradeDateTime",
        price AS "TradePrice",
        DECIMAL
        (
            SUM(price) OVER
                (
                    PARTITION BY ticker
                    ORDER BY dt
                )
        ) AS "TotalTraded"
    FROM sqlalchemy.trades
    ORDER BY ticker, dt
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, func

metadata = MetaData()

# Create a handle to the source table
t = Table("trades", metadata, autoload_with = conn, schema = "sqlalchemy").alias("t")

# Define the rolling sum window function
rolling_sum = func.sum(t.c.price).over(
    partition_by = t.c.ticker,
    order_by = t.c.dt
)

# Construct the SELECT statement
query = select(
    t.c.ticker.label("Stock"),
    t.c.dt.label("TradeDateTime"),
    t.c.price.label("TradePrice"),
    rolling_sum.label("TotalTraded")
).order_by(
    t.c.ticker,
    t.c.dt
).compile(conn)

result = conn.execute(query)
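
The effect of the rolling sum can be sketched in plain Python. This is only an illustration with hypothetical rows; it assumes the window's default frame runs from the start of each partition to the current row (the usual SQL behavior when ORDER BY is given) and that dt values are distinct within a ticker.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# Hypothetical rows: (ticker, dt, price)
trades = [
    ("ABC", "2024-01-02", 11.0),
    ("ABC", "2024-01-01", 10.0),
    ("XYZ", "2024-01-01", 5.0),
    ("ABC", "2024-01-03", 12.0),
]

rolling = []
ordered = sorted(trades, key=itemgetter(0, 1))          # ORDER BY ticker, dt
for _, group in groupby(ordered, key=itemgetter(0)):    # PARTITION BY ticker
    rows = list(group)
    totals = accumulate(price for _, _, price in rows)  # running SUM(price)
    rolling.extend((*row, total) for row, total in zip(rows, totals))

for row in rolling:
    print(row)
```

Each output row carries the original columns plus the running total for its ticker, which restarts whenever the ticker changes.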

Window Functions (Moving Average)

The following demonstrates a moving average window function query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        ticker AS "Stock",
        dt AS "TradeDateTime",
        price AS "TradePrice",
        DECIMAL
        (
            AVG(price) OVER
                (
                    PARTITION BY ticker
                    ORDER BY dt
                    RANGE BETWEEN INTERVAL 2 DAYS PRECEDING AND INTERVAL 1 DAY FOLLOWING
                )
        ) AS "AverageTradePrice"
    FROM sqlalchemy.trades
    ORDER BY ticker, dt
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, func, BIGINT
from sqlalchemy_kinetica.kinetica_types import DECIMAL

metadata = MetaData()

# Create a handle to the source table
t = Table("trades", metadata, autoload_with = conn, schema = "sqlalchemy").alias("t")

# Define the moving average window function; the range spans
# 2 days preceding through 1 day following, in milliseconds
moving_avg = func.avg(t.c.price).over(
    partition_by = t.c.ticker,
    order_by = func.cast(t.c.dt, BIGINT),
    range_ = (-2 * 24 * 60 * 60 * 1000, 1 * 24 * 60 * 60 * 1000)
)

# Construct the SELECT statement
query = select(
    t.c.ticker.label("Stock"),
    t.c.dt.label("TradeDateTime"),
    t.c.price.label("TradePrice"),
    func.cast(moving_avg, DECIMAL(10,2)).label("AverageTradePrice")
).order_by(
    t.c.ticker,
    t.c.dt
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)
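
The numeric bounds passed to range_ above are written as multiplication chains. As a small sketch, assuming (as the * 1000 factors suggest) that the cast timestamp is measured in milliseconds, the same offsets can be derived with datetime.timedelta. The helper name to_millis is hypothetical and not part of SQLAlchemy or the Kinetica dialect.

```python
from datetime import timedelta

def to_millis(delta: timedelta) -> int:
    """Convert a timedelta to a whole number of milliseconds."""
    return delta // timedelta(milliseconds=1)

# In SQLAlchemy's range_ tuple, a negative offset renders as PRECEDING
# and a positive offset renders as FOLLOWING
range_bounds = (-to_millis(timedelta(days=2)), to_millis(timedelta(days=1)))

print(range_bounds)
```

The resulting tuple can be passed as range_ in place of the hand-written arithmetic, which makes the intended window length easier to read and change.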

Window Functions (Ranking)

The following demonstrates a ranking window function query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        ticker AS "Stock",
        dt AS "TradeDateTime",
        price AS "TradePrice",
        RANK() OVER
            (
                PARTITION BY ticker
                ORDER BY price
            ) AS "RankedTrade",
        DECIMAL
        (
            PERCENT_RANK() OVER
                (
                    PARTITION BY ticker
                    ORDER BY price
                )
        ) * 100 AS "PercentRankedTrade"
    FROM sqlalchemy.trades
    ORDER BY ticker, dt
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, func
from sqlalchemy_kinetica.kinetica_types import DECIMAL

metadata = MetaData()

# Create a handle to the source table
t = Table("trades", metadata, autoload_with = conn, schema = "sqlalchemy").alias("t")

# Define the ranking window functions
ranked_trade = func.rank().over(
    partition_by = t.c.ticker,
    order_by = t.c.price
)

percent_ranked_trade = func.percent_rank().over(
    partition_by = t.c.ticker,
    order_by = t.c.price
) * 100

# Construct the SELECT statement
query = select(
    t.c.ticker.label("Stock"),
    t.c.dt.label("TradeDateTime"),
    t.c.price.label("TradePrice"),
    ranked_trade.label("RankedTrade"),
    func.cast(percent_ranked_trade, DECIMAL(10,2)).label("PercentRankedTrade")
).order_by(
    t.c.ticker,
    t.c.dt
).compile(conn, compile_kwargs = {"literal_binds": True})

result = conn.execute(query)
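
For readers unfamiliar with the two ranking functions, the sketch below computes them by hand for a single partition. It assumes the standard SQL definitions: RANK is one plus the number of rows that sort strictly before the current row, and PERCENT_RANK is (rank - 1) / (rows - 1). The function name and the sample prices are hypothetical.

```python
def rank_and_percent_rank(prices):
    """Return (price, rank, percent_rank * 100) for each price in one partition."""
    n = len(prices)
    result = []
    for price in prices:
        rank = 1 + sum(1 for other in prices if other < price)  # ties share a rank
        percent = 0.0 if n == 1 else (rank - 1) / (n - 1) * 100
        result.append((price, rank, round(percent, 2)))
    return result

# Hypothetical prices for one ticker, i.e. one PARTITION BY group
ranked = rank_and_percent_rank([10.0, 12.0, 12.0, 15.0, 20.0])

for row in ranked:
    print(row)
```

Note how the two trades at 12.0 share rank 2 and the next rank is 4, which is the gap behavior that distinguishes RANK from a dense ranking.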

Pivot

The following demonstrates a pivot query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        name,
        Home_Phone,
        Work_Phone,
        Cell_Phone
    FROM
        sqlalchemy.phone_list
    PIVOT
    (
        MAX(phone_number) AS Phone
        FOR phone_type IN ('Home', 'Work', 'Cell')
    )
    ORDER BY name
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, func, column
from sqlalchemy_kinetica.custom_commands import PivotSelect

metadata = MetaData()

# Create a handle to the source table
phone_list = Table("phone_list", metadata, autoload_with = conn, schema = "sqlalchemy")

# Define the aggregate expressions and pivot details
aggregate_expressions = [(func.max(column("phone_number")), "Phone")]
pivot_column = column("phone_type")
pivot_values = ["Home", "Work", "Cell"]

# Create the Pivot object
query = (
    PivotSelect(
        column("name"),
        column("Home_Phone"),
        column("Work_Phone"),
        column("Cell_Phone")
    )
    .select_from(phone_list)
    .pivot("max(phone_number) AS Phone", "phone_type", ["'Home'", "'Work'", "'Cell'"])
    .order_by(column("name"))
).compile(conn)

result = conn.execute(query)
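
The reshaping performed by the pivot can be sketched in plain Python. The rows below are hypothetical, and the output column names assume the pattern implied by the example's select list, where each pivot value is combined with the Phone alias (Home_Phone, Work_Phone, Cell_Phone).

```python
# Hypothetical rows: (name, phone_type, phone_number)
phone_list = [
    ("Jane", "Home", "555-0100"),
    ("Jane", "Cell", "555-0101"),
    ("Sam", "Work", "555-0102"),
]

pivot_values = ["Home", "Work", "Cell"]

pivoted = {}
for name, phone_type, phone_number in phone_list:
    # One output row per name, with one column per pivot value
    row = pivoted.setdefault(name, {f"{value}_Phone": None for value in pivot_values})
    column = f"{phone_type}_Phone"
    if column in row:
        # MAX(phone_number) keeps the largest value seen for this name and type
        row[column] = phone_number if row[column] is None else max(row[column], phone_number)

for name in sorted(pivoted):  # ORDER BY name
    print(name, pivoted[name])
```

Phone types that never occur for a name stay empty in that name's row, which is why the pivoted output can contain nulls.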

Unpivot

The following demonstrates an unpivot query using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT name, phone_type, phone_number
    FROM
    (
        SELECT
            name,
            Home_Phone AS Home,
            Work_Phone AS Work,
            Cell_Phone AS Cell
        FROM
            sqlalchemy.customer_contact
    )
    UNPIVOT (phone_number FOR phone_type IN (Home, Work, Cell))
    ORDER BY name, phone_type
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, column
from sqlalchemy_kinetica.custom_commands import UnpivotSelect

metadata = MetaData()

# Create a handle to the source table
customer_contact = Table('customer_contact', metadata, autoload_with = conn, schema = "sqlalchemy").alias("cc")

# Create the subquery (as an alias)
subquery = select(
    customer_contact.c.name,
    customer_contact.c.home_phone.label('Home'),
    customer_contact.c.work_phone.label('Work'),
    customer_contact.c.cell_phone.label('Cell')
)

query = (
    UnpivotSelect(column("name"), column("phone_type"), column("phone_number"))
    .select_from(subquery)
    .unpivot("phone_number", "phone_type", ["Home", "Work", "Cell"])
    .order_by(column("name"), column("phone_type"))
).compile(conn)

result = conn.execute(query)
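
Unpivoting is the reverse reshaping: each wide row becomes one narrow row per listed column. The following plain-Python sketch uses hypothetical rows shaped like the subquery output above, with every phone column populated so that null handling does not come into play.

```python
# Hypothetical rows shaped like the subquery output: (name, Home, Work, Cell)
contacts = [
    ("Sam", "555-0110", "555-0111", "555-0112"),
    ("Jane", "555-0100", "555-0101", "555-0102"),
]
value_columns = ["Home", "Work", "Cell"]

# Each listed column name becomes a phone_type value,
# and the column's content becomes the phone_number
unpivoted = [
    (name, phone_type, phone_number)
    for name, *numbers in contacts
    for phone_type, phone_number in zip(value_columns, numbers)
]
unpivoted.sort(key=lambda row: (row[0], row[1]))  # ORDER BY name, phone_type

for row in unpivoted:
    print(row)
```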

Set Union (Deduplicate)

The following demonstrates a set union query that discards duplicate records using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.lunch_menu
    UNION
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.dinner_menu
    ORDER BY
        food_name,
        category,
        price
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, union

metadata = MetaData()

# Create handles to the source tables
dinner_menu = Table('dinner_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('dm')
lunch_menu = Table('lunch_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('lm')

# Create the Select statements
lunch_select = select(lunch_menu.c.food_name, lunch_menu.c.category, lunch_menu.c.price)
dinner_select = select(dinner_menu.c.food_name, dinner_menu.c.category, dinner_menu.c.price)

# Create the union query
query = union(lunch_select, dinner_select).order_by("food_name", "category", "price").compile(conn)

result = conn.execute(query)

Set Union (Keep Duplicates)

The following demonstrates a set union query that keeps duplicate records using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.lunch_menu
    UNION ALL
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.dinner_menu
    ORDER BY
        food_name,
        category,
        price
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, union_all

metadata = MetaData()

# Create handles to the source tables
dinner_menu = Table('dinner_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('dm')
lunch_menu = Table('lunch_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('lm')

# Create the Select statements
lunch_select = select(lunch_menu.c.food_name, lunch_menu.c.category, lunch_menu.c.price)
dinner_select = select(dinner_menu.c.food_name, dinner_menu.c.category, dinner_menu.c.price)

# Create the union all query
query = union_all(lunch_select, dinner_select).order_by("food_name", "category", "price").compile(conn)

result = conn.execute(query)

Set Intersection

The following demonstrates a set intersection query that discards duplicate records using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.lunch_menu
    INTERSECT
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.dinner_menu
    ORDER BY
        food_name,
        category,
        price
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, intersect

metadata = MetaData()

# Create handles to the source tables
dinner_menu = Table('dinner_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('dm')
lunch_menu = Table('lunch_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('lm')

# Create the Select statements
lunch_select = select(lunch_menu.c.food_name, lunch_menu.c.category, lunch_menu.c.price)
dinner_select = select(dinner_menu.c.food_name, dinner_menu.c.category, dinner_menu.c.price)

# Create the intersect query
query = intersect(lunch_select, dinner_select).order_by("food_name", "category", "price").compile(conn)

result = conn.execute(query)

Set Exception

The following demonstrates a set exception query that discards duplicate records using a SQL literal as compared to using SQLAlchemy objects:

SQL Literal
from sqlalchemy import text

SQL_SELECT = """
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.lunch_menu
    EXCEPT
    SELECT
        food_name,
        category,
        price
    FROM
        sqlalchemy.dinner_menu
    ORDER BY
        food_name,
        category,
        price
"""

result = conn.execute(text(SQL_SELECT))
SQLAlchemy/Kinetica Dialect Objects
from sqlalchemy import MetaData, Table, select, except_

metadata = MetaData()

# Create handles to the source tables
dinner_menu = Table('dinner_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('dm')
lunch_menu = Table('lunch_menu', metadata, autoload_with = conn, schema = "sqlalchemy").alias('lm')

# Create the Select statements
lunch_select = select(lunch_menu.c.food_name, lunch_menu.c.category, lunch_menu.c.price)
dinner_select = select(dinner_menu.c.food_name, dinner_menu.c.category, dinner_menu.c.price)

# Create the except query
query = except_(lunch_select, dinner_select).order_by("food_name", "category", "price").compile(conn)

result = conn.execute(query)
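
The four set operations differ only in which rows survive, which is easy to see side by side on a tiny data set. The sketch below uses Python's built-in sqlite3 module purely as a stand-in, on the assumption that these operators follow standard SQL semantics in Kinetica as well. The menu rows are made up, and the tables are created without the sqlalchemy schema prefix.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE lunch_menu (food_name TEXT, category TEXT, price REAL);
    CREATE TABLE dinner_menu (food_name TEXT, category TEXT, price REAL);
    INSERT INTO lunch_menu VALUES ('soup', 'starter', 4.5), ('burger', 'main', 9.0);
    INSERT INTO dinner_menu VALUES ('soup', 'starter', 4.5), ('steak', 'main', 21.0);
""")

# Same two SELECT statements each time; only the set operator changes
TEMPLATE = """
    SELECT food_name, category, price FROM lunch_menu
    {operator}
    SELECT food_name, category, price FROM dinner_menu
    ORDER BY food_name, category, price
"""

rows = {
    operator: db.execute(TEMPLATE.format(operator=operator)).fetchall()
    for operator in ("UNION", "UNION ALL", "INTERSECT", "EXCEPT")
}
db.close()

for operator, result in rows.items():
    print(f"{operator}: {result}")
```

With this data, the soup row appears once under UNION and twice under UNION ALL, INTERSECT returns only the soup row that both menus share, and EXCEPT returns only the burger row that is on the lunch menu alone.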