import os
from pathlib import Path
import uuid
from enum import Enum
from typing import Tuple
import gpudb
from gpudb import GPUdb
# Size in bytes (60 MB) above which a file is transferred in several parts;
# the same value is used as the size of each part.
FILE_SIZE_THRESHOLD = 60 * 1024 * 1024
# Separator between the components of a KIFS path.
KIFS_PATH_SEPARATOR = "/"
class MultipartOperation(Enum):
    """Stage indicators for a multipart upload.

    Each member's value is the string placed in the ``multipart_operation``
    option of an upload request.
    """

    NONE = "none"
    INIT = "init"
    UPLOAD_PART = "upload_part"
    COMPLETE = "complete"
    CANCEL = "cancel"
class OpMode(Enum):
    """Direction of a file transfer: an upload or a download."""

    UPLOAD = "upload"
    DOWNLOAD = "download"
class KifsFileInfo(object):
    """Store and expose the name and size of a KIFS file.

    Args:
        file_name (str): Name of the KIFS file
        file_size (int): Size of the KIFS file
    """

    def __init__(self, file_name: str, file_size: int):
        # Assign through the properties so the backing fields are populated.
        self.file_name = file_name
        self.file_size = file_size

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}"
            f"(file_name={self._file_name!r}, file_size={self._file_size!r})"
        )

    @property
    def file_name(self) -> str:
        """The file_name property."""
        return self._file_name

    @file_name.setter
    def file_name(self, value: str) -> None:
        self._file_name = value

    @property
    def file_size(self) -> int:
        """The file_size property."""
        return self._file_size

    @file_size.setter
    def file_size(self, value: int) -> None:
        self._file_size = value
