HDFS-10515: libhdfs++: Implement mkdirs, rmdir, rename, and remove
This commit is contained in:
parent
193314dc34
commit
88c5768f99
|
@ -170,6 +170,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
|||
if (numEntries != 0) {
|
||||
fprintf(stderr, "hdfsListDirectory set numEntries to "
|
||||
"%d on empty directory.", numEntries);
|
||||
return EIO;
|
||||
}
|
||||
|
||||
/* There should not be any file to open for reading. */
|
||||
|
@ -214,21 +215,26 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
|||
hdfsFreeFileInfo(dirList, numEntries);
|
||||
|
||||
/* Create many files for ListDirectory to page through */
|
||||
char listDirTest[PATH_MAX];
|
||||
strcpy(listDirTest, paths->prefix);
|
||||
strcat(listDirTest, "/for_list_test/");
|
||||
EXPECT_ZERO(hdfsCreateDirectory(fs, listDirTest));
|
||||
int nFile;
|
||||
for (nFile = 0; nFile < 10000; nFile++) {
|
||||
char filename[PATH_MAX];
|
||||
snprintf(filename, PATH_MAX, "%s/many_files_%d", paths->prefix, nFile);
|
||||
snprintf(filename, PATH_MAX, "%s/many_files_%d", listDirTest, nFile);
|
||||
file = hdfsOpenFile(fs, filename, O_WRONLY, 0, 0, 0);
|
||||
EXPECT_NONNULL(file);
|
||||
EXPECT_ZERO(hdfsCloseFile(fs, file));
|
||||
}
|
||||
dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
|
||||
dirList = hdfsListDirectory(fs, listDirTest, &numEntries);
|
||||
EXPECT_NONNULL(dirList);
|
||||
if (numEntries != 10002) {
|
||||
fprintf(stderr, "hdfsListDirectory set numEntries to "
|
||||
"%d on directory containing 10002 files.", numEntries);
|
||||
}
|
||||
hdfsFreeFileInfo(dirList, numEntries);
|
||||
if (numEntries != 10000) {
|
||||
fprintf(stderr, "hdfsListDirectory set numEntries to "
|
||||
"%d on directory containing 10000 files.", numEntries);
|
||||
return EIO;
|
||||
}
|
||||
|
||||
/* Let's re-open the file for reading */
|
||||
file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0);
|
||||
|
@ -272,8 +278,8 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
|
|||
EXPECT_ZERO(memcmp(paths->prefix, tmp, expected));
|
||||
EXPECT_ZERO(hdfsCloseFile(fs, file));
|
||||
|
||||
// TODO: Non-recursive delete should fail?
|
||||
//EXPECT_NONZERO(hdfsDelete(fs, prefix, 0));
|
||||
//Non-recursive delete fails
|
||||
EXPECT_NONZERO(hdfsDelete(fs, paths->prefix, 0));
|
||||
EXPECT_ZERO(hdfsCopy(fs, paths->file1, fs, paths->file2));
|
||||
|
||||
EXPECT_ZERO(hdfsChown(fs, paths->file2, NULL, NULL));
|
||||
|
|
|
@ -217,6 +217,39 @@ class FileSystem {
|
|||
virtual Status GetBlockLocations(const std::string & path,
|
||||
std::shared_ptr<FileBlockLocation> * locations) = 0;
|
||||
|
||||
/**
|
||||
* Creates a new directory
|
||||
*
|
||||
* @param path Path to the directory to be created (must be non-empty)
|
||||
* @param permissions Permissions for the new directory (negative value for the default permissions)
|
||||
* @param createparent Create parent directories if they do not exist (may not be empty)
|
||||
*/
|
||||
virtual void Mkdirs(const std::string & path, long permissions, bool createparent,
|
||||
std::function<void(const Status &)> handler) = 0;
|
||||
virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) = 0;
|
||||
|
||||
/**
|
||||
* Delete the given file or directory from the file system.
|
||||
* <p>
|
||||
* same as delete but provides a way to avoid accidentally
|
||||
* deleting non empty directories programmatically.
|
||||
* @param path existing name (must be non-empty)
|
||||
* @param recursive if true deletes a non empty directory recursively
|
||||
*/
|
||||
virtual void Delete(const std::string &path, bool recursive,
|
||||
const std::function<void(const Status &)> &handler) = 0;
|
||||
virtual Status Delete(const std::string &path, bool recursive) = 0;
|
||||
|
||||
/**
|
||||
* Rename - Rename file.
|
||||
* @param oldPath The path of the source file. (must be non-empty)
|
||||
* @param newPath The path of the destination file. (must be non-empty)
|
||||
* @return Returns 0 on success, -1 on error.
|
||||
*/
|
||||
virtual void Rename(const std::string &oldPath, const std::string &newPath,
|
||||
const std::function<void(const Status &)> &handler) = 0;
|
||||
virtual Status Rename(const std::string &oldPath, const std::string &newPath) = 0;
|
||||
|
||||
/*****************************************************************************
|
||||
* FILE SYSTEM SNAPSHOT FUNCTIONS
|
||||
****************************************************************************/
|
||||
|
|
|
@ -59,6 +59,8 @@ class Status {
|
|||
kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
|
||||
kPathNotFound = static_cast<unsigned>(std::errc::no_such_file_or_directory),
|
||||
kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
|
||||
kFileAlreadyExists = static_cast<unsigned>(std::errc::file_exists),
|
||||
kPathIsNotEmptyDirectory = static_cast<unsigned>(std::errc::directory_not_empty),
|
||||
kException = 256,
|
||||
kAuthenticationFailed = 257,
|
||||
};
|
||||
|
|
|
@ -165,6 +165,14 @@ static int Error(const Status &stat) {
|
|||
errnum = ENOTDIR;
|
||||
default_message = "Not a directory";
|
||||
break;
|
||||
case Status::Code::kFileAlreadyExists:
|
||||
errnum = EEXIST;
|
||||
default_message = "File already exists";
|
||||
break;
|
||||
case Status::Code::kPathIsNotEmptyDirectory:
|
||||
errnum = ENOTEMPTY;
|
||||
default_message = "Directory is not empty";
|
||||
break;
|
||||
default:
|
||||
errnum = ENOSYS;
|
||||
default_message = "Error: unrecognised code";
|
||||
|
@ -509,6 +517,76 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
|
|||
delete[] hdfsFileInfo;
|
||||
}
|
||||
|
||||
int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
||||
try {
|
||||
errno = 0;
|
||||
if (!CheckSystem(fs)) {
|
||||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("hdfsCreateDirectory: argument 'path' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
//-1 for default permissions and true for creating all non-existant parent directories
|
||||
stat = fs->get_impl()->Mkdirs(path, -1, true);
|
||||
if (!stat.ok()) {
|
||||
return Error(stat);
|
||||
}
|
||||
return 0;
|
||||
} catch (const std::exception & e) {
|
||||
return ReportException(e);
|
||||
} catch (...) {
|
||||
return ReportCaughtNonException();
|
||||
}
|
||||
}
|
||||
|
||||
int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
||||
try {
|
||||
errno = 0;
|
||||
if (!CheckSystem(fs)) {
|
||||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("hdfsDelete: argument 'path' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
stat = fs->get_impl()->Delete(path, recursive);
|
||||
if (!stat.ok()) {
|
||||
return Error(stat);
|
||||
}
|
||||
return 0;
|
||||
} catch (const std::exception & e) {
|
||||
return ReportException(e);
|
||||
} catch (...) {
|
||||
return ReportCaughtNonException();
|
||||
}
|
||||
}
|
||||
|
||||
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
||||
try {
|
||||
errno = 0;
|
||||
if (!CheckSystem(fs)) {
|
||||
return -1;
|
||||
}
|
||||
if (!oldPath) {
|
||||
return Error(Status::InvalidArgument("hdfsRename: argument 'oldPath' cannot be NULL"));
|
||||
}
|
||||
if (!newPath) {
|
||||
return Error(Status::InvalidArgument("hdfsRename: argument 'newPath' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
stat = fs->get_impl()->Rename(oldPath, newPath);
|
||||
if (!stat.ok()) {
|
||||
return Error(stat);
|
||||
}
|
||||
return 0;
|
||||
} catch (const std::exception & e) {
|
||||
return ReportException(e);
|
||||
} catch (...) {
|
||||
return ReportCaughtNonException();
|
||||
}
|
||||
}
|
||||
|
||||
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
||||
try {
|
||||
errno = 0;
|
||||
|
@ -516,7 +594,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
|||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
|
||||
return Error(Status::InvalidArgument("hdfsCreateSnapshot: argument 'path' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
if(!name){
|
||||
|
@ -542,10 +620,10 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
|
|||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
|
||||
return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'path' cannot be NULL"));
|
||||
}
|
||||
if (!name) {
|
||||
return Error(Status::InvalidArgument("Argument 'name' cannot be NULL"));
|
||||
return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
stat = fs->get_impl()->DeleteSnapshot(path, name);
|
||||
|
@ -567,7 +645,7 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
|
|||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
|
||||
return Error(Status::InvalidArgument("hdfsAllowSnapshot: argument 'path' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
stat = fs->get_impl()->AllowSnapshot(path);
|
||||
|
@ -589,7 +667,7 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
|
|||
return -1;
|
||||
}
|
||||
if (!path) {
|
||||
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
|
||||
return Error(Status::InvalidArgument("hdfsDisallowSnapshot: argument 'path' cannot be NULL"));
|
||||
}
|
||||
Status stat;
|
||||
stat = fs->get_impl()->DisallowSnapshot(path);
|
||||
|
|
|
@ -30,6 +30,8 @@ const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException
|
|||
const char * kPathNotFoundException2 = "java.io.FileNotFoundException";
|
||||
const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException";
|
||||
const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException";
|
||||
const char * kFileAlreadyExistsException = "org.apache.hadoop.fs.FileAlreadyExistsException";
|
||||
const char * kPathIsNotEmptyDirectoryException = "org.apache.hadoop.fs.PathIsNotEmptyDirectoryException";
|
||||
|
||||
Status::Status(int code, const char *msg1) : code_(code) {
|
||||
if(msg1) {
|
||||
|
@ -82,6 +84,10 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes
|
|||
return Status(kNotADirectory, error_message);
|
||||
else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0))
|
||||
return Status(kInvalidArgument, error_message);
|
||||
else if (exception_class_name && (strcmp(exception_class_name, kFileAlreadyExistsException) == 0))
|
||||
return Status(kFileAlreadyExists, error_message);
|
||||
else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotEmptyDirectoryException) == 0))
|
||||
return Status(kPathIsNotEmptyDirectory, error_message);
|
||||
else
|
||||
return Status(kException, exception_class_name, error_message);
|
||||
}
|
||||
|
|
|
@ -362,9 +362,7 @@ void FileSystemImpl::GetFileInfo(
|
|||
<< FMT_THIS_ADDR << ", path="
|
||||
<< path << ") called");
|
||||
|
||||
nn_.GetFileInfo(path, [handler](const Status &stat, const StatInfo &stat_info) {
|
||||
handler(stat, stat_info);
|
||||
});
|
||||
nn_.GetFileInfo(path, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::GetFileInfo(const std::string &path,
|
||||
|
@ -401,9 +399,7 @@ void FileSystemImpl::GetFsStats(
|
|||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called");
|
||||
|
||||
nn_.GetFsStats([handler](const Status &stat, const FsInfo &fs_info) {
|
||||
handler(stat, fs_info);
|
||||
});
|
||||
nn_.GetFsStats(handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
|
||||
|
@ -512,6 +508,115 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::
|
|||
return stat;
|
||||
}
|
||||
|
||||
void FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool createparent,
|
||||
std::function<void(const Status &)> handler) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
|
||||
", permissions=" << permissions << ", createparent=" << createparent << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.Mkdirs(path, permissions, createparent, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool createparent) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
|
||||
", permissions=" << permissions << ", createparent=" << createparent << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
Mkdirs(path, permissions, createparent, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
||||
void FileSystemImpl::Delete(const std::string &path, bool recursive,
|
||||
const std::function<void(const Status &)> &handler) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.Delete(path, recursive, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::Delete with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
Delete(path, recursive, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
||||
void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath,
|
||||
const std::function<void(const Status &)> &handler) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
|
||||
|
||||
if (oldPath.empty()) {
|
||||
handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (newPath.empty()) {
|
||||
handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.Rename(oldPath, newPath, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::Rename with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
Rename(oldPath, newPath, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
||||
void FileSystemImpl::CreateSnapshot(const std::string &path,
|
||||
const std::string &name,
|
||||
const std::function<void(const Status &)> &handler) {
|
||||
|
@ -519,13 +624,11 @@ void FileSystemImpl::CreateSnapshot(const std::string &path,
|
|||
<< "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.CreateSnapshot(path, name, [handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
nn_.CreateSnapshot(path, name, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::CreateSnapshot(const std::string &path,
|
||||
|
@ -533,19 +636,19 @@ Status FileSystemImpl::CreateSnapshot(const std::string &path,
|
|||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
|
||||
std::future<std::tuple<Status>> future(callstate->get_future());
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(std::make_tuple(s));
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
CreateSnapshot(path, name, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = std::get<0>(returnstate);
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
@ -557,17 +660,15 @@ void FileSystemImpl::DeleteSnapshot(const std::string &path,
|
|||
<< "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
if (name.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'name' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.DeleteSnapshot(path, name, [handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
nn_.DeleteSnapshot(path, name, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::DeleteSnapshot(const std::string &path,
|
||||
|
@ -575,19 +676,19 @@ Status FileSystemImpl::DeleteSnapshot(const std::string &path,
|
|||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
|
||||
std::future<std::tuple<Status>> future(callstate->get_future());
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(std::make_tuple(s));
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
DeleteSnapshot(path, name, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = std::get<0>(returnstate);
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
@ -598,32 +699,30 @@ void FileSystemImpl::AllowSnapshot(const std::string &path,
|
|||
<< "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.AllowSnapshot(path, [handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
nn_.AllowSnapshot(path, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::AllowSnapshot(const std::string &path) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
|
||||
std::future<std::tuple<Status>> future(callstate->get_future());
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(std::make_tuple(s));
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
AllowSnapshot(path, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = std::get<0>(returnstate);
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
@ -634,32 +733,30 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path,
|
|||
<< "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
nn_.DisallowSnapshot(path, [handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
nn_.DisallowSnapshot(path, handler);
|
||||
}
|
||||
|
||||
Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
|
||||
LOG_DEBUG(kFileSystem,
|
||||
<< "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
|
||||
std::future<std::tuple<Status>> future(callstate->get_future());
|
||||
auto callstate = std::make_shared<std::promise<Status>>();
|
||||
std::future<Status> future(callstate->get_future());
|
||||
|
||||
/* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
|
||||
auto h = [callstate](const Status &s) {
|
||||
callstate->set_value(std::make_tuple(s));
|
||||
callstate->set_value(s);
|
||||
};
|
||||
|
||||
DisallowSnapshot(path, h);
|
||||
|
||||
/* block until promise is set */
|
||||
auto returnstate = future.get();
|
||||
Status stat = std::get<0>(returnstate);
|
||||
Status stat = returnstate;
|
||||
|
||||
return stat;
|
||||
}
|
||||
|
|
|
@ -93,6 +93,17 @@ public:
|
|||
virtual Status GetBlockLocations(const std::string & path,
|
||||
std::shared_ptr<FileBlockLocation> * locations) override;
|
||||
|
||||
virtual void Mkdirs(const std::string & path, long permissions, bool createparent,
|
||||
std::function<void(const Status &)> handler) override;
|
||||
virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) override;
|
||||
|
||||
virtual void Delete(const std::string &path, bool recursive,
|
||||
const std::function<void(const Status &)> &handler) override;
|
||||
virtual Status Delete(const std::string &path, bool recursive) override;
|
||||
|
||||
virtual void Rename(const std::string &oldPath, const std::string &newPath,
|
||||
const std::function<void(const Status &)> &handler) override;
|
||||
virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
|
||||
|
||||
/*****************************************************************************
|
||||
* FILE SYSTEM SNAPSHOT FUNCTIONS
|
||||
|
|
|
@ -64,6 +64,11 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
|
|||
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
|
||||
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("GetBlockLocations: argument 'path' cannot be empty"), nullptr);
|
||||
return;
|
||||
}
|
||||
|
||||
GetBlockLocationsRequestProto req;
|
||||
req.set_src(path);
|
||||
req.set_offset(0);
|
||||
|
@ -106,6 +111,11 @@ void NameNodeOperations::GetFileInfo(const std::string & path,
|
|||
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
|
||||
<< FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("GetFileInfo: argument 'path' cannot be empty"), StatInfo());
|
||||
return;
|
||||
}
|
||||
|
||||
GetFileInfoRequestProto req;
|
||||
req.set_src(path);
|
||||
|
||||
|
@ -164,6 +174,12 @@ void NameNodeOperations::GetListing(
|
|||
kFileSystem,
|
||||
<< "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
std::shared_ptr<std::vector<StatInfo>> stat_infos;
|
||||
handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), stat_infos, false);
|
||||
return;
|
||||
}
|
||||
|
||||
GetListingRequestProto req;
|
||||
req.set_src(path);
|
||||
req.set_startafter(start_after.c_str());
|
||||
|
@ -198,6 +214,125 @@ void NameNodeOperations::GetListing(
|
|||
});
|
||||
}
|
||||
|
||||
void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool createparent,
|
||||
std::function<void(const Status &)> handler)
|
||||
{
|
||||
using ::hadoop::hdfs::MkdirsRequestProto;
|
||||
using ::hadoop::hdfs::MkdirsResponseProto;
|
||||
|
||||
LOG_TRACE(kFileSystem,
|
||||
<< "NameNodeOperations::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
|
||||
", permissions=" << permissions << ", createparent=" << createparent << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Mkdirs: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
MkdirsRequestProto req;
|
||||
req.set_src(path);
|
||||
hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked();
|
||||
if (permissions < 0) {
|
||||
perm->set_perm(0755);
|
||||
} else {
|
||||
perm->set_perm(permissions);
|
||||
}
|
||||
req.set_createparent(createparent);
|
||||
|
||||
auto resp = std::make_shared<MkdirsResponseProto>();
|
||||
|
||||
namenode_.Mkdirs(&req, resp, [resp, handler, path](const Status &stat) {
|
||||
if (stat.ok()) {
|
||||
// Checking resp
|
||||
if(resp -> has_result() && resp ->result() == 1) {
|
||||
handler(stat);
|
||||
} else {
|
||||
//NameNode does not specify why there is no result, in my testing it was happening when the path is not found
|
||||
std::string errormsg = "No such file or directory: " + path;
|
||||
Status statNew = Status::PathNotFound(errormsg.c_str());
|
||||
handler(statNew);
|
||||
}
|
||||
} else {
|
||||
handler(stat);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void NameNodeOperations::Delete(const std::string & path, bool recursive, std::function<void(const Status &)> handler) {
|
||||
using ::hadoop::hdfs::DeleteRequestProto;
|
||||
using ::hadoop::hdfs::DeleteResponseProto;
|
||||
|
||||
LOG_TRACE(kFileSystem,
|
||||
<< "NameNodeOperations::Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Delete: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
DeleteRequestProto req;
|
||||
req.set_src(path);
|
||||
req.set_recursive(recursive);
|
||||
|
||||
auto resp = std::make_shared<DeleteResponseProto>();
|
||||
|
||||
namenode_.Delete(&req, resp, [resp, handler, path](const Status &stat) {
|
||||
if (stat.ok()) {
|
||||
// Checking resp
|
||||
if(resp -> has_result() && resp ->result() == 1) {
|
||||
handler(stat);
|
||||
} else {
|
||||
//NameNode does not specify why there is no result, in my testing it was happening when the path is not found
|
||||
std::string errormsg = "No such file or directory: " + path;
|
||||
Status statNew = Status::PathNotFound(errormsg.c_str());
|
||||
handler(statNew);
|
||||
}
|
||||
} else {
|
||||
handler(stat);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void NameNodeOperations::Rename(const std::string & oldPath, const std::string & newPath, std::function<void(const Status &)> handler) {
|
||||
using ::hadoop::hdfs::RenameRequestProto;
|
||||
using ::hadoop::hdfs::RenameResponseProto;
|
||||
|
||||
LOG_TRACE(kFileSystem,
|
||||
<< "NameNodeOperations::Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
|
||||
|
||||
if (oldPath.empty()) {
|
||||
handler(Status::InvalidArgument("Rename: argument 'oldPath' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (newPath.empty()) {
|
||||
handler(Status::InvalidArgument("Rename: argument 'newPath' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
RenameRequestProto req;
|
||||
req.set_src(oldPath);
|
||||
req.set_dst(newPath);
|
||||
|
||||
auto resp = std::make_shared<RenameResponseProto>();
|
||||
|
||||
namenode_.Rename(&req, resp, [resp, handler](const Status &stat) {
|
||||
if (stat.ok()) {
|
||||
// Checking resp
|
||||
if(resp -> has_result() && resp ->result() == 1) {
|
||||
handler(stat);
|
||||
} else {
|
||||
//Since NameNode does not specify why the result is not success, we set the general error
|
||||
std::string errormsg = "oldPath and parent directory of newPath must exist. newPath must not exist.";
|
||||
Status statNew = Status::InvalidArgument(errormsg.c_str());
|
||||
handler(statNew);
|
||||
}
|
||||
} else {
|
||||
handler(stat);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void NameNodeOperations::CreateSnapshot(const std::string & path,
|
||||
const std::string & name, std::function<void(const Status &)> handler) {
|
||||
using ::hadoop::hdfs::CreateSnapshotRequestProto;
|
||||
|
@ -207,7 +342,7 @@ void NameNodeOperations::CreateSnapshot(const std::string & path,
|
|||
<< "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("CreateSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -220,7 +355,7 @@ void NameNodeOperations::CreateSnapshot(const std::string & path,
|
|||
auto resp = std::make_shared<CreateSnapshotResponseProto>();
|
||||
|
||||
namenode_.CreateSnapshot(&req, resp,
|
||||
[resp, handler, path](const Status &stat) {
|
||||
[handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
}
|
||||
|
@ -234,11 +369,11 @@ void NameNodeOperations::DeleteSnapshot(const std::string & path,
|
|||
<< "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DeleteSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
if (name.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'name' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DeleteSnapshot: argument 'name' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -249,7 +384,7 @@ void NameNodeOperations::DeleteSnapshot(const std::string & path,
|
|||
auto resp = std::make_shared<DeleteSnapshotResponseProto>();
|
||||
|
||||
namenode_.DeleteSnapshot(&req, resp,
|
||||
[resp, handler, path](const Status &stat) {
|
||||
[handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
}
|
||||
|
@ -262,7 +397,7 @@ void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<v
|
|||
<< "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("AllowSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -272,7 +407,7 @@ void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<v
|
|||
auto resp = std::make_shared<AllowSnapshotResponseProto>();
|
||||
|
||||
namenode_.AllowSnapshot(&req, resp,
|
||||
[resp, handler, path](const Status &stat) {
|
||||
[handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
}
|
||||
|
@ -285,7 +420,7 @@ void NameNodeOperations::DisallowSnapshot(const std::string & path, std::functio
|
|||
<< "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
|
||||
|
||||
if (path.empty()) {
|
||||
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
|
||||
handler(Status::InvalidArgument("DisallowSnapshot: argument 'path' cannot be empty"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -295,7 +430,7 @@ void NameNodeOperations::DisallowSnapshot(const std::string & path, std::functio
|
|||
auto resp = std::make_shared<DisallowSnapshotResponseProto>();
|
||||
|
||||
namenode_.DisallowSnapshot(&req, resp,
|
||||
[resp, handler, path](const Status &stat) {
|
||||
[handler](const Status &stat) {
|
||||
handler(stat);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -65,6 +65,15 @@ public:
|
|||
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
|
||||
const std::string & start_after = "");
|
||||
|
||||
void Mkdirs(const std::string & path, long permissions, bool createparent,
|
||||
std::function<void(const Status &)> handler);
|
||||
|
||||
void Delete(const std::string & path, bool recursive,
|
||||
std::function<void(const Status &)> handler);
|
||||
|
||||
void Rename(const std::string & oldPath, const std::string & newPath,
|
||||
std::function<void(const Status &)> handler);
|
||||
|
||||
void CreateSnapshot(const std::string & path, const std::string & name,
|
||||
std::function<void(const Status &)> handler);
|
||||
|
||||
|
|
|
@ -172,6 +172,83 @@ TEST_F(HdfsExtTest, TestSnapshotOperations) {
|
|||
hdfsFreeFileInfo(file_infos, 0);
|
||||
}
|
||||
|
||||
//Testing creating directories
|
||||
TEST_F(HdfsExtTest, TestMkdirs) {
|
||||
HdfsHandle connection = cluster.connect_c();
|
||||
hdfsFS fs = connection.handle();
|
||||
EXPECT_NE(nullptr, fs);
|
||||
|
||||
//Correct operation
|
||||
EXPECT_EQ(0, hdfsCreateDirectory(fs, "/myDir123"));
|
||||
|
||||
//TODO Should return error if directory already exists?
|
||||
//EXPECT_EQ(-1, hdfsCreateDirectory(fs, "/myDir123"));
|
||||
//EXPECT_EQ((int) std::errc::file_exists, errno);
|
||||
|
||||
//Creating directory on a path of the existing file
|
||||
std::string path = connection.newFile(1024); //1024 byte file
|
||||
EXPECT_EQ(-1, hdfsCreateDirectory(fs, path.c_str()));
|
||||
EXPECT_EQ((int) std::errc::file_exists, errno);
|
||||
}
|
||||
|
||||
//Testing deleting files and directories
|
||||
TEST_F(HdfsExtTest, TestDelete) {
|
||||
HdfsHandle connection = cluster.connect_c();
|
||||
hdfsFS fs = connection.handle();
|
||||
EXPECT_NE(nullptr, fs);
|
||||
|
||||
//Path not found
|
||||
EXPECT_EQ(-1, hdfsDelete(fs, "/wrong_path", 1));
|
||||
EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
|
||||
|
||||
EXPECT_EQ(0, hdfsCreateDirectory(fs, "/myDir"));
|
||||
std::string path = connection.newFile("/myDir", 1024); //1024 byte file
|
||||
|
||||
//Non-recursive delete should fail on a non-empty directory
|
||||
//error ENOTEMPTY(39) for libhdfspp or 255 for libhdfs
|
||||
EXPECT_EQ(-1, hdfsDelete(fs, "/myDir", 0));
|
||||
EXPECT_EQ((int) std::errc::directory_not_empty, errno);
|
||||
|
||||
//Correct operation
|
||||
EXPECT_EQ(0, hdfsDelete(fs, "/myDir", 1));
|
||||
}
|
||||
|
||||
//Testing renaming files and directories
|
||||
TEST_F(HdfsExtTest, TestRename) {
|
||||
HdfsHandle connection = cluster.connect_c();
|
||||
hdfsFS fs = connection.handle();
|
||||
EXPECT_NE(nullptr, fs);
|
||||
|
||||
//Creating directory with two files
|
||||
EXPECT_EQ(0, hdfsCreateDirectory(fs, "/myDir"));
|
||||
std::string file1 = connection.newFile("/myDir", 1024); //1024 byte file
|
||||
std::string file2 = connection.newFile("/myDir", 1024); //1024 byte file
|
||||
std::string file3 = connection.newFile(1024); //1024 byte file
|
||||
|
||||
//Path not found
|
||||
EXPECT_EQ(-1, hdfsRename(fs, "/wrong_path", "/new_name"));
|
||||
EXPECT_EQ((int) std::errc::invalid_argument, errno);
|
||||
|
||||
//No parent directory in new path
|
||||
EXPECT_EQ(-1, hdfsRename(fs, file1.c_str(), "/wrong_parent/new_name"));
|
||||
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
|
||||
|
||||
//New name already exists in the folder
|
||||
EXPECT_EQ(-1, hdfsRename(fs, file1.c_str(), file2.c_str()));
|
||||
EXPECT_EQ((int ) std::errc::invalid_argument, errno);
|
||||
|
||||
//Correct operation
|
||||
EXPECT_EQ(0, hdfsRename(fs, file1.c_str(), "/myDir/new_awesome_name"));
|
||||
EXPECT_EQ(0, hdfsRename(fs, file3.c_str(), "/myDir/another_file"));
|
||||
EXPECT_EQ(0, hdfsRename(fs, "/myDir", "/new_awesome_dir"));
|
||||
|
||||
//Verification
|
||||
int numEntries;
|
||||
hdfsFileInfo * dirList = hdfsListDirectory(fs, "/new_awesome_dir", &numEntries);
|
||||
EXPECT_NE(nullptr, dirList);
|
||||
EXPECT_EQ(3, numEntries);
|
||||
hdfsFreeFileInfo(dirList, 3);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -268,11 +268,11 @@ int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
|
|||
}
|
||||
|
||||
int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
||||
return libhdfs_hdfsDelete(fs->libhdfsRep, path, recursive);
|
||||
return libhdfspp_hdfsDelete(fs->libhdfsppRep, path, recursive);
|
||||
}
|
||||
|
||||
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
||||
return libhdfs_hdfsRename(fs->libhdfsRep, oldPath, newPath);
|
||||
return libhdfspp_hdfsRename(fs->libhdfsppRep, oldPath, newPath);
|
||||
}
|
||||
|
||||
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
|
||||
|
@ -284,7 +284,7 @@ int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
|
|||
}
|
||||
|
||||
int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
||||
return libhdfs_hdfsCreateDirectory(fs->libhdfsRep, path);
|
||||
return libhdfspp_hdfsCreateDirectory(fs->libhdfsppRep, path);
|
||||
}
|
||||
|
||||
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
|
||||
|
|
Loading…
Reference in New Issue