Source code for gpudb_dataframe

# ---------------------------------------------------------------------------
# gpudb_dataframe.py - Kinetica API interaction with pandas dataframes.
#
# Copyright (c) 2023 Kinetica DB Inc.
# ---------------------------------------------------------------------------

import logging
import pandas as pd
import numpy as np
from math import log2, ceil
import json
from tqdm.auto import tqdm

from typeguard import typechecked
from typing import List, Optional, Union

from . import GPUdbRecordColumn
from . import GPUdbColumnProperty
from . import GPUdb
from . import GPUdbTable
from . import GPUdbSqlIterator
from . import GPUdbException


class DataFrameUtils:

    _COL_TYPE = GPUdbRecordColumn._ColumnType
    _LOG = logging.getLogger("gpudb.DataFrameUtils")
    TQDM_NCOLS = None
    BATCH_SIZE = 5000

    @classmethod
    def vec_to_bytes(cls, vec: list) -> bytes:
        if vec is None:
            return None
        return np.array(vec).astype(np.float32).tobytes()

    @classmethod
    def bytes_to_vec(cls, bvec: bytes) -> np.ndarray:
        return None if not bvec else np.frombuffer(bvec, dtype=np.float32)
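    # Illustrative round trip for the two helpers above (a sketch, not part of
    # the library source): a Python list is packed into a float32 byte string
    # suitable for a Kinetica vector column, then unpacked back into a NumPy
    # array. Assumes only this module's imports.
    #
    #   raw = DataFrameUtils.vec_to_bytes([1.0, 2.0, 3.0])   # 12 bytes (3 * float32)
    #   vec = DataFrameUtils.bytes_to_vec(raw)                # array([1., 2., 3.], dtype=float32)
    #   assert vec.dtype == np.float32 and len(vec) == 3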
    @classmethod
    @typechecked
    def sql_to_df(cls,
                  db: GPUdb,
                  sql: str,
                  sql_params: list = [],
                  batch_size: int = BATCH_SIZE,
                  sql_opts: dict = {},
                  show_progress: bool = False) -> Optional[pd.DataFrame]:
        """Create a :class:`pd.Dataframe` from the results of a SQL query.

        Args:
            db (GPUdb): a :class:`GPUdb` instance
            sql (str): the SQL query
            sql_params (list): the query parameters. Defaults to an empty list.
            batch_size (int): the batch size for the SQL execution results.
                Defaults to :attr:`BATCH_SIZE`.
            sql_opts (dict): the SQL options as a dict. Defaults to an empty dict.
            show_progress (bool): whether to display progress or not.
                Defaults to False.

        Raises:
            GPUdbException: If the SQL query failed

        Returns:
            pd.DataFrame: a Pandas :class:`pd.Dataframe`, or None if the SQL
                query returned no results
        """
        cls._LOG.debug('Getting records from <{}>'.format(sql))

        if show_progress:
            print("Executing SQL...")

        result_list = []
        with GPUdbSqlIterator(db, sql,
                              batch_size=batch_size,
                              sql_params=sql_params,
                              sql_opts=sql_opts) as sql_iter:
            if sql_iter.type_map is None:
                # If there are no results then we can't infer datatypes.
                cls._LOG.debug("SQL returned no results.")
                return None

            for rec in tqdm(iterable=sql_iter,
                            total=sql_iter.total_count,
                            desc='Fetching Records',
                            disable=(not show_progress),
                            ncols=cls.TQDM_NCOLS):
                result_list.append(rec)

        result_df = cls._convert_records_to_df(result_list, sql_iter.type_map)

        # if (sql_iter.total_count != result_df.shape[0]):
        #     raise GPUdbException(f"Incorrect record count: "
        #                          f"expected={sql_iter.total_count} retrieved={result_df.shape[0]}")

        return result_df
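    # Usage sketch for sql_to_df (illustrative only, not part of the module);
    # the connection details and table name below are hypothetical.
    #
    #   options = GPUdb.Options()
    #   options.username = "user"
    #   options.password = "pass"
    #   db = GPUdb(host="http://localhost:9191", options=options)
    #   df = DataFrameUtils.sql_to_df(db, "SELECT * FROM example.stocks",
    #                                 batch_size=10000, show_progress=True)
    #   # df is None if the query returns no result set.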
    TYPE_GPUDB_TO_NUMPY = {
        _COL_TYPE.LONG: 'Int64',
        _COL_TYPE.INT: 'Int64',
        GPUdbColumnProperty.INT16: 'Int16',
        GPUdbColumnProperty.INT8: 'Int8',
        _COL_TYPE.DOUBLE: 'Float64',
        _COL_TYPE.FLOAT: 'Float32',
        GPUdbColumnProperty.DATETIME: 'string'
    }

    @classmethod
    @typechecked
    def _convert_records_to_df(cls, records: list, type_map: dict) -> pd.DataFrame:
        """Create a Pandas :class:`pd.Dataframe` from a list of records.

        Args:
            records (list): the list of records
            type_map (dict): the column type mapping

        Raises:
            GPUdbException: If the SQL query or datatype conversion failed

        Returns:
            pd.DataFrame: a Pandas :class:`pd.Dataframe`
        """
        col_major_recs = zip(*records)
        data_list = []
        index = pd.RangeIndex(0, len(records))

        # convert each column individually to avoid unnecessary conversions
        for col_name, raw_data in zip(type_map.keys(), col_major_recs):
            try:
                gpudb_type = type_map[col_name]
                numpy_type = cls.TYPE_GPUDB_TO_NUMPY.get(gpudb_type)
                col_data = pd.Series(data=raw_data, name=col_name,
                                     dtype=numpy_type, index=index)

                # do special conversion
                if gpudb_type == cls._COL_TYPE.BYTES:
                    col_data = col_data.map(cls.bytes_to_vec)
                elif gpudb_type == GPUdbColumnProperty.TIMESTAMP:
                    col_data = pd.to_datetime(col_data, unit='ms')
                elif gpudb_type == GPUdbColumnProperty.DATETIME:
                    col_data = pd.to_datetime(col_data, format='%Y-%m-%d %H:%M:%S.%f')
            except Exception as ex:
                msg = "Error converting column <{}> with data type <{}/{}>: {}".format(
                    col_name, gpudb_type, numpy_type, ex)
                raise GPUdbException(msg) from ex
            data_list.append(col_data)

        return pd.concat(data_list, axis=1)
    @classmethod
    @typechecked
    def table_to_df(cls,
                    db: GPUdb,
                    table_name: str,
                    batch_size: int = BATCH_SIZE,
                    show_progress: bool = False) -> pd.DataFrame:
        """Retrieve the contents of a Kinetica table as a :class:`pd.Dataframe`.

        Args:
            db (GPUdb): a :class:`GPUdb` instance
            table_name (str): name of the Kinetica table
            batch_size (int): the batch size for the SQL execution results.
                Defaults to :attr:`BATCH_SIZE`.
            show_progress (bool): whether to display progress or not.
                Defaults to False.

        Returns:
            pd.DataFrame: a Pandas :class:`pd.Dataframe` created from the
                Kinetica table
        """
        sql = "SELECT * FROM {}".format(table_name)
        return cls.sql_to_df(db=db, sql=sql,
                             batch_size=batch_size,
                             show_progress=show_progress)
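    # Usage sketch for table_to_df (illustrative only), reusing the hypothetical
    # db connection from the sketch above; the table name is also hypothetical.
    #
    #   df = DataFrameUtils.table_to_df(db, "example.stocks", show_progress=True)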
    @classmethod
    @typechecked
    def table_type_as_df(cls, gpudb_table: GPUdbTable) -> pd.DataFrame:
        """Convert the type schema (column list) of a :class:`GPUdbTable` into
        a :class:`pd.Dataframe`.

        Args:
            gpudb_table (GPUdbTable): a :class:`GPUdbTable` instance

        Returns:
            pd.DataFrame: a Pandas :class:`pd.Dataframe` created by analyzing
                the table column types
        """
        table_type = gpudb_table.get_table_type()
        col_list = []
        for col in table_type.columns:
            col_type = [col.name, col.column_type, col.column_properties]
            col_list.append(col_type)
        return pd.DataFrame(col_list, columns=['name', 'type', 'properties'])
    @classmethod
    @typechecked
    def df_to_table(cls,
                    df: pd.DataFrame,
                    db: GPUdb,
                    table_name: str,
                    column_types: dict = {},
                    clear_table: bool = False,
                    create_table: bool = True,
                    load_data: bool = True,
                    show_progress: bool = False,
                    batch_size: int = BATCH_SIZE,
                    **kwargs) -> GPUdbTable:
        """Load a :class:`pd.Dataframe` into a table, optionally dropping any
        existing table of the same name, creating the table if it doesn't
        exist, and loading the data; returns a :class:`GPUdbTable` reference
        to the table.

        Args:
            df (pd.DataFrame)
                The Pandas :class:`pd.Dataframe` to load into a table

            db (GPUdb)
                :class:`GPUdb` instance

            table_name (str)
                Name of the target Kinetica table for the
                :class:`pd.Dataframe` loading

            column_types (dict)
                Optional Kinetica column properties to apply to the column
                type definitions inferred from the :class:`pd.Dataframe`;
                map of column name to a list of column properties for that
                column, excluding the inferred base type. Defaults to an
                empty map. For example::

                    {
                        "middle_name": [ 'char64', 'nullable' ],
                        "state": [ 'char2', 'dict' ]
                    }

            clear_table (bool)
                Whether to drop an existing table of the same name before
                creating this one. Defaults to False.

            create_table (bool)
                Whether to create the table if it doesn't exist. Defaults
                to True.

            load_data (bool)
                Whether to load data into the target table. Defaults to True.

            show_progress (bool)
                Whether to show progress of the operation on the console.
                Defaults to False.

            batch_size (int)
                The number of records at a time to load into the target
                table. Defaults to :attr:`BATCH_SIZE`.

        Raises:
            GPUdbException: If the :class:`pd.Dataframe` is empty, the table
                doesn't exist and :attr:`create_table` is False, or the data
                ingest fails

        Returns:
            GPUdbTable: a :class:`GPUdbTable` instance created from the
                :class:`pd.Dataframe` passed in
        """
        if df.empty:
            raise GPUdbException("DataFrame cannot be empty.")

        has_table_resp = db.has_table(table_name)
        GPUdb._check_error(has_table_resp)

        if not create_table and not has_table_resp["table_exists"]:
            raise GPUdbException(
                f"{table_name}: Table does not exist and create_table=False")

        if clear_table:
            cls._LOG.debug(f"Clearing table: {table_name}")
            clear_resp = db.clear_table(table_name=table_name,
                                        options={'no_error_if_not_exists': 'true'})
            GPUdb._check_error(clear_resp)

        cls._LOG.debug(f"Creating table: {table_name}")
        col_types = cls._table_types_from_df(df, column_types)
        gpudb_table = GPUdbTable(_type=col_types, name=table_name, db=db, **kwargs)

        if load_data:
            cls.df_insert_into_table(df=df,
                                     gpudb_table=gpudb_table,
                                     show_progress=show_progress,
                                     batch_size=batch_size)

        return gpudb_table
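    # Usage sketch for df_to_table (illustrative only); the DataFrame contents,
    # table name, and column property overrides are hypothetical, and the db
    # connection is the one sketched earlier. Note that an override replaces
    # the inferred properties entirely, so 'nullable' must be repeated here.
    #
    #   df = pd.DataFrame({
    #       "id": np.arange(3, dtype=np.int64),
    #       "state": ["NY", "CA", None],
    #       "ts": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
    #   })
    #   table = DataFrameUtils.df_to_table(
    #       df, db, "example.df_demo",
    #       column_types={"state": ["char2", "nullable", "dict"]},
    #       clear_table=True,
    #       show_progress=True)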
    @classmethod
    @typechecked
    def df_insert_into_table(cls,
                             df: pd.DataFrame,
                             gpudb_table: GPUdbTable,
                             batch_size: int = BATCH_SIZE,
                             show_progress: bool = False) -> int:
        """Load a Pandas :class:`pd.Dataframe` into a :class:`GPUdbTable`.

        Args:
            df (pd.DataFrame): a Pandas :class:`pd.Dataframe`
            gpudb_table (GPUdbTable): a :class:`GPUdbTable` instance
            batch_size (int): a batch size to use for loading data into the
                table. Defaults to :attr:`BATCH_SIZE`.
            show_progress (bool): whether to show progress of the operation.
                Defaults to False.

        Raises:
            GPUdbException: If the data ingest fails

        Returns:
            int: the number of rows of the :class:`pd.Dataframe` actually
                inserted into the Kinetica table
        """
        total_rows = df.shape[0]
        rows_before = gpudb_table.size()
        converted_df = cls._table_convert_df_for_insert(df, gpudb_table=gpudb_table)

        cls._LOG.debug(f"Inserting rows into <{gpudb_table.table_name}>")
        with tqdm(total=total_rows,
                  desc='Inserting Records',
                  ncols=cls.TQDM_NCOLS,
                  disable=(not show_progress)) as progress_bar:
            for _offset in range(0, total_rows, batch_size):
                end = min(total_rows, _offset + batch_size)
                slice = converted_df.iloc[_offset:end]

                # Convert to records so we can preserve the column dtypes
                insert_records = slice.to_records(index=False, column_dtypes=None)

                # Call item() so the types are converted to python native types
                insert_rows = [list(x.item()) for x in insert_records]
                gpudb_table.insert_records(insert_rows)
                progress_bar.update(len(insert_rows))

        rows_inserted = gpudb_table.size() - rows_before
        cls._LOG.debug(f"Rows inserted: {rows_inserted}")
        return rows_inserted
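    # Usage sketch for appending to an existing table (illustrative only);
    # assumes the hypothetical "example.df_demo" table from the earlier sketch
    # already exists with a compatible schema.
    #
    #   table = GPUdbTable(_type=None, name="example.df_demo", db=db)
    #   more = pd.DataFrame({
    #       "id": np.array([3, 4], dtype=np.int64),
    #       "state": ["TX", "WA"],
    #       "ts": pd.to_datetime(["2023-01-04", "2023-01-05"]),
    #   })
    #   inserted = DataFrameUtils.df_insert_into_table(more, table,
    #                                                  batch_size=2000,
    #                                                  show_progress=True)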
    @classmethod
    @typechecked
    def _table_convert_df_for_insert(cls, df: pd.DataFrame,
                                     gpudb_table: GPUdbTable) -> pd.DataFrame:
        """ Convert a :class:`pd.Dataframe` for insert into a Kinetica table. """
        data_list = []
        col_properties = gpudb_table.get_table_type().column_properties
        cls._LOG.debug(f"col properties: {col_properties}")

        for col_name, col_data in df.items():
            ref_val = col_data.loc[col_data.first_valid_index()]

            if isinstance(ref_val, pd.Timestamp):
                col_type = col_properties[col_name][0]
                if col_type == GPUdbColumnProperty.TIMESTAMP:
                    # Kinetica timestamps are epoch milliseconds
                    col_data = col_data.astype(np.int64) // int(1e6)
                elif col_type == GPUdbColumnProperty.DATETIME:
                    col_data = col_data.astype(str)
                else:
                    raise GPUdbException(f"Can't convert {col_name} to {col_type}")
            elif isinstance(ref_val, (list, np.ndarray)):
                col_data = col_data.map(cls.vec_to_bytes)

            data_list.append(col_data)
        return pd.concat(data_list, axis=1)

    TYPE_NUMPY_TO_GPUDB = {
        'int64': [_COL_TYPE.LONG],
        'int32': [_COL_TYPE.INT],
        'int16': [_COL_TYPE.INT, GPUdbColumnProperty.INT16],
        'int8': [_COL_TYPE.INT, GPUdbColumnProperty.INT8],
        'float64': [_COL_TYPE.DOUBLE],
        'float32': [_COL_TYPE.FLOAT],
        'datetime64[ns]': [_COL_TYPE.LONG, GPUdbColumnProperty.TIMESTAMP],
        'uint64': [_COL_TYPE.STRING, GPUdbColumnProperty.ULONG],
        'bool': [_COL_TYPE.INT, GPUdbColumnProperty.BOOLEAN]
    }

    @classmethod
    @typechecked
    def _table_types_from_df(cls, df: pd.DataFrame, col_type_override: dict) -> List:
        """ Create GPUdb column types from a :class:`pd.Dataframe`. """
        type_list = []

        # create a copy because we will be modifying this.
        col_type_override = col_type_override.copy()

        for col_name, col_data in df.items():
            np_type = col_data.dtype.name
            col_type = cls.TYPE_NUMPY_TO_GPUDB.get(np_type)

            if col_type is not None:
                col_type_base = col_type[0]
                col_type_attr = col_type[1:]
            else:
                # need to inspect the type directly
                ref_val = col_data.loc[col_data.first_valid_index()]

                if isinstance(ref_val, str):
                    col_type_base = cls._COL_TYPE.STRING
                    max_len = col_data.map(lambda x: len(x) if x is not None else 0).max()
                    max_len = max(max_len, 2)
                    spow = 2 ** ceil(log2(max_len))
                    if spow <= 256:
                        col_type_attr = [f'char{spow}']
                    else:
                        # strings longer than 256 chars are left unrestricted
                        col_type_attr = []
                elif isinstance(ref_val, (list, np.ndarray)):
                    vec_dim = len(ref_val)
                    col_type_base = cls._COL_TYPE.BYTES
                    col_type_attr = [f'vector({vec_dim})']
                else:
                    raise GPUdbException(
                        f"{col_name}: Type not supported: {type(ref_val)}")

            # mark the column nullable if it contains any nulls
            has_null = col_data.isnull().any()
            if has_null:
                col_type_attr.append(GPUdbColumnProperty.NULLABLE)

            # replace the column attributes, if provided
            col_attr_override = col_type_override.pop(col_name, None)
            if col_attr_override is not None:
                if isinstance(col_attr_override, str):
                    col_type_attr = [prop.strip() for prop in col_attr_override.split(',')]
                elif isinstance(col_attr_override, list):
                    col_type_attr = col_attr_override
                else:
                    raise GPUdbException(
                        f"{col_attr_override}: Type properties not supported: "
                        f"{type(col_attr_override)}")

            type_def = [col_name, col_type_base] + col_type_attr
            type_list.append(type_def)

        if len(col_type_override) > 0:
            raise GPUdbException(
                f"Column type map has unknown columns: {list(col_type_override.keys())}")

        return type_list
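    # Illustration of the type inference above (a sketch, not part of the
    # module): for a hypothetical DataFrame like
    #
    #   pd.DataFrame({"id": np.arange(3, dtype=np.int64),
    #                 "name": ["ann", "bob", None]})
    #
    # _table_types_from_df would produce a type list along the lines of
    #
    #   [['id', 'long'], ['name', 'string', 'char4', 'nullable']]
    #
    # where the charN size is the smallest power of two covering the longest
    # string value, and strings longer than 256 characters are left as
    # unrestricted strings.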
# end class DataFrameUtils