2 using System.Collections.Generic;
5 using System.Threading.Tasks;
6 using Microsoft.Extensions.FileSystemGlobbing;
7 using Microsoft.Extensions.FileSystemGlobbing.Abstractions;
9 namespace kinetica.FileSystem;
55 private readonly
Options _options;
72 _db = db ??
throw new ArgumentNullException(nameof(db));
73 _options = options ??
new Options();
81 #region Upload Methods 88 public void Upload(
string fileName,
string remoteDirName)
102 Upload(
new List<string> { fileName }, remoteDirName, uploadOptions, callback);
110 public void Upload(IList<string> fileNames,
string remoteDirName)
124 if (fileNames ==
null || fileNames.Count == 0)
125 throw new KineticaException(
"List of local files to upload cannot be null or empty");
127 if (
string.IsNullOrEmpty(remoteDirName))
131 throw new KineticaException($
"Remote directory '{remoteDirName}' does not exist");
136 var resolvedFiles =
new List<string>();
137 foreach (var fileName
in fileNames)
139 if (
string.IsNullOrEmpty(fileName))
142 var files = ResolveLocalFiles(fileName, uploadOptions.Recursive);
143 if (files.Count == 0)
146 resolvedFiles.AddRange(files);
150 var fullFiles =
new List<(string localPath, string remotePath)>();
151 var multiPartFiles =
new List<(string localPath, string remotePath)>();
153 foreach (var localPath
in resolvedFiles)
155 var fileInfo =
new FileInfo(localPath);
156 var remotePath = BuildRemotePath(remoteDirName, fileInfo.Name);
158 if (fileInfo.Length <= _options.FileSizeToSplit)
160 fullFiles.Add((localPath, remotePath));
164 multiPartFiles.Add((localPath, remotePath));
169 if (fullFiles.Count > 0)
171 UploadFullFiles(fullFiles, uploadOptions, callback);
175 foreach (var (localPath, remotePath) in multiPartFiles)
177 UploadMultiPartFile(localPath, remotePath, uploadOptions, callback);
190 await Task.Run(() =>
Upload(fileNames, remoteDirName, uploadOptions, callback));
196 var batch =
new List<string>();
197 var batchData =
new List<byte[]>();
199 foreach (var (localPath, remotePath) in files)
203 var data = File.ReadAllBytes(localPath);
204 batch.Add(remotePath);
208 if (batch.Count >= _options.MaxFilesPerBatch)
210 ExecuteUploadBatch(batch, batchData, options, callback);
215 catch (IOException ex)
217 throw new KineticaException($
"Error reading file '{localPath}': {ex.Message}", ex);
224 ExecuteUploadBatch(batch, batchData, options, callback);
230 var requestOptions =
new Dictionary<string, string>();
234 requestOptions[
"ttl"] = options.
Ttl.ToString();
243 _db.uploadFiles(request);
246 if (callback !=
null)
250 FileNames = fileNames,
261 var fileInfo =
new FileInfo(localPath);
262 var uuid = Guid.NewGuid().ToString();
263 var partSize = _options.FileSizeToSplit;
264 var totalParts = (int)Math.Ceiling((
double)fileInfo.Length / partSize);
265 var results =
new List<FileOperationResult>();
270 var initOptions =
new Dictionary<string, string>
282 new List<string> { remotePath },
285 _db.uploadFiles(initRequest);
288 using var stream = File.OpenRead(localPath);
289 var buffer =
new byte[partSize];
291 for (
int partNumber = 1; partNumber <= totalParts; partNumber++)
293 var bytesRead = stream.Read(buffer, 0, buffer.Length);
294 var partData =
new byte[bytesRead];
295 Array.Copy(buffer, partData, bytesRead);
297 var partOptions =
new Dictionary<string, string>
305 new List<string> { remotePath },
306 new List<byte[]> { partData },
308 _db.uploadFiles(partRequest);
313 FileName = remotePath,
320 PartNumber = partNumber,
321 TotalParts = totalParts,
325 results.Add(partResult);
330 var completeOptions =
new Dictionary<string, string>
337 new List<string> { remotePath },
340 _db.uploadFiles(completeRequest);
349 var cancelOptions =
new Dictionary<string, string>
356 new List<string> { remotePath },
359 _db.uploadFiles(cancelRequest);
366 throw new KineticaException($
"Failed to upload file '{localPath}': {ex.Message}", ex);
372 #region Download Methods 379 public void Download(
string fileName,
string localDirName)
393 Download(
new List<string> { fileName }, localDirName, downloadOptions, callback);
401 public void Download(IList<string> fileNames,
string localDirName)
415 if (fileNames ==
null || fileNames.Count == 0)
418 if (
string.IsNullOrEmpty(localDirName))
421 if (!Directory.Exists(localDirName))
428 var response = _db.downloadFiles(request);
431 var downloadedFiles =
new List<string>();
432 for (
int i = 0; i < response.file_names.Count; i++)
434 var remoteName = response.file_names[i];
435 var localFileName = Path.GetFileName(remoteName);
436 var localPath = Path.Combine(localDirName, localFileName);
440 throw new KineticaException($
"File '{localPath}' already exists and OverwriteExisting is false");
443 File.WriteAllBytes(localPath, response.file_data[i]);
444 downloadedFiles.Add(localPath);
459 if (!Directory.Exists(localDirName))
462 var showFilesResponse = _db.showFiles(
new List<string> { remoteDirName },
new Dictionary<string, string>());
464 if (showFilesResponse.file_names.Count > 0)
466 Download(showFilesResponse.file_names, localDirName, downloadOptions, callback);
479 await Task.Run(() =>
Download(fileNames, localDirName, downloadOptions, callback));
484 #region Directory Operations 502 var options =
new Dictionary<string, string>
509 _db.createDirectory(remoteDirName, options);
527 public void DeleteDirectory(
string remoteDirName,
bool recursive,
bool noErrorIfNotExists)
529 var options =
new Dictionary<string, string>
539 _db.deleteDirectory(remoteDirName, options);
549 if (remoteDirNames ==
null || remoteDirNames.Count == 0)
550 throw new KineticaException(
"List of KiFS directory names cannot be null or empty");
552 var result =
new List<KifsDirectoryInfo>();
554 foreach (var dirName
in remoteDirNames)
556 if (
string.IsNullOrEmpty(dirName?.Trim()))
559 var response = _db.showDirectories(dirName,
new Dictionary<string, string>());
561 for (
int i = 0; i < response.directories.Count; i++)
565 KifsPath = response.directories[i],
566 CreatedBy = response.users[i],
567 Permission = response.permissions[i],
568 CreationTime = response.creation_times[i]
583 if (
string.IsNullOrEmpty(remoteDirName))
586 var response = _db.showDirectories(remoteDirName,
new Dictionary<string, string>());
587 var result =
new List<KifsDirectoryInfo>();
589 for (
int i = 0; i < response.directories.Count; i++)
593 KifsPath = response.directories[i],
594 CreatedBy = response.users[i],
595 Permission = response.permissions[i],
596 CreationTime = response.creation_times[i]
609 var response = _db.showDirectories(
"",
new Dictionary<string, string>());
610 var result =
new List<KifsDirectoryInfo>();
612 for (
int i = 0; i < response.directories.Count; i++)
616 KifsPath = response.directories[i],
617 CreatedBy = response.users[i],
618 Permission = response.permissions[i],
619 CreationTime = response.creation_times[i]
628 #region File Operations 644 public void DeleteFiles(IList<string> fileNames,
bool noErrorIfNotExists)
646 var options =
new Dictionary<string, string>
653 _db.deleteFiles(fileNames, options);
672 var showFilesResponse = _db.showFiles(
new List<string> { remoteDirName },
new Dictionary<string, string>());
673 DeleteFiles(showFilesResponse.file_names, noErrorIfNotExists);
681 public IList<KifsFileInfo>
ShowFiles(IList<string> remotePaths)
683 if (remotePaths ==
null || remotePaths.Count == 0)
686 var response = _db.showFiles(remotePaths,
new Dictionary<string, string>());
687 var result =
new List<KifsFileInfo>();
689 for (
int i = 0; i < response.file_names.Count; i++)
693 FileName = response.file_names[i],
694 FileSize = response.sizes[i],
695 CreatedBy = response.users[i],
696 CreationTime = response.creation_times[i],
706 #region Existence Checks 715 if (
string.IsNullOrEmpty(dirName))
720 var response = _db.showDirectories(dirName,
new Dictionary<string, string>());
721 return response.directories.Contains(dirName);
736 if (dirNames ==
null || dirNames.Count == 0)
741 var response = _db.showDirectories(
"",
new Dictionary<string, string>());
742 var existingDirs =
new HashSet<string>(response.directories);
743 return dirNames.All(d => existingDirs.Contains(d));
758 if (
string.IsNullOrEmpty(fileName))
763 var response = _db.showFiles(
new List<string> { fileName },
new Dictionary<string, string>());
764 return response.file_names.Contains(fileName);
774 #region Helper Methods 776 private IList<string> ResolveLocalFiles(
string pattern,
bool recursive)
779 if (!ContainsGlobCharacters(pattern))
781 if (File.Exists(pattern))
783 return new List<string> { Path.GetFullPath(pattern) };
785 return new List<string>();
789 var matcher =
new Matcher();
792 var (baseDir, globPattern) = SplitPathAndPattern(pattern);
796 matcher.AddInclude(
"**/" + globPattern);
800 matcher.AddInclude(globPattern);
803 var directoryInfo =
new DirectoryInfo(baseDir);
804 if (!directoryInfo.Exists)
806 return new List<string>();
809 var result = matcher.Execute(
new DirectoryInfoWrapper(directoryInfo));
810 return result.Files.Select(f => Path.GetFullPath(Path.Combine(baseDir, f.Path))).ToList();
813 private static bool ContainsGlobCharacters(
string path)
815 return path.Contains(
'*') || path.Contains(
'?') || path.Contains(
'[');
818 private static (
string baseDir,
string pattern) SplitPathAndPattern(
string path)
820 var normalizedPath = path.Replace(
'\\',
'/');
821 var lastSeparatorBeforeGlob = -1;
823 for (
int i = 0; i < normalizedPath.Length; i++)
825 char c = normalizedPath[i];
826 if (c ==
'*' || c ==
'?' || c ==
'[')
829 lastSeparatorBeforeGlob = i;
832 if (lastSeparatorBeforeGlob < 0)
834 return (Directory.GetCurrentDirectory(), normalizedPath);
837 return (normalizedPath.Substring(0, lastSeparatorBeforeGlob), normalizedPath.Substring(lastSeparatorBeforeGlob + 1));
840 private static string BuildRemotePath(
string remoteDirName,
string fileName)
844 return remoteDirName + fileName;
851 #region Options Class 903 throw new KineticaException($
"FileSizeToSplit must be between 1 and {DefaultFileSizeToSplit}");
915 if (size <= 0 || size > Environment.ProcessorCount)
916 throw new KineticaException($
"ThreadPoolSize must be between 1 and {Environment.ProcessorCount}");
int ThreadPoolSize
Gets or sets the thread pool size for concurrent file operations.
OperationMode
Indicates the type of file operation.
const string NO_ERROR_IF_NOT_EXISTS
If TRUE, no error is returned if a specified file does not exist.
const string NO_ERROR_IF_EXISTS
If TRUE, does not return an error if the directory already exists.
void Upload(IList< string > fileNames, string remoteDirName)
Uploads multiple files to a KiFS directory using default options.
const long DefaultFileSizeToSplit
Default file size threshold for multi-part uploads (60 MB).
Options SetThreadPoolSize(int size)
Sets the thread pool size for concurrent file operations.
static DownloadOptions Default
Returns the default download options.
bool KifsFileExists(string fileName)
Checks whether a KiFS file exists.
bool KifsDirectoriesExist(ISet< string > dirNames)
Checks whether multiple KiFS directories exist.
Interface for receiving callbacks during file upload operations.
Main class for handling file operations with Kinetica's KiFS (Kinetica File System).
void DeleteFilesInDirectory(string remoteDirName, bool noErrorIfNotExists)
Deletes all files in a KiFS directory.
void DeleteFilesInDirectory(string remoteDirName)
Deletes all files in a KiFS directory.
KineticaFileHandler(Kinetica db, Options options)
Constructs a KineticaFileHandler with the specified Kinetica connection and options.
bool DeleteIfExists
Gets or sets whether to delete existing files before uploading.
void Download(IList< string > fileNames, string localDirName)
Downloads multiple files from KiFS to a local directory using default options.
const string RECURSIVE
If TRUE, will delete directory and all files residing in it.
void Upload(IList< string > fileNames, string remoteDirName, UploadOptions? uploadOptions, IFileUploadListener? callback)
Uploads multiple files to a KiFS directory.
bool OverwriteExisting
Gets or sets whether to overwrite existing files on the local file system.
const int DefaultThreadPoolSize
Default thread pool size for file operations.
void OnFullFileUpload(FileOperationResult result)
Called when a full file upload (non-multi-part) has been completed.
const string CANCEL
Cancel the specified multipart file upload.
Options for uploading files to KiFS.
const string COMPLETE
Complete the specified multipart file upload.
void Upload(string fileName, string remoteDirName, UploadOptions? uploadOptions, IFileUploadListener? callback)
Uploads a single file to a KiFS directory.
A set of string constants for the parameter options.
const string UPLOAD_PART
Uploads a part of the specified multipart file upload.
const string DELETE_IF_EXISTS
If TRUE, any existing files specified in file_names will be deleted prior to start of upload.
A set of parameters for Kinetica.deleteFiles.
Options(Options other)
Creates a copy of the specified options.
void CreateDirectory(string remoteDirName, bool noErrorIfExists)
Creates a KiFS directory.
void OnFullFileDownload(IList< string > fileNames)
Called when one or more full file downloads (non-multi-part) have been completed.
void DeleteFiles(IList< string > fileNames)
Deletes files from KiFS, suppressing error if files don't exist.
const string MULTIPART_UPLOAD_PART_NUMBER
Incremental part number for each part in a multipart upload.
void DeleteDirectory(string remoteDirName, bool recursive, bool noErrorIfNotExists)
Deletes a KiFS directory.
IList< KifsDirectoryInfo > ShowAllDirectories()
Returns statistics about all KiFS directories.
void Download(string fileName, string localDirName, DownloadOptions? downloadOptions, IFileDownloadListener? callback)
Downloads a single file from KiFS to a local directory.
async Task UploadAsync(IList< string > fileNames, string remoteDirName, UploadOptions? uploadOptions=null, IFileUploadListener? callback=null)
Uploads files asynchronously to a KiFS directory.
const string KifsPathPrefix
Prefix to use when referencing KiFS files (e.g., for file ingest).
const string MULTIPART_OPERATION
Multipart upload operation to perform.
void Download(IList< string > fileNames, string localDirName, DownloadOptions? downloadOptions, IFileDownloadListener? callback)
Downloads multiple files from KiFS to a local directory.
const string KifsPathSeparator
Separator character between a KiFS directory and file name.
bool KifsDirectoryExists(string dirName)
Checks whether a KiFS directory exists.
Options FileHandlerOptions
Gets the options for this file handler.
void Upload(string fileName, string remoteDirName)
Uploads a single file to a KiFS directory using default options.
Options for configuring the KineticaFileHandler behavior.
Options SetFileSizeToSplit(long size)
Sets the file size threshold for multi-part uploads.
Options()
Creates a new Options instance with default values.
KineticaFileHandler(Kinetica db)
Constructs a KineticaFileHandler with the specified Kinetica connection.
void DeleteFiles(IList< string > fileNames, bool noErrorIfNotExists)
Deletes files from KiFS.
void Download(string fileName, string localDirName)
Downloads a single file from KiFS to a local directory using default options.
int MaxFilesPerBatch
Gets or sets the maximum number of files to upload in a single batch.
A set of string constants for the parameter options.
Interface for receiving callbacks during file download operations.
void OnMultiPartUploadComplete(IList< FileOperationResult > results)
Called when all parts of a multi-part upload have been completed.
const string NO_ERROR_IF_NOT_EXISTS
If TRUE, no error is returned if specified directory does not exist.
Options for downloading files from KiFS.
static UploadOptions Default
Returns the default upload options.
void DeleteDirectory(string remoteDirName)
Deletes a KiFS directory and all files under it, suppressing error if it doesn't exist.
A set of parameters for Kinetica.uploadFiles.
Contains information about a file stored in KiFS.
void CreateDirectory(string remoteDirName)
Creates a KiFS directory, suppressing error if it already exists.
Contains information about a multi-part upload operation.
void DownloadDirectory(string remoteDirName, string localDirName, DownloadOptions? downloadOptions=null, IFileDownloadListener? callback=null)
Downloads all files in a KiFS directory to a local directory.
async Task DownloadAsync(IList< string > fileNames, string localDirName, DownloadOptions? downloadOptions=null, IFileDownloadListener? callback=null)
Downloads files asynchronously from KiFS to a local directory.
A set of parameters for Kinetica.deleteDirectory.
A set of string constants for the parameter options.
IList< KifsDirectoryInfo > ShowDirectories(IList< string > remoteDirNames)
Returns statistics about the given KiFS directories.
const string RemoteUserHomeDirPrefix
Alias for a user's home directory.
A set of parameters for Kinetica.createDirectory.
const string MULTIPART_UPLOAD_UUID
UUID to uniquely identify a multipart upload.
int Ttl
Gets or sets the time-to-live (TTL) value for uploaded files in minutes.
IList< KifsFileInfo > ShowFiles(IList< string > remotePaths)
Returns statistics about the given KiFS files.
long FileSizeToSplit
Gets or sets the file size threshold for multi-part uploads in bytes.
Contains the result of a file upload or download operation.
IList< KifsDirectoryInfo > ShowDirectory(string remoteDirName)
Returns statistics about a single KiFS directory.
Contains information about a directory in KiFS.
void OnPartUpload(FileOperationResult result)
Called when a single part of a multi-part upload has been completed.
const string INIT
Initialize a multipart file upload.
A set of parameters for Kinetica.downloadFiles.
A set of string constants for the parameter options.