Source code for gpudb.dbapi.aio

"""
Pure Python DB API wrapper around Kinetica Python API. This
async implementation uses the sync implementation and asyncio.to_thread.


"""
from typing import Any, Dict, overload
from gpudb.dbapi.pep249.type_constructors import *
from gpudb.dbapi.core.exceptions import (
    DatabaseError,
    DataError,
    Error,
    InterfaceError,
    IntegrityError,
    InternalError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
)
from .connection import AsyncKineticaConnection

from .cursor import AsyncCursor

# from .. import __version__

__all__ = [
    "apilevel",
    "threadsafety",
    "paramstyle",
    "aconnect",
    "AsyncKineticaConnection",
    "AsyncCursor",
    "Binary",
    "STRING",
    "BINARY",
    "NUMBER",
    "DATETIME",
    "ROWID",
    "Date",
    "Time",
    "Timestamp",
    "DateFromTicks",
    "TimeFromTicks",
    "TimestampFromTicks",
    "Error",
    "InterfaceError",
    "DatabaseError",
    "DataError",
    "IntegrityError",
    "InternalError",
    "NotSupportedError",
    "OperationalError",
    "ProgrammingError",
]

# pylint: disable=invalid-name
apilevel = "2.0"
threadsafety = 1
paramstyle = "qmark"


@overload
def aconnect(
    connection_string: str = "kinetica://", *, connect_args: Dict[str, Any] = ...
) -> AsyncKineticaConnection:
    """The global method to return an async Kinetica connection

    Example
    ::

        con = gpudb.connect("kinetica://",
                     connect_args={
                         'url': 'http://localhost:9191',
                         'username': 'user',
                         'password': 'password',
                         'options': {'bypass_ssl_cert_check': True},
                     })


    Args:
        connection_string (str): the connection string which must be 'kinetica://'
        connect_args (Dict[str, Any], optional): a mandatory `dict` like

            connect_args={
                'url': 'http://localhost:9191',

                'username': 'user',

                'password': 'password',

                'options': {'bypass_ssl_cert_check': True},
            })

            The keys that are valid for the `options` dict within `connect_args` 
            is the same set that is allowed by the class :class:`GPUDB.Options` 
            in the module :py:mod:`gpudb`

    Returns:
        KineticaConnection: an instance of the :class:`AsyncKineticaConnection`
    """
    ...


[docs] def aconnect( connection_string: str = "kinetica://", **kwargs: Dict[str, Any], ) -> AsyncKineticaConnection: """The global method to return an async Kinetica connection Example :: # Basic authentication con = gpudb.connect("kinetica://", connect_args={ 'url': 'http://localhost:9191', 'username': 'user', 'password': 'password', 'options': {'bypass_ssl_cert_check': True}, }) # oauth2 authentication con = gpudb.connect("kinetica://", connect_args={ 'url': 'http://localhost:9191', 'oauth_token': 'token_value', 'options': {'bypass_ssl_cert_check': True}, }) Args: connection_string (str, optional): the connection string which must be 'kinetica://' **kwargs (Dict[str, Any]): the arguments passed to the overloaded method :meth:`connect` Raises: ProgrammingError: Raised in case wrong connection parameters are detected or a connection fails for some other reason Returns: KineticaConnection: a :class:`KineticaConnection` instance """ if connection_string is None or connection_string != "kinetica://": raise ProgrammingError("'connection_string' has to be 'kinetica://'") connection_args = kwargs.pop("connect_args", None) if not connection_args or len(connection_args) == 0: raise ProgrammingError("'connection_args' cannot be None or empty") def extract_connect_args(connect_args, *values): return (connect_args.get(arg, None) for arg in values) url, username, password, oauth_token, options = extract_connect_args( connection_args, "url", "username", "password", "oauth_token", "options" ) if not url or not len(url) > 0: raise ProgrammingError( "Valid 'URL' not found in the 'connect_args' dict ..." ) username = username if username else "" password = password if password else "" oauth_token = oauth_token if oauth_token else "" return AsyncKineticaConnection( url=url, username=username, password=password, oauth_token=oauth_token, connection_options=options if options else {}, )