class GPUdbFileHandler(object):
    """This class exposes convenience methods to upload/download
    files to/from KIFS from local/KIFS directory.

    A file larger than ``FILE_SIZE_THRESHOLD`` bytes is transferred in parts;
    any other file is transferred in a single request.

    Methods
    -------
    1. upload_files - Upload a list of files - :meth:`upload_files`
    2. upload_file - Upload a single file - :meth:`upload_file`
    3. download_files - Download a list of files - :meth:`download_files`
    4. download_file - Download a single file - :meth:`download_file`

    Example
    ::

        file_handler = GPUdbFileHandler.from_url_info(host = "http://127.0.0.1:9191", username="user", password="password")
        file_handler.upload_file(file_name="/home/user/some_file_name", kifs_path="~anonymous")
        file_handler.download_file(file_name="~anonymous/some_file_name", local_dir="/home/user/download")
    """

    def __init__(self, db: GPUdb) -> None:
        """ Initialize a :class:`GPUdbFileHandler`.

        Args:
            db (GPUdb): A :class:`GPUdb` instance
        """
        self._db = db

    @classmethod
    def __from(cls, db: GPUdb):
        """ Create an instance of :class:`GPUdbFileHandler` from a :class:`GPUdb` instance.

        Args:
            db (GPUdb): A :class:`GPUdb` instance

        Returns:
            GPUdbFileHandler: a :class:`GPUdbFileHandler` instance
        """
        return cls(db)

    @classmethod
    def from_url_info(cls, host: str = "http://127.0.0.1:9191", username: str = None, password: str = None):
        """ Create a :class:`GPUdbFileHandler` instance from a host URL, user name, and password.

        Args:
            host (str, optional): A Kinetica host URL. Defaults to "http://127.0.0.1:9191".
            username (str, optional): Kinetica user name. Defaults to None.
            password (str, optional): Password for the user. Defaults to None.

        Returns:
            GPUdbFileHandler: a :class:`GPUdbFileHandler` instance
        """
        options = GPUdb.Options()
        options.username = username
        options.password = password
        db = GPUdb(host=host, options=options)
        return cls.__from(db)

    @classmethod
    def from_db_instance(cls, db: GPUdb):
        """ Create a :class:`GPUdbFileHandler` from a :class:`GPUdb` instance.

        Args:
            db (GPUdb): a :class:`GPUdb` instance

        Returns:
            GPUdbFileHandler: a :class:`GPUdbFileHandler` instance
        """
        return cls.__from(db)

    def __is_multi_part(self, file_name: str, op_mode: OpMode) -> Tuple[bool, int]:
        """ Decide whether a file must be transferred in parts.

        Args:
            file_name (str): Local path (upload) or full KIFS path (download)
            op_mode (OpMode): Whether the file is being uploaded or downloaded

        Returns:
            Tuple[bool, int]: Whether the file is larger than
            ``FILE_SIZE_THRESHOLD``, and the file size

        Raises:
            gpudb.GPUdbException: If the server reports no size for a file
                to download
        """
        if op_mode == OpMode.UPLOAD:
            size = self.__get_local_file_size(file_name)
        else:
            sizes = self._db.show_files([file_name])["sizes"]
            if not sizes:
                # Report a missing entry explicitly instead of an IndexError.
                raise gpudb.GPUdbException(f"No size information returned for file {file_name}")
            size = sizes[0]
        return size > FILE_SIZE_THRESHOLD, size

    @staticmethod
    def __error_message(resp: dict):
        """ Return the error message carried by a response, or None when the
        response status is not "ERROR".
        """
        status_info = resp["status_info"]
        if status_info["status"] == "ERROR":
            return status_info["message"]
        return None

    @staticmethod
    def __multi_part_options(options: dict, operation: MultipartOperation, upload_id: uuid.UUID) -> dict:
        """ Build the options for one multipart request.

        A fresh dict is returned so that neither a caller-supplied dict nor a
        shared default is mutated.
        """
        request_options = dict(options) if options else {}
        # NOTE(review): assumes the options map is string-to-string, so the
        # UUID is converted explicitly -- confirm against the GPUdb API.
        request_options["multipart_upload_uuid"] = str(upload_id)
        request_options["multipart_operation"] = operation.value
        return request_options

    def __upload_multi_part(self, file_name: str, kifs_path: str):
        """ Upload a local file to KIFS in parts of ``FILE_SIZE_THRESHOLD`` bytes.

        Args:
            file_name (str): Full path to the local file to upload
            kifs_path (str): KIFS directory to upload into

        Raises:
            gpudb.GPUdbException: If the file yields no data or any stage of
                the upload reports an error
        """
        kifs_file_name = kifs_path + KIFS_PATH_SEPARATOR + Path(file_name).name
        upload_id = self.__upload_multi_part_init(kifs_file_name)

        part_number = 0
        with open(file_name, mode="rb") as f:
            # iter() with a sentinel stops at EOF, so only non-empty chunks
            # are sent; part numbers start at 1.
            chunks = iter(lambda: f.read(FILE_SIZE_THRESHOLD), b"")
            for part_number, chunk in enumerate(chunks, start=1):
                self.__upload_multi_part_part(kifs_file_name, upload_id, part_number, chunk)

        if part_number == 0:
            # Nothing was read: abandon the upload that was just initiated.
            self.__upload_multi_part_cancel(kifs_file_name, upload_id)
            raise gpudb.GPUdbException("No data found")

        self.__upload_multi_part_complete(kifs_file_name, upload_id)

    def __upload_multi_part_init(self, file_name: str, options: dict = None) -> uuid.UUID:
        """ Start a multipart upload and return its newly generated identifier.

        Args:
            file_name (str): Full KIFS name of the file being uploaded
            options (dict, optional): Extra request options. Defaults to None.

        Returns:
            uuid.UUID: Identifier to pass to the subsequent stages

        Raises:
            gpudb.GPUdbException: If the response status is "ERROR"
        """
        upload_id = uuid.uuid4()
        request_options = self.__multi_part_options(options, MultipartOperation.INIT, upload_id)
        resp = self._db.upload_files([file_name], [], request_options)
        error = self.__error_message(resp)
        if error is not None:
            self.__upload_multi_part_cancel(file_name, upload_id)
            raise gpudb.GPUdbException(error)
        return upload_id

    def __upload_multi_part_part(self, file_name: str, id: uuid.UUID, part_number: int, data: bytes, options: dict = None) -> None:
        """ Upload one part of a multipart upload.

        Args:
            file_name (str): Full KIFS name of the file being uploaded
            id (uuid.UUID): Identifier returned when the upload was initiated
            part_number (int): Number of this part
            data (bytes): Content of this part
            options (dict, optional): Extra request options. Defaults to None.

        Raises:
            gpudb.GPUdbException: If the response status is "ERROR"
        """
        request_options = self.__multi_part_options(options, MultipartOperation.UPLOAD_PART, id)
        request_options["multipart_upload_part_number"] = str(part_number)
        resp = self._db.upload_files([file_name], [data], request_options)
        error = self.__error_message(resp)
        if error is not None:
            self.__upload_multi_part_cancel(file_name, id)
            raise gpudb.GPUdbException(error)

    def __upload_multi_part_complete(self, file_name: str, id: uuid.UUID, options: dict = None) -> None:
        """ Finish a multipart upload.

        Args:
            file_name (str): Full KIFS name of the file being uploaded
            id (uuid.UUID): Identifier returned when the upload was initiated
            options (dict, optional): Extra request options. Defaults to None.

        Raises:
            gpudb.GPUdbException: If the response status is "ERROR"
        """
        request_options = self.__multi_part_options(options, MultipartOperation.COMPLETE, id)
        resp = self._db.upload_files([file_name], [], request_options)
        error = self.__error_message(resp)
        if error is not None:
            self.__upload_multi_part_cancel(file_name, id)
            raise gpudb.GPUdbException(error)

    def __upload_multi_part_cancel(self, file_name: str, id: uuid.UUID, options: dict = None) -> None:
        """ Cancel a multipart upload; the response is not inspected.

        Args:
            file_name (str): Full KIFS name of the file being uploaded
            id (uuid.UUID): Identifier returned when the upload was initiated
            options (dict, optional): Extra request options. Defaults to None.
        """
        request_options = self.__multi_part_options(options, MultipartOperation.CANCEL, id)
        # An empty list, like the other data-less stages, rather than None.
        self._db.upload_files([file_name], [], request_options)

    def __upload_full(self, file_name: str, kifs_path: str) -> None:
        """ Upload a local file to KIFS in a single request.

        A file with no content is not uploaded and no error is raised.

        Args:
            file_name (str): Full path to the local file to upload
            kifs_path (str): KIFS directory to upload into

        Raises:
            gpudb.GPUdbException: If the response status is "ERROR"
        """
        kifs_file_name = kifs_path + KIFS_PATH_SEPARATOR + Path(file_name).name
        with open(file_name, mode="rb") as f:
            data = f.read()
        if not data:
            return
        resp = self._db.upload_files([kifs_file_name], [data])
        error = self.__error_message(resp)
        if error is not None:
            raise gpudb.GPUdbException(error)

    def upload_file(self, file_name: str, kifs_path: str) -> None:
        """ Upload a single file to a KIFS directory.

        Args:
            file_name (str): Full path to the local file to upload
            kifs_path (str): A KIFS directory to upload (must be existing)

        Raises:
            gpudb.GPUdbException: If ``file_name`` is not an existing local
                file or the upload reports an error
        """
        if not self.__check_local_file(file_name):
            raise gpudb.GPUdbException(f"{file_name} is not valid, cannot upload ...")
        is_multi_part, _ = self.__is_multi_part(file_name, op_mode=OpMode.UPLOAD)
        if is_multi_part:
            self.__upload_multi_part(file_name, kifs_path)
        else:
            self.__upload_full(file_name, kifs_path)

    def upload_files(self, file_names: list, kifs_path: str) -> None:
        """ Upload a list of files to a KIFS directory.

        Args:
            file_names (list): List of full local file paths
            kifs_path (str): Name of an existent KIFS directory
        """
        for file in file_names:
            self.upload_file(file, kifs_path)

    @staticmethod
    def __local_target(file_name: str, local_dir: str) -> str:
        """ Return the local path for a download: the last component of the
        KIFS path, placed inside ``local_dir``.
        """
        return os.path.join(local_dir, file_name.split(KIFS_PATH_SEPARATOR)[-1])

    def __download_full(self, file_name: str, file_size: int, local_dir: str) -> None:
        """ Download a KIFS file in a single request.

        Args:
            file_name (str): Full KIFS path of the file to download
            file_size (int): Expected size of the file
            local_dir (str): Local directory to save the file in

        Raises:
            gpudb.GPUdbException: If no data is returned or the number of
                bytes written differs from ``file_size``
        """
        resp: dict = self._db.download_files([file_name], [], [], {})
        file_data = resp["file_data"]
        if not file_data:
            raise gpudb.GPUdbException(f"No data returned for file {file_name}")
        with open(self.__local_target(file_name, local_dir), mode="wb") as f:
            written = f.write(file_data[0])
        if written != file_size:
            raise gpudb.GPUdbException(f"Failed to write file {file_name}")

    def __download_multi_part(self, file_name: str, file_size: int, local_dir: str) -> None:
        """ Download a KIFS file in parts of at most ``FILE_SIZE_THRESHOLD`` bytes.

        Args:
            file_name (str): Full KIFS path of the file to download
            file_size (int): Size of the file; parts are requested until this
                many bytes have been received
            local_dir (str): Local directory to save the file in

        Raises:
            gpudb.GPUdbException: If a request returns no data
        """
        offset: int = 0
        with open(self.__local_target(file_name, local_dir), mode="wb") as f:
            while offset < file_size:
                resp: dict = self._db.download_files([file_name], [offset], [FILE_SIZE_THRESHOLD])
                file_data = resp["file_data"]
                if not file_data or not file_data[0]:
                    # Skipping a part would silently leave a corrupt file.
                    raise gpudb.GPUdbException(
                        f"Failed to download file {file_name}: no data returned at offset {offset}"
                    )
                f.write(file_data[0])
                # Advance by what was actually received so that a short
                # response cannot leave a gap in the local file.
                offset += len(file_data[0])

    def download_file(self, file_name: str, local_dir: str) -> None:
        """ Download a single file to a local directory.

        A large file greater than 60MB in size will be downloaded in parts.

        Args:
            file_name (str): Name of the file to download (full KIFS path)
            local_dir (str): Name of the local directory to save the file in

        Raises:
            gpudb.GPUdbException: In case of an exception thrown by the server or in case the
                local directory doesn't exist
        """
        if not self.__check_local_dir(local_dir):
            raise gpudb.GPUdbException("Local directory does not exist; cannot download ...")
        is_multi_part, size = self.__is_multi_part(file_name, op_mode=OpMode.DOWNLOAD)
        if is_multi_part:
            self.__download_multi_part(file_name, size, local_dir)
        else:
            self.__download_full(file_name, size, local_dir)

    def download_files(self, file_names: list, local_dir: str) -> None:
        """ Download a list of files from KIFS.

        Args:
            file_names (list): A list of file names (full KIFS paths)
            local_dir (str): Name of the local directory to save the files in

        Raises:
            gpudb.GPUdbException: In case of an exception thrown by the server or in case the
                local directory doesn't exist
        """
        # Checked up front so nothing is downloaded when the target is invalid.
        if not self.__check_local_dir(local_dir):
            raise gpudb.GPUdbException("Local directory does not exist; cannot download ...")
        for file in file_names:
            self.download_file(file, local_dir)

    def __check_local_dir(self, local_dir: str) -> bool:
        """ Determine whether the given path is an existing local directory.

        Args:
            local_dir (str): Local directory path, whose existence on the file system will be
                determined

        Returns:
            bool: Whether or not the given directory exists on the local file system
        """
        return os.path.isdir(local_dir)

    def __check_local_file(self, file_path: str) -> bool:
        """ Determine whether the given path is an existing local regular file.

        Args:
            file_path (str): Local file path to check

        Returns:
            bool: Whether or not the given file exists on the local file system
        """
        return os.path.isfile(file_path)

    def __get_local_file_size(self, file_name: str) -> int:
        """ Return the size in bytes of a local file, as reported by ``os.stat``.

        Args:
            file_name (str): Local file path

        Returns:
            int: Size of the file in bytes
        """
        return os.stat(file_name).st_size

    def __get_kifs_file_info(self, file_name: str) -> KifsFileInfo:
        """ Look up a KIFS file and return the first reported name and size.

        Args:
            file_name (str): Full KIFS path of the file

        Returns:
            KifsFileInfo: Name and size taken from the first entries of the
            ``file_names`` and ``sizes`` lists in the response
        """
        resp = self._db.show_files([file_name])
        # KifsFileInfo requires both values at construction time.
        return KifsFileInfo(resp["file_names"][0], resp["sizes"][0])