HDFS-9271: libhdfs++: Implement basic NN operations. Contributed by Anatoli Shein.

This commit is contained in:
James 2016-08-10 13:27:31 -04:00 committed by James Clampffer
parent b1ed72e098
commit 2a8edd4e52
20 changed files with 1157 additions and 131 deletions

View File

@ -93,13 +93,12 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path)
blockSize = hdfsGetDefaultBlockSize(fs); blockSize = hdfsGetDefaultBlockSize(fs);
if (blockSize < 0) { if (blockSize < 0) {
ret = errno; fprintf(stderr, "hdfsGetDefaultBlockSize failed with error %d\n", errno);
fprintf(stderr, "hdfsGetDefaultBlockSize failed with error %d\n", ret); return -1;
return ret;
} else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) { } else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) {
fprintf(stderr, "hdfsGetDefaultBlockSize got %"PRId64", but we " fprintf(stderr, "hdfsGetDefaultBlockSize got %"PRId64", but we "
"expected %d\n", blockSize, TLH_DEFAULT_BLOCK_SIZE); "expected %d\n", blockSize, TLH_DEFAULT_BLOCK_SIZE);
return EIO; return -1;
} }
blockSize = hdfsGetDefaultBlockSizeAtPath(fs, path); blockSize = hdfsGetDefaultBlockSizeAtPath(fs, path);
@ -205,6 +204,8 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
EXPECT_ZERO(hdfsHSync(fs, file)); EXPECT_ZERO(hdfsHSync(fs, file));
EXPECT_ZERO(hdfsCloseFile(fs, file)); EXPECT_ZERO(hdfsCloseFile(fs, file));
EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->file1));
/* There should be 1 entry in the directory. */ /* There should be 1 entry in the directory. */
hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries); hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
EXPECT_NONNULL(dirList); EXPECT_NONNULL(dirList);

View File

@ -115,6 +115,20 @@ int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
LIBHDFS_EXTERNAL LIBHDFS_EXTERNAL
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val); int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
/**
 * Get a configuration long from the settings currently read into the builder.
 *
 * @param bld The builder whose configuration is queried
 * @param key The key to find
 * @param val (out param) The value. This will NOT be changed if the
 *            key isn't found.
 *
 * @return 0 on success; -1 otherwise.
 *         Failure to find the key is not an error.
 */
LIBHDFS_EXTERNAL
int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val);
struct hdfsDNInfo { struct hdfsDNInfo {
const char * ip_address; const char * ip_address;
const char * hostname; const char * hostname;

View File

@ -124,6 +124,12 @@ public:
*/ */
virtual void SetFileEventCallback(file_event_callback callback) = 0; virtual void SetFileEventCallback(file_event_callback callback) = 0;
/* how many bytes have been successfully read */
virtual uint64_t get_bytes_read() = 0;
/* resets the number of bytes read to zero */
virtual void clear_bytes_read() = 0;
virtual ~FileHandle(); virtual ~FileHandle();
}; };
@ -171,6 +177,41 @@ class FileSystem {
const std::function<void(const Status &, FileHandle *)> &handler) = 0; const std::function<void(const Status &, FileHandle *)> &handler) = 0;
virtual Status Open(const std::string &path, FileHandle **handle) = 0; virtual Status Open(const std::string &path, FileHandle **handle) = 0;
/**
* Get the block size for the given file.
* @param path The path to the file
*/
virtual void GetPreferredBlockSize(const std::string &path,
const std::function<void(const Status &, const uint64_t &)> &handler) = 0;
virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) = 0;
/**
* Set replication for an existing file.
* <p>
* The NameNode sets replication to the new value and returns.
* The actual block replication is not expected to be performed during
* this method call. The blocks will be populated or removed in the
* background as the result of the routine block maintenance procedures.
*
* @param path file name
* @param replication new replication
*/
virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) = 0;
virtual Status SetReplication(const std::string & path, int16_t replication) = 0;
/**
* Sets the modification and access time of the file to the specified time.
* @param path The string representation of the path
* @param mtime The number of milliseconds since Jan 1, 1970.
* Setting mtime to -1 means that modification time should not
* be set by this call.
* @param atime The number of milliseconds since Jan 1, 1970.
* Setting atime to -1 means that access time should not be set
* by this call.
*/
virtual void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) = 0;
virtual Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) = 0;
/** /**
* Returns metadata about the file if the file/directory exists. * Returns metadata about the file if the file/directory exists.
**/ **/
@ -209,12 +250,12 @@ class FileSystem {
std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0; std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0;
/** /**
* Returns the locations of all known blocks for the indicated file, or an error * Returns the locations of all known blocks for the indicated file (or part of it), or an error
* if the information clould not be found * if the information clould not be found
*/ */
virtual void GetBlockLocations(const std::string & path, virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) = 0; const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) = 0;
virtual Status GetBlockLocations(const std::string & path, virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::shared_ptr<FileBlockLocation> * locations) = 0; std::shared_ptr<FileBlockLocation> * locations) = 0;
/** /**
@ -224,9 +265,9 @@ class FileSystem {
* @param permissions Permissions for the new directory (negative value for the default permissions) * @param permissions Permissions for the new directory (negative value for the default permissions)
* @param createparent Create parent directories if they do not exist (may not be empty) * @param createparent Create parent directories if they do not exist (may not be empty)
*/ */
virtual void Mkdirs(const std::string & path, long permissions, bool createparent, virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) = 0; std::function<void(const Status &)> handler) = 0;
virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) = 0; virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) = 0;
/** /**
* Delete the given file or directory from the file system. * Delete the given file or directory from the file system.
@ -257,8 +298,8 @@ class FileSystem {
* @param permissions the bitmask to set it to (should be between 0 and 01777) * @param permissions the bitmask to set it to (should be between 0 and 01777)
*/ */
virtual void SetPermission(const std::string & path, virtual void SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) = 0; uint16_t permissions, const std::function<void(const Status &)> &handler) = 0;
virtual Status SetPermission(const std::string & path, short permissions) = 0; virtual Status SetPermission(const std::string & path, uint16_t permissions) = 0;
/** /**
* Set Owner of a path (i.e. a file or a directory). * Set Owner of a path (i.e. a file or a directory).
@ -335,6 +376,8 @@ class FileSystem {
* @param callback The function to call when a reporting event occurs. * @param callback The function to call when a reporting event occurs.
*/ */
virtual void SetFsEventCallback(fs_event_callback callback) = 0; virtual void SetFsEventCallback(fs_event_callback callback) = 0;
virtual Options get_options() = 0;
}; };
} }

View File

@ -116,6 +116,13 @@ struct Options {
Authentication authentication; Authentication authentication;
static const Authentication kDefaultAuthentication = kSimple; static const Authentication kDefaultAuthentication = kSimple;
/**
* Block size in bytes.
* Default: 128 * 1024 * 1024 = 134217728
**/
long block_size;
static const long kDefaultBlockSize = 128*1024*1024;
Options(); Options();
}; };
} }

View File

@ -46,12 +46,16 @@ class Status {
static Status Canceled(); static Status Canceled();
static Status PathNotFound(const char *msg); static Status PathNotFound(const char *msg);
static Status InvalidOffset(const char *msg); static Status InvalidOffset(const char *msg);
static Status PathIsNotDirectory(const char *msg);
// success // success
bool ok() const { return code_ == 0; } bool ok() const { return code_ == 0; }
bool is_invalid_offset() const { return code_ == kInvalidOffset; } bool is_invalid_offset() const { return code_ == kInvalidOffset; }
// contains ENOENT error
bool pathNotFound() const { return code_ == kPathNotFound; }
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;

View File

@ -43,15 +43,18 @@ static constexpr tPort kDefaultPort = 8020;
/* Separate the handles used by the C api from the C++ API*/ /* Separate the handles used by the C api from the C++ API*/
struct hdfs_internal { struct hdfs_internal {
hdfs_internal(FileSystem *p) : filesystem_(p) {} hdfs_internal(FileSystem *p) : filesystem_(p), working_directory("/") {}
hdfs_internal(std::unique_ptr<FileSystem> p) hdfs_internal(std::unique_ptr<FileSystem> p)
: filesystem_(std::move(p)) {} : filesystem_(std::move(p)), working_directory("/") {}
virtual ~hdfs_internal(){}; virtual ~hdfs_internal(){};
FileSystem *get_impl() { return filesystem_.get(); } FileSystem *get_impl() { return filesystem_.get(); }
const FileSystem *get_impl() const { return filesystem_.get(); } const FileSystem *get_impl() const { return filesystem_.get(); }
std::string get_working_directory() { return working_directory; }
void set_working_directory(std::string new_directory) { working_directory = new_directory; }
private: private:
std::unique_ptr<FileSystem> filesystem_; std::unique_ptr<FileSystem> filesystem_;
std::string working_directory; //has to always start and end with '/'
}; };
struct hdfsFile_internal { struct hdfsFile_internal {
@ -198,6 +201,7 @@ static int ReportCaughtNonException()
return Error(Status::Exception("Uncaught value not derived from std::exception", "")); return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
} }
/* return false on failure */
bool CheckSystem(hdfsFS fs) { bool CheckSystem(hdfsFS fs) {
if (!fs) { if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
@ -208,10 +212,7 @@ bool CheckSystem(hdfsFS fs) {
} }
/* return false on failure */ /* return false on failure */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { bool CheckHandle(hdfsFile file) {
if (!CheckSystem(fs))
return false;
if (!file) { if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle."); ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return false; return false;
@ -219,16 +220,60 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
return true; return true;
} }
/* Validates both handles; returns false as soon as either check fails. */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
  // Short-circuit evaluation keeps the original order: the file handle is
  // only inspected once the filesystem handle has passed its check.
  return CheckSystem(fs) && CheckHandle(file);
}
/*
 * Resolves 'path' against the working directory stored on 'fs'.
 *
 * A path with a leading '/' is returned unchanged; any other path is
 * appended to the working directory. A NULL or empty path is reported as an
 * InvalidArgument error and yields an empty optional.
 *
 * "." and ".." components are not interpreted.
 */
optional<std::string> getAbsolutePath(hdfsFS fs, const char* path) {
  const bool missing = (path == nullptr) || (*path == '\0');
  if (missing) {
    Error(Status::InvalidArgument("getAbsolutePath: argument 'path' cannot be NULL or empty"));
    return optional<std::string>();
  }
  if (*path == '/') {
    return optional<std::string>(path);
  }
  // The working directory always ends with '/', so plain concatenation
  // produces a well-formed path.
  std::string resolved = fs->get_working_directory();
  resolved.append(path);
  return optional<std::string>(resolved);
}
/** /**
* C API implementations * C API implementations
**/ **/
int hdfsFileIsOpenForRead(hdfsFile file) { int hdfsFileIsOpenForRead(hdfsFile file) {
/* files can only be open for reads at the moment, do a quick check */ /* files can only be open for reads at the moment, do a quick check */
if (file) { if (!CheckHandle(file)){
return 1; // Update implementation when we get file writing return 0;
}
return 1; // Update implementation when we get file writing
}
/*
 * Reports whether 'file' is open for writing.
 *
 * Writing is not implemented, so the answer is always "no". The result is 0
 * rather than -1 so that callers using the value as a boolean
 * (if (hdfsFileIsOpenForWrite(f)) ...) do not see a truthy value.
 */
int hdfsFileIsOpenForWrite(hdfsFile file) {
  /* files can only be open for reads at the moment, so return false */
  // Still run the handle check so a null handle is reported (EBADF); its
  // result does not change the answer.
  CheckHandle(file);
  return 0; // Update implementation when we get file writing
}
int hdfsConfGetLong(const char *key, int64_t *val)
{
try
{
errno = 0;
hdfsBuilder builder;
return hdfsBuilderConfGetLong(&builder, key, val);
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
return 0;
} }
hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<std::string> user, const Options & options) { hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<std::string> user, const Options & options) {
@ -329,8 +374,12 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return nullptr; return nullptr;
} }
const optional<std::string> abs_path = getAbsolutePath(fs, path);
if(!abs_path) {
return nullptr;
}
FileHandle *f = nullptr; FileHandle *f = nullptr;
Status stat = fs->get_impl()->Open(path, &f); Status stat = fs->get_impl()->Open(*abs_path, &f);
if (!stat.ok()) { if (!stat.ok()) {
Error(stat); Error(stat);
return nullptr; return nullptr;
@ -364,11 +413,165 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
} }
} }
/*
 * Copies the current working directory of 'fs' into 'buffer' as a
 * NUL-terminated string.
 *
 * @param fs         filesystem handle
 * @param buffer     destination; must not be NULL
 * @param bufferSize capacity of 'buffer' in bytes, including the terminator
 * @return 'buffer' on success; nullptr (with the error reported) when the
 *         handle is invalid, 'buffer' is NULL, or the buffer is too small.
 */
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
  try
  {
    errno = 0;
    if (!CheckSystem(fs)) {
      return nullptr;
    }
    // Guard the destination before anything is written through it.
    if (!buffer) {
      Error(Status::InvalidArgument("hdfsGetWorkingDirectory: argument 'buffer' cannot be NULL"));
      return nullptr;
    }
    std::string wd = fs->get_working_directory();
    size_t size = wd.size();
    // One extra byte is needed for the trailing '\0'.
    if (size + 1 > bufferSize) {
      std::stringstream ss;
      ss << "hdfsGetWorkingDirectory: bufferSize is " << bufferSize <<
          ", which is not enough to fit working directory of size " << (size + 1);
      Error(Status::InvalidArgument(ss.str().c_str()));
      return nullptr;
    }
    wd.copy(buffer, size);
    buffer[size] = '\0';
    return buffer;
  } catch (const std::exception & e) {
    ReportException(e);
    return nullptr;
  } catch (...) {
    ReportCaughtNonException();
    return nullptr;
  }
}
/*
 * Sets the working directory used to resolve relative paths on 'fs'.
 * The stored value always ends with '/'.
 *
 * @return 0 on success, -1 if the handle or the path is rejected.
 */
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
  try
  {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    optional<std::string> resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }
    // Append a separator unless the resolved path already ends with one.
    std::string new_wd = *resolved;
    if (new_wd.back() != '/') {
      new_wd.push_back('/');
    }
    fs->set_working_directory(new_wd);
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
/*
 * Number of bytes that can be read without blocking.
 * There is no read-ahead, so the answer is 0 whenever both handles are
 * valid and -1 otherwise.
 */
int hdfsAvailable(hdfsFS fs, hdfsFile file) {
  errno = 0;
  const bool handles_ok = CheckSystemAndHandle(fs, file);
  return handles_ok ? 0 : -1;
}
/*
 * Returns the block size taken from the options of the filesystem behind
 * 'fs'.
 *
 * @return the configured block size, or -1 when 'fs' fails the handle check
 *         or an exception is caught.
 */
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
  try {
    errno = 0;
    // Same guard as the other filesystem-level calls in this file; without
    // it a null handle would be dereferenced on the next line.
    if (!CheckSystem(fs)) {
      return -1;
    }
    return fs->get_impl()->get_options().block_size;
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...) {
    ReportCaughtNonException();
    return -1;
  }
}
/*
 * Returns the preferred block size for 'path'.
 *
 * When the lookup fails because the path was not found, the block size from
 * the filesystem options is returned instead. Any other failure is reported
 * through Error().
 */
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const optional<std::string> resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }
    uint64_t preferred;
    Status lookup = fs->get_impl()->GetPreferredBlockSize(*resolved, preferred);
    if (lookup.ok()) {
      return preferred;
    }
    // A missing path is not an error here: fall back to the configured size.
    if (lookup.pathNotFound()) {
      return fs->get_impl()->get_options().block_size;
    }
    return Error(lookup);
  } catch (const std::exception & e) {
    ReportException(e);
    return -1;
  } catch (...) {
    ReportCaughtNonException();
    return -1;
  }
}
/*
 * Sets the replication factor of 'path'.
 *
 * @param replication new replication factor; values below 1 are rejected
 *                    with an InvalidArgument error
 * @return 0 on success; otherwise the failure is reported and the error
 *         result is returned.
 */
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const optional<std::string> resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }
    // The path is validated first, then the replication value.
    if (replication < 1) {
      return Error(Status::InvalidArgument("SetReplication: argument 'replication' cannot be less than 1"));
    }
    const Status outcome = fs->get_impl()->SetReplication(*resolved, replication);
    if (outcome.ok()) {
      return 0;
    }
    return Error(outcome);
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
/*
 * Forwards the requested modification and access times for 'path' to the
 * filesystem's SetTimes call.
 *
 * @return 0 on success; otherwise the failure is reported and the error
 *         result is returned.
 */
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const optional<std::string> resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }
    const Status outcome = fs->get_impl()->SetTimes(*resolved, mtime, atime);
    if (outcome.ok()) {
      return 0;
    }
    return Error(outcome);
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
tOffset hdfsGetCapacity(hdfsFS fs) { tOffset hdfsGetCapacity(hdfsFS fs) {
try { try {
errno = 0; errno = 0;
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
hdfs::FsInfo fs_info; hdfs::FsInfo fs_info;
@ -391,7 +594,7 @@ tOffset hdfsGetUsed(hdfsFS fs) {
try { try {
errno = 0; errno = 0;
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
hdfs::FsInfo fs_info; hdfs::FsInfo fs_info;
@ -459,15 +662,41 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
file_info->mLastAccess = stat_info.access_time; file_info->mLastAccess = stat_info.access_time;
} }
/*
 * Checks whether 'path' exists by requesting its file info.
 *
 * @return 0 when the file info lookup succeeds; otherwise the failure is
 *         reported and the error result is returned.
 */
int hdfsExists(hdfsFS fs, const char *path) {
  try {
    errno = 0;
    if (!CheckSystem(fs)) {
      return -1;
    }
    const optional<std::string> resolved = getAbsolutePath(fs, path);
    if (!resolved) {
      return -1;
    }
    // Only the status matters; the retrieved info itself is discarded.
    hdfs::StatInfo ignored_info;
    Status lookup = fs->get_impl()->GetFileInfo(*resolved, ignored_info);
    if (lookup.ok()) {
      return 0;
    }
    return Error(lookup);
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
try { try {
errno = 0; errno = 0;
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return nullptr; return nullptr;
} }
const optional<std::string> abs_path = getAbsolutePath(fs, path);
if(!abs_path) {
return nullptr;
}
hdfs::StatInfo stat_info; hdfs::StatInfo stat_info;
Status stat = fs->get_impl()->GetFileInfo(path, stat_info); Status stat = fs->get_impl()->GetFileInfo(*abs_path, stat_info);
if (!stat.ok()) { if (!stat.ok()) {
Error(stat); Error(stat);
return nullptr; return nullptr;
@ -491,9 +720,12 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
*numEntries = 0; *numEntries = 0;
return nullptr; return nullptr;
} }
const optional<std::string> abs_path = getAbsolutePath(fs, path);
if(!abs_path) {
return nullptr;
}
std::shared_ptr<std::vector<StatInfo>> stat_infos; std::shared_ptr<std::vector<StatInfo>> stat_infos;
Status stat = fs->get_impl()->GetListing(path, stat_infos); Status stat = fs->get_impl()->GetListing(*abs_path, stat_infos);
if (!stat.ok()) { if (!stat.ok()) {
Error(stat); Error(stat);
*numEntries = 0; *numEntries = 0;
@ -540,12 +772,13 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsCreateDirectory: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat; Status stat;
//-1 for default permissions and true for creating all non-existant parent directories //Use default permissions and set true for creating all non-existant parent directories
stat = fs->get_impl()->Mkdirs(path, -1, true); stat = fs->get_impl()->Mkdirs(*abs_path, NameNodeOperations::GetDefaultPermissionMask(), true);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -563,11 +796,12 @@ int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsDelete: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat; Status stat;
stat = fs->get_impl()->Delete(path, recursive); stat = fs->get_impl()->Delete(*abs_path, recursive);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -585,14 +819,13 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!oldPath) { const optional<std::string> old_abs_path = getAbsolutePath(fs, oldPath);
return Error(Status::InvalidArgument("hdfsRename: argument 'oldPath' cannot be NULL")); const optional<std::string> new_abs_path = getAbsolutePath(fs, newPath);
} if(!old_abs_path || !new_abs_path) {
if (!newPath) { return -1;
return Error(Status::InvalidArgument("hdfsRename: argument 'newPath' cannot be NULL"));
} }
Status stat; Status stat;
stat = fs->get_impl()->Rename(oldPath, newPath); stat = fs->get_impl()->Rename(*old_abs_path, *new_abs_path);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -610,14 +843,15 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsChmod: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat = NameNodeOperations::CheckValidPermissionMask(mode); Status stat = NameNodeOperations::CheckValidPermissionMask(mode);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
stat = fs->get_impl()->SetPermission(path, mode); stat = fs->get_impl()->SetPermission(*abs_path, mode);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -635,14 +869,15 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsChown: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
std::string own = (owner) ? owner : ""; std::string own = (owner) ? owner : "";
std::string grp = (group) ? group : ""; std::string grp = (group) ? group : "";
Status stat; Status stat;
stat = fs->get_impl()->SetOwner(path, own, grp); stat = fs->get_impl()->SetOwner(*abs_path, own, grp);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -660,14 +895,15 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsCreateSnapshot: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat; Status stat;
if(!name){ if(!name){
stat = fs->get_impl()->CreateSnapshot(path, ""); stat = fs->get_impl()->CreateSnapshot(*abs_path, "");
} else { } else {
stat = fs->get_impl()->CreateSnapshot(path, name); stat = fs->get_impl()->CreateSnapshot(*abs_path, name);
} }
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
@ -686,14 +922,15 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
if (!name) { if (!name) {
return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL")); return Error(Status::InvalidArgument("hdfsDeleteSnapshot: argument 'name' cannot be NULL"));
} }
Status stat; Status stat;
stat = fs->get_impl()->DeleteSnapshot(path, name); stat = fs->get_impl()->DeleteSnapshot(*abs_path, name);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -711,11 +948,12 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsAllowSnapshot: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat; Status stat;
stat = fs->get_impl()->AllowSnapshot(path); stat = fs->get_impl()->AllowSnapshot(*abs_path);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -733,11 +971,12 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
if (!CheckSystem(fs)) { if (!CheckSystem(fs)) {
return -1; return -1;
} }
if (!path) { const optional<std::string> abs_path = getAbsolutePath(fs, path);
return Error(Status::InvalidArgument("hdfsDisallowSnapshot: argument 'path' cannot be NULL")); if(!abs_path) {
return -1;
} }
Status stat; Status stat;
stat = fs->get_impl()->DisallowSnapshot(path); stat = fs->get_impl()->DisallowSnapshot(*abs_path);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -793,6 +1032,55 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
} }
} }
/*
 * Unbuffering is a no-op because no buffering is done yet.
 * Always returns -1; a null handle is additionally reported by CheckHandle.
 */
int hdfsUnbufferFile(hdfsFile file) {
  // The check is run for its error reporting only; its result is not used.
  static_cast<void>(CheckHandle(file));
  return -1;
}
/*
 * Allocates a read-statistics record for 'file' and stores it in '*stats'.
 *
 * Only totalBytesRead is filled in; every other field is zero. The record is
 * allocated with new and is released by hdfsFileFreeReadStatistics.
 *
 * @param file  open file handle
 * @param stats (out param) receives the newly allocated record; must not be
 *              NULL
 * @return 0 on success, -1 when the handle or 'stats' is NULL.
 */
int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
  try
  {
    errno = 0;
    if (!CheckHandle(file)) {
      return -1;
    }
    // Reject a null out-pointer before writing through it.
    if (!stats) {
      ReportError(EINVAL, "Null pointer passed to hdfsFileGetReadStatistics");
      return -1;
    }
    // Value-initialisation zeroes every field of the plain C struct.
    *stats = new hdfsReadStatistics();
    (*stats)->totalBytesRead = file->get_impl()->get_bytes_read();
    return 0;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
/*
 * Resets the bytes-read counter of 'file'.
 *
 * @return 0 on success, -1 when the handle is rejected.
 */
int hdfsFileClearReadStatistics(hdfsFile file) {
  try
  {
    errno = 0;
    if (CheckHandle(file)) {
      file->get_impl()->clear_bytes_read();
      return 0;
    }
    return -1;
  } catch (const std::exception & e) {
    return ReportException(e);
  } catch (...) {
    return ReportCaughtNonException();
  }
}
/*
 * Remote bytes read = total bytes read minus locally read bytes.
 * 'stats' is dereferenced without a null check.
 */
int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
  const auto total = stats->totalBytesRead;
  const auto local = stats->totalLocalBytesRead;
  return total - local;
}
/*
 * Releases a statistics record with delete and clears errno.
 * Passing NULL is harmless because deleting a null pointer does nothing.
 */
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
errno = 0;
delete stats;
}
/* 0 on success, -1 on error*/ /* 0 on success, -1 on error*/
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
try try
@ -868,9 +1156,12 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations
ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations"); ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations");
return -1; return -1;
} }
const optional<std::string> abs_path = getAbsolutePath(fs, path);
if(!abs_path) {
return -1;
}
std::shared_ptr<FileBlockLocation> ppLocations; std::shared_ptr<FileBlockLocation> ppLocations;
Status stat = fs->get_impl()->GetBlockLocations(path, &ppLocations); Status stat = fs->get_impl()->GetBlockLocations(*abs_path, 0, std::numeric_limits<int64_t>::max(), &ppLocations);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
@ -943,6 +1234,59 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
return 0; return 0;
} }
/*
 * Builds a NULL-terminated table of host names for the blocks of 'path'
 * that fall within [start, start + length).
 *
 * Layout: hosts[i] is a NULL-terminated array of C strings, one per datanode
 * of block i; hosts itself is terminated by a NULL entry. The result must be
 * released with hdfsFreeHosts.
 *
 * @return the table on success; nullptr (with the error reported) on failure.
 */
char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
  try
  {
    errno = 0;
    if (!CheckSystem(fs)) {
      return nullptr;
    }
    const optional<std::string> abs_path = getAbsolutePath(fs, path);
    if(!abs_path) {
      return nullptr;
    }
    std::shared_ptr<FileBlockLocation> ppLocations;
    Status stat = fs->get_impl()->GetBlockLocations(*abs_path, start, length, &ppLocations);
    if (!stat.ok()) {
      Error(stat);
      return nullptr;
    }
    const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
    // Value-initialise every pointer array so the table is NULL-terminated at
    // every stage of construction; a partially built table can then be handed
    // to hdfsFreeHosts if a later allocation throws.
    char ***hosts = new char**[ppBlockLocations.size() + 1]();
    try {
      for (size_t i=0; i < ppBlockLocations.size(); i++) {
        const std::vector<DNInfo> & ppDNInfos = ppBlockLocations[i].getDataNodes();
        hosts[i] = new char*[ppDNInfos.size() + 1]();
        for (size_t j=0; j < ppDNInfos.size(); j++) {
          auto ppDNInfo = ppDNInfos[j];
          // Fetch the host name once instead of once per use.
          const std::string hostname = ppDNInfo.getHostname();
          hosts[i][j] = new char[hostname.size() + 1];
          strncpy(hosts[i][j], hostname.c_str(), hostname.size() + 1);
        }
      }
    } catch (...) {
      // Do not leak what was allocated so far; the outer handlers report the error.
      hdfsFreeHosts(hosts);
      throw;
    }
    return hosts;
  } catch (const std::exception & e) {
    ReportException(e);
    return nullptr;
  } catch (...) {
    ReportCaughtNonException();
    return nullptr;
  }
}
/*
 * Releases a host table laid out as a NULL-terminated array of
 * NULL-terminated arrays of C strings, each level allocated with new[].
 * Clears errno; a NULL table is accepted and ignored.
 */
void hdfsFreeHosts(char ***blockHosts) {
  errno = 0;
  if (blockHosts == nullptr)
    return;
  for (size_t i = 0; blockHosts[i]; i++) {
    for (size_t j = 0; blockHosts[i][j]; j++) {
      delete[] blockHosts[i][j];
    }
    delete[] blockHosts[i];
  }
  // The outer array comes from new[] as well, so it needs the array form of
  // delete; scalar delete here is undefined behaviour.
  delete[] blockHosts;
}
/******************************************************************* /*******************************************************************
* EVENT CALLBACKS * EVENT CALLBACKS
@ -1234,6 +1578,28 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val
} }
} }
int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val)
{
try
{
errno = 0;
// Pull from default configuration
optional<int64_t> value = bld->config.GetInt(key);
if (value)
{
*val = *value;
return 0;
}
// If not found, don't change val
ReportError(EINVAL, "Could not get Builder value");
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
/** /**
* Logging functions * Logging functions
**/ **/

View File

@ -144,6 +144,7 @@ Options HdfsConfiguration::GetOptions() {
OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey)); OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey)); OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey)); OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
OptionalSet(result.block_size, GetInt(kDfsBlockSizeKey));
OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts)); OptionalSet(result.failover_max_retries, GetInt(kDfsClientFailoverMaxAttempts));

View File

@ -45,6 +45,7 @@ class HdfsConfiguration : public Configuration {
static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication"; static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication";
static constexpr const char * kHadoopSecurityAuthentication_simple = "simple"; static constexpr const char * kHadoopSecurityAuthentication_simple = "simple";
static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos"; static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos";
static constexpr const char * kDfsBlockSizeKey = "dfs.blocksize";
static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts"; static constexpr const char * kDfsClientFailoverMaxAttempts = "dfs.client.failover.max.attempts";
static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts"; static constexpr const char * kDfsClientFailoverConnectionRetriesOnTimeouts = "dfs.client.failover.connection.retries.on.timeouts";

View File

@ -28,6 +28,7 @@ const int Options::kDefaultRpcRetryDelayMs;
const unsigned int Options::kDefaultHostExclusionDuration; const unsigned int Options::kDefaultHostExclusionDuration;
const unsigned int Options::kDefaultFailoverMaxRetries; const unsigned int Options::kDefaultFailoverMaxRetries;
const unsigned int Options::kDefaultFailoverConnectionMaxRetries; const unsigned int Options::kDefaultFailoverConnectionMaxRetries;
const long Options::kDefaultBlockSize;
Options::Options() : rpc_timeout(kDefaultRpcTimeout), Options::Options() : rpc_timeout(kDefaultRpcTimeout),
rpc_connect_timeout(kDefaultRpcConnectTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout),
@ -37,7 +38,8 @@ Options::Options() : rpc_timeout(kDefaultRpcTimeout),
defaultFS(), defaultFS(),
failover_max_retries(kDefaultFailoverMaxRetries), failover_max_retries(kDefaultFailoverMaxRetries),
failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries), failover_connection_max_retries(kDefaultFailoverConnectionMaxRetries),
authentication(kDefaultAuthentication) {} authentication(kDefaultAuthentication),
block_size(kDefaultBlockSize) {}
std::string NamenodeInfo::get_host() const { std::string NamenodeInfo::get_host() const {
return uri.get_host(); return uri.get_host();

View File

@ -89,6 +89,10 @@ Status Status::ResourceUnavailable(const char *msg) {
return Status(kResourceUnavailable, msg); return Status(kResourceUnavailable, msg);
} }
// Factory for a Status that carries the kNotADirectory code together with
// the caller-supplied message.
Status Status::PathIsNotDirectory(const char *msg) {
  Status not_a_directory(kNotADirectory, msg);
  return not_a_directory;
}
Status Status::Unimplemented() { Status Status::Unimplemented() {
return Status(kUnimplemented, ""); return Status(kUnimplemented, "");
} }

View File

@ -107,6 +107,15 @@ std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
return err; return err;
} }
// Returns true when the most significant bit (bit 63) of num is set,
// false otherwise.
bool IsHighBitSet(uint64_t num) {
  const uint64_t high_bit_mask = static_cast<uint64_t>(1) << 63;
  return (num & high_bit_mask) != 0;
}
} }
void ShutdownProtobufLibrary_C() { void ShutdownProtobufLibrary_C() {

View File

@ -110,6 +110,8 @@ inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket>
return s; return s;
} }
//Check if the high bit is set
bool IsHighBitSet(uint64_t num);
} }
#endif #endif

View File

@ -41,7 +41,7 @@ FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes, std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers) std::shared_ptr<LibhdfsEvents> event_handlers)
: cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers) { bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
<< FMT_THIS_ADDR << ", ...) called"); << FMT_THIS_ADDR << ", ...) called");
@ -68,6 +68,7 @@ void FileHandleImpl::PositionRead(
bad_node_tracker_->AddBadNode(contacted_datanode); bad_node_tracker_->AddBadNode(contacted_datanode);
} }
bytes_read_ += bytes_read;
handler(status, bytes_read); handler(status, bytes_read);
}; };
@ -352,4 +353,8 @@ bool FileHandle::ShouldExclude(const Status &s) {
} }
} }
uint64_t FileHandleImpl::get_bytes_read() { return bytes_read_; }
void FileHandleImpl::clear_bytes_read() { bytes_read_ = 0; }
} }

View File

@ -113,6 +113,12 @@ public:
**/ **/
std::shared_ptr<LibhdfsEvents> get_event_handlers(); std::shared_ptr<LibhdfsEvents> get_event_handlers();
/* how many bytes have been successfully read */
virtual uint64_t get_bytes_read() override;
/* resets the number of bytes read to zero */
virtual void clear_bytes_read() override;
protected: protected:
virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn, std::shared_ptr<DataNodeConnection> dn,
@ -133,6 +139,7 @@ private:
CancelHandle cancel_state_; CancelHandle cancel_state_;
ReaderGroup readers_; ReaderGroup readers_;
std::shared_ptr<LibhdfsEvents> event_handlers_; std::shared_ptr<LibhdfsEvents> event_handlers_;
uint64_t bytes_read_;
}; };
} }

View File

@ -243,14 +243,13 @@ void FileSystemImpl::Open(
<< FMT_THIS_ADDR << ", path=" << FMT_THIS_ADDR << ", path="
<< path << ") called"); << path << ") called");
nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) { nn_.GetBlockLocations(path, 0, std::numeric_limits<int64_t>::max(), [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
if(!stat.ok()) { if(!stat.ok()) {
LOG_INFO(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString()); LOG_INFO(kFileSystem, << "FileSystemImpl::Open failed to get block locations. status=" << stat.ToString());
if(stat.get_server_exception_type() == Status::kStandbyException) { if(stat.get_server_exception_type() == Status::kStandbyException) {
LOG_INFO(kFileSystem, << "Operation not allowed on standby datanode"); LOG_INFO(kFileSystem, << "Operation not allowed on standby datanode");
} }
} }
handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
: nullptr); : nullptr);
}); });
@ -326,13 +325,24 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto
return result; return result;
} }
void FileSystemImpl::GetBlockLocations(const std::string & path, void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler) const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
{ {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations(" LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
<< FMT_THIS_ADDR << ", path=" << FMT_THIS_ADDR << ", path="
<< path << ") called"); << path << ") called");
//Protobuf gives an error 'Negative value is not supported'
//if the high bit is set in uint64 in GetBlockLocations
if (IsHighBitSet(offset)) {
handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
return;
}
if (IsHighBitSet(length)) {
handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
return;
}
auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) { auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
if (status.ok()) { if (status.ok()) {
auto result = std::make_shared<FileBlockLocation>(); auto result = std::make_shared<FileBlockLocation>();
@ -354,10 +364,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path,
} }
}; };
nn_.GetBlockLocations(path, conversion); nn_.GetBlockLocations(path, offset, length, conversion);
} }
Status FileSystemImpl::GetBlockLocations(const std::string & path, Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::shared_ptr<FileBlockLocation> * fileBlockLocations) std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{ {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
@ -375,7 +385,7 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path,
callstate->set_value(std::make_tuple(s,blockInfo)); callstate->set_value(std::make_tuple(s,blockInfo));
}; };
GetBlockLocations(path, callback); GetBlockLocations(path, offset, length, callback);
/* wait for async to finish */ /* wait for async to finish */
auto returnstate = future.get(); auto returnstate = future.get();
@ -390,6 +400,119 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path,
return stat; return stat;
} }
// Asynchronous variant: logs the call and hands the request straight to the
// NameNode operations object; the handler receives the status and the size.
void FileSystemImpl::GetPreferredBlockSize(
    const std::string &path,
    const std::function<void(const Status &, const uint64_t &)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize(" << FMT_THIS_ADDR
                         << ", path=" << path << ") called");

  nn_.GetPreferredBlockSize(path, handler);
}
// Blocking variant: runs the asynchronous GetPreferredBlockSize and waits for
// its result. block_size is written only when the returned Status is ok().
Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize(" << FMT_THIS_ADDR
                         << ", path=" << path << ") called");

  using Result = std::tuple<Status, uint64_t>;
  auto promise = std::make_shared<std::promise<Result>>();
  std::future<Result> pending(promise->get_future());

  /* the async callback fulfils the promise, turning the call into a blocking one */
  GetPreferredBlockSize(path, [promise](const Status &s, const uint64_t &bsize) {
    promise->set_value(std::make_tuple(s, bsize));
  });

  /* wait here until the callback has delivered a result */
  Result outcome = pending.get();
  Status stat = std::get<0>(outcome);
  if (stat.ok()) {
    block_size = std::get<1>(outcome);
  }
  return stat;
}
void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
", replication=" << replication << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
return;
}
Status replStatus = NameNodeOperations::CheckValidReplication(replication);
if (!replStatus.ok()) {
handler(replStatus);
return;
}
nn_.SetReplication(path, replication, handler);
}
// Blocking variant: runs the asynchronous SetReplication and returns the
// Status delivered to its callback.
Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
  LOG_DEBUG(kFileSystem,
      << "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
      ", replication=" << replication << ") called");

  auto promise = std::make_shared<std::promise<Status>>();
  std::future<Status> pending(promise->get_future());

  /* the async callback fulfils the promise, turning the call into a blocking one */
  SetReplication(path, replication, [promise](const Status &s) {
    promise->set_value(s);
  });

  /* wait here until the callback has delivered a result */
  return pending.get();
}
// Asynchronous variant: rejects an empty path through the handler, otherwise
// forwards mtime and atime unchanged to the NameNode operations object.
void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
                              std::function<void(const Status &)> handler) {
  LOG_DEBUG(kFileSystem,
      << "FileSystemImpl::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
      ", mtime=" << mtime << ", atime=" << atime << ") called");

  const bool path_missing = path.empty();
  if (!path_missing) {
    nn_.SetTimes(path, mtime, atime, handler);
    return;
  }

  handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
}
// Blocking variant: runs the asynchronous SetTimes and returns the Status
// delivered to its callback.
Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
  LOG_DEBUG(kFileSystem,
      << "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
      ", mtime=" << mtime << ", atime=" << atime << ") called");

  auto promise = std::make_shared<std::promise<Status>>();
  std::future<Status> pending(promise->get_future());

  /* the async callback fulfils the promise, turning the call into a blocking one */
  SetTimes(path, mtime, atime, [promise](const Status &s) {
    promise->set_value(s);
  });

  /* wait here until the callback has delivered a result */
  return pending.get();
}
void FileSystemImpl::GetFileInfo( void FileSystemImpl::GetFileInfo(
const std::string &path, const std::string &path,
const std::function<void(const Status &, const StatInfo &)> &handler) { const std::function<void(const Status &, const StatInfo &)> &handler) {
@ -543,7 +666,7 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::
return stat; return stat;
} }
void FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool createparent, void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) { std::function<void(const Status &)> handler) {
LOG_DEBUG(kFileSystem, LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << << "FileSystemImpl::Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
@ -554,10 +677,16 @@ void FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool cre
return; return;
} }
Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
nn_.Mkdirs(path, permissions, createparent, handler); nn_.Mkdirs(path, permissions, createparent, handler);
} }
Status FileSystemImpl::Mkdirs(const std::string & path, long permissions, bool createparent) { Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
LOG_DEBUG(kFileSystem, LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path << << "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
", permissions=" << permissions << ", createparent=" << createparent << ") called"); ", permissions=" << permissions << ", createparent=" << createparent << ") called");
@ -653,7 +782,7 @@ Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &new
} }
void FileSystemImpl::SetPermission(const std::string & path, void FileSystemImpl::SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) { uint16_t permissions, const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem, LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); << "FileSystemImpl::SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
@ -670,7 +799,7 @@ void FileSystemImpl::SetPermission(const std::string & path,
nn_.SetPermission(path, permissions, handler); nn_.SetPermission(path, permissions, handler);
} }
Status FileSystemImpl::SetPermission(const std::string & path, short permissions) { Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
LOG_DEBUG(kFileSystem, LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called"); << "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
@ -896,4 +1025,8 @@ std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
return event_handlers_; return event_handlers_;
} }
// Returns a copy of the options_ member held by this filesystem object.
Options FileSystemImpl::get_options() {
  Options options_copy = options_;
  return options_copy;
}
} }

View File

@ -65,6 +65,16 @@ public:
&handler) override; &handler) override;
Status Open(const std::string &path, FileHandle **handle) override; Status Open(const std::string &path, FileHandle **handle) override;
virtual void GetPreferredBlockSize(const std::string &path,
const std::function<void(const Status &, const uint64_t &)> &handler) override;
virtual Status GetPreferredBlockSize(const std::string &path, uint64_t & block_size) override;
virtual void SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) override;
virtual Status SetReplication(const std::string & path, int16_t replication) override;
void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime, std::function<void(const Status &)> handler) override;
Status SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) override;
void GetFileInfo( void GetFileInfo(
const std::string &path, const std::string &path,
const std::function<void(const Status &, const StatInfo &)> &handler) override; const std::function<void(const Status &, const StatInfo &)> &handler) override;
@ -88,14 +98,14 @@ public:
Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override; Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override;
virtual void GetBlockLocations(const std::string & path, virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override; const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
virtual Status GetBlockLocations(const std::string & path, virtual Status GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::shared_ptr<FileBlockLocation> * locations) override; std::shared_ptr<FileBlockLocation> * locations) override;
virtual void Mkdirs(const std::string & path, long permissions, bool createparent, virtual void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) override; std::function<void(const Status &)> handler) override;
virtual Status Mkdirs(const std::string & path, long permissions, bool createparent) override; virtual Status Mkdirs(const std::string & path, uint16_t permissions, bool createparent) override;
virtual void Delete(const std::string &path, bool recursive, virtual void Delete(const std::string &path, bool recursive,
const std::function<void(const Status &)> &handler) override; const std::function<void(const Status &)> &handler) override;
@ -106,8 +116,8 @@ public:
virtual Status Rename(const std::string &oldPath, const std::string &newPath) override; virtual Status Rename(const std::string &oldPath, const std::string &newPath) override;
virtual void SetPermission(const std::string & path, virtual void SetPermission(const std::string & path,
short permissions, const std::function<void(const Status &)> &handler) override; uint16_t permissions, const std::function<void(const Status &)> &handler) override;
virtual Status SetPermission(const std::string & path, short permissions) override; virtual Status SetPermission(const std::string & path, uint16_t permissions) override;
virtual void SetOwner(const std::string & path, const std::string & username, virtual void SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) override; const std::string & groupname, const std::function<void(const Status &)> &handler) override;
@ -166,6 +176,8 @@ public:
/* all monitored events will need to lookup handlers */ /* all monitored events will need to lookup handlers */
std::shared_ptr<LibhdfsEvents> get_event_handlers(); std::shared_ptr<LibhdfsEvents> get_event_handlers();
Options get_options();
private: private:
const Options options_; const Options options_;
const std::string client_name_; const std::string client_name_;

View File

@ -39,14 +39,26 @@ namespace hdfs {
* NAMENODE OPERATIONS * NAMENODE OPERATIONS
****************************************************************************/ ****************************************************************************/
Status NameNodeOperations::CheckValidPermissionMask(short permissions) { uint16_t NameNodeOperations::GetDefaultPermissionMask() {
if (permissions < 0 || permissions > 01777) { return 0755;
}
Status NameNodeOperations::CheckValidPermissionMask(uint16_t permissions) {
if (permissions > 01777) {
std::stringstream errormsg; std::stringstream errormsg;
errormsg << "IsValidPermissionMask: argument 'permissions' is " << std::oct errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct
<< std::showbase << permissions << " (should be between 0 and 01777)"; << std::showbase << permissions << " (should be between 0 and 01777)";
//Avoid copying by binding errormsg.str() to a const reference, which extends its lifetime return Status::InvalidArgument(errormsg.str().c_str());
const std::string& tmp = errormsg.str(); }
return Status::InvalidArgument(tmp.c_str()); return Status::OK();
}
Status NameNodeOperations::CheckValidReplication(uint16_t replication) {
if (replication < 1 || replication > 512) {
std::stringstream errormsg;
errormsg << "CheckValidReplication: argument 'replication' is "
<< replication << " (should be between 1 and 512)";
return Status::InvalidArgument(errormsg.str().c_str());
} }
return Status::OK(); return Status::OK();
} }
@ -57,7 +69,7 @@ void NameNodeOperations::Connect(const std::string &cluster_name,
engine_.Connect(cluster_name, servers, handler); engine_.Connect(cluster_name, servers, handler);
} }
void NameNodeOperations::GetBlockLocations(const std::string & path, void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler) std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
{ {
using ::hadoop::hdfs::GetBlockLocationsRequestProto; using ::hadoop::hdfs::GetBlockLocationsRequestProto;
@ -71,10 +83,21 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
return; return;
} }
//Protobuf gives an error 'Negative value is not supported'
//if the high bit is set in uint64 in GetBlockLocations
if (IsHighBitSet(offset)) {
handler(Status::InvalidArgument("GetBlockLocations: argument 'offset' cannot have high bit set"), nullptr);
return;
}
if (IsHighBitSet(length)) {
handler(Status::InvalidArgument("GetBlockLocations: argument 'length' cannot have high bit set"), nullptr);
return;
}
GetBlockLocationsRequestProto req; GetBlockLocationsRequestProto req;
req.set_src(path); req.set_src(path);
req.set_offset(0); req.set_offset(offset);
req.set_length(std::numeric_limits<long long>::max()); req.set_length(length);
auto resp = std::make_shared<GetBlockLocationsResponseProto>(); auto resp = std::make_shared<GetBlockLocationsResponseProto>();
@ -104,6 +127,106 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
}); });
} }
// Asynchronously asks the NameNode for the preferred block size of `path`.
//
// handler receives the Status of the operation and a block size. When the
// RPC succeeds and the response carries a bsize field, that value is passed.
// In every other case (empty path, failed RPC, response without bsize) the
// size argument is static_cast<uint64_t>(-1), i.e. the all-ones value that
// the literal -1 converts to; callers must inspect the Status before using it.
// NOTE(review): a successful RPC whose response lacks bsize is reported with
// an ok() Status plus the sentinel size - confirm whether callers rely on this.
void NameNodeOperations::GetPreferredBlockSize(const std::string & path,
  std::function<void(const Status &, const uint64_t)> handler)
{
  using ::hadoop::hdfs::GetPreferredBlockSizeRequestProto;
  using ::hadoop::hdfs::GetPreferredBlockSizeResponseProto;

  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetPreferredBlockSize("
                         << FMT_THIS_ADDR << ", path=" << path << ") called");

  if (path.empty()) {
    handler(Status::InvalidArgument("GetPreferredBlockSize: argument 'path' cannot be empty"),
            static_cast<uint64_t>(-1));
    return;
  }

  GetPreferredBlockSizeRequestProto req;
  req.set_filename(path);

  auto resp = std::make_shared<GetPreferredBlockSizeResponseProto>();

  // The callback only needs the response and the user handler; `path` is not
  // read after the request has been built, so it is no longer copied into it.
  namenode_.GetPreferredBlockSize(&req, resp, [resp, handler](const Status &stat) {
    if (stat.ok() && resp->has_bsize()) {
      uint64_t block_size = resp->bsize();
      handler(stat, block_size);
    } else {
      handler(stat, static_cast<uint64_t>(-1));
    }
  });
}
void NameNodeOperations::SetReplication(const std::string & path, int16_t replication,
std::function<void(const Status &)> handler)
{
using ::hadoop::hdfs::SetReplicationRequestProto;
using ::hadoop::hdfs::SetReplicationResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
", replication=" << replication << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty"));
return;
}
Status replStatus = CheckValidReplication(replication);
if (!replStatus.ok()) {
handler(replStatus);
return;
}
SetReplicationRequestProto req;
req.set_src(path);
req.set_replication(replication);
auto resp = std::make_shared<SetReplicationResponseProto>();
namenode_.SetReplication(&req, resp, [resp, handler, path](const Status &stat) {
if (stat.ok()) {
// Checking resp
if(resp -> has_result() && resp ->result() == 1) {
handler(stat);
} else {
//NameNode does not specify why there is no result, in my testing it was happening when the path is not found
std::string errormsg = "No such file or directory: " + path;
Status statNew = Status::PathNotFound(errormsg.c_str());
handler(statNew);
}
} else {
handler(stat);
}
});
}
// Asynchronously sends a SetTimes request for `path` to the NameNode.
//
// mtime and atime are copied into the request unchanged. An empty path is
// reported through the handler as InvalidArgument without issuing the RPC;
// otherwise the handler receives the Status of the RPC as-is.
void NameNodeOperations::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
  std::function<void(const Status &)> handler)
{
  using ::hadoop::hdfs::SetTimesRequestProto;
  using ::hadoop::hdfs::SetTimesResponseProto;

  LOG_TRACE(kFileSystem,
      << "NameNodeOperations::SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
      ", mtime=" << mtime << ", atime=" << atime << ") called");

  if (path.empty()) {
    handler(Status::InvalidArgument("SetTimes: argument 'path' cannot be empty"));
    return;
  }

  SetTimesRequestProto req;
  req.set_src(path);
  req.set_mtime(mtime);
  req.set_atime(atime);

  auto resp = std::make_shared<SetTimesResponseProto>();

  // `path` is not read inside the callback, so it is no longer copied into it.
  // `resp` stays captured although unread - presumably to keep the response
  // object alive until the RPC completes (TODO confirm against the RPC engine).
  namenode_.SetTimes(&req, resp, [resp, handler](const Status &stat) {
    handler(stat);
  });
}
void NameNodeOperations::GetFileInfo(const std::string & path, void NameNodeOperations::GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler) std::function<void(const Status &, const StatInfo &)> handler)
{ {
@ -216,7 +339,7 @@ void NameNodeOperations::GetListing(
}); });
} }
void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool createparent, void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) std::function<void(const Status &)> handler)
{ {
using ::hadoop::hdfs::MkdirsRequestProto; using ::hadoop::hdfs::MkdirsRequestProto;
@ -232,13 +355,14 @@ void NameNodeOperations::Mkdirs(const std::string & path, long permissions, bool
} }
MkdirsRequestProto req; MkdirsRequestProto req;
Status permStatus = CheckValidPermissionMask(permissions);
if (!permStatus.ok()) {
handler(permStatus);
return;
}
req.set_src(path); req.set_src(path);
hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked(); hadoop::hdfs::FsPermissionProto *perm = req.mutable_masked();
if (permissions < 0) { perm->set_perm(permissions);
perm->set_perm(0755);
} else {
perm->set_perm(permissions);
}
req.set_createparent(createparent); req.set_createparent(createparent);
auto resp = std::make_shared<MkdirsResponseProto>(); auto resp = std::make_shared<MkdirsResponseProto>();
@ -336,7 +460,7 @@ void NameNodeOperations::Rename(const std::string & oldPath, const std::string &
} }
void NameNodeOperations::SetPermission(const std::string & path, void NameNodeOperations::SetPermission(const std::string & path,
short permissions, std::function<void(const Status &)> handler) { uint16_t permissions, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::SetPermissionRequestProto; using ::hadoop::hdfs::SetPermissionRequestProto;
using ::hadoop::hdfs::SetPermissionResponseProto; using ::hadoop::hdfs::SetPermissionResponseProto;

View File

@ -48,15 +48,28 @@ public:
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
namenode_(& engine_), options_(options) {} namenode_(& engine_), options_(options) {}
static Status CheckValidPermissionMask(short permissions); static uint16_t GetDefaultPermissionMask();
static Status CheckValidPermissionMask(uint16_t permissions);
static Status CheckValidReplication(uint16_t replication);
void Connect(const std::string &cluster_name, void Connect(const std::string &cluster_name,
const std::vector<ResolvedNamenodeInfo> &servers, const std::vector<ResolvedNamenodeInfo> &servers,
std::function<void(const Status &)> &&handler); std::function<void(const Status &)> &&handler);
void GetBlockLocations(const std::string & path, void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler); std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
void GetPreferredBlockSize(const std::string & path,
std::function<void(const Status &, const uint64_t)> handler);
void SetReplication(const std::string & path, int16_t replication,
std::function<void(const Status &)> handler);
void SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
std::function<void(const Status &)> handler);
void GetFileInfo(const std::string & path, void GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler); std::function<void(const Status &, const StatInfo &)> handler);
@ -67,7 +80,7 @@ public:
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler, std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
const std::string & start_after = ""); const std::string & start_after = "");
void Mkdirs(const std::string & path, long permissions, bool createparent, void Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler); std::function<void(const Status &)> handler);
void Delete(const std::string & path, bool recursive, void Delete(const std::string & path, bool recursive,
@ -76,7 +89,7 @@ public:
void Rename(const std::string & oldPath, const std::string & newPath, void Rename(const std::string & oldPath, const std::string & newPath,
std::function<void(const Status &)> handler); std::function<void(const Status &)> handler);
void SetPermission(const std::string & path, short permissions, void SetPermission(const std::string & path, uint16_t permissions,
std::function<void(const Status &)> handler); std::function<void(const Status &)> handler);
void SetOwner(const std::string & path, const std::string & username, void SetOwner(const std::string & path, const std::string & username,

View File

@ -39,8 +39,8 @@ TEST_F(HdfsExtTest, TestGetBlockLocations) {
EXPECT_EQ(0, result); EXPECT_EQ(0, result);
// Test non-extant files // Test non-extant files
result = hdfsGetBlockLocations(connection, "non_extant_file", &blocks); EXPECT_EQ(-1, hdfsGetBlockLocations(connection, "non_extant_file", &blocks)); // Should be an error
EXPECT_NE(0, result); // Should be an error EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
// Test an extant file // Test an extant file
std::string filename = connection.newFile(1024); std::string filename = connection.newFile(1024);
@ -296,7 +296,6 @@ TEST_F(HdfsExtTest, TestEOF) {
HdfsHandle connection = cluster.connect_c(); HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle(); hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs); EXPECT_NE(nullptr, fs);
//Write to a file //Write to a file
errno = 0; errno = 0;
int size = 256; int size = 256;
@ -308,28 +307,284 @@ TEST_F(HdfsExtTest, TestEOF) {
EXPECT_EQ(size, hdfsWrite(fs, file, buf, size)); EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
free(buf); free(buf);
EXPECT_EQ(0, hdfsCloseFile(fs, file)); EXPECT_EQ(0, hdfsCloseFile(fs, file));
EXPECT_EQ(0, errno); //libhdfs file operations work, but sometimes sets errno ENOENT : 2
//Test normal reading (no EOF) //Test normal reading (no EOF)
char buffer[300]; char buffer[300];
EXPECT_EQ(0, errno);
file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0); file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer))); EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
//Read executes correctly, but causes a warning (captured in HDFS-10595) //Read executes correctly, but causes a warning (captured in HDFS-10595)
//and sets errno to EINPROGRESS 115 : Operation now in progress //and sets errno to EINPROGRESS 115 : Operation now in progress
errno = 0;
//Test reading at offset past the EOF //Test reading at offset past the EOF
EXPECT_EQ(-1, hdfsPread(fs, file, sizeof(buffer), buffer, sizeof(buffer))); EXPECT_EQ(-1, hdfsPread(fs, file, sizeof(buffer), buffer, sizeof(buffer)));
EXPECT_EQ(Status::kInvalidOffset, errno); EXPECT_EQ(Status::kInvalidOffset, errno);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
}
// Testing hdfsExists: missing path, existing directory/file, and a path
// that a second user is expected to be denied access to.
TEST_F(HdfsExtTest, TestExists) {
  HdfsHandle connection = cluster.connect_c();
  hdfsFS fs = connection.handle();
  // Fatal check: fs is handed to the C API by every call below, so there is
  // no point (and no safety) in continuing with a null handle.
  ASSERT_NE(nullptr, fs);

  // Path not found
  EXPECT_EQ(-1, hdfsExists(fs, "/wrong/dir/"));
  EXPECT_EQ(static_cast<int>(std::errc::no_such_file_or_directory), errno);

  // Correct operation: both a directory and a file inside it must exist
  std::string pathDir = "/testExistsDir";
  EXPECT_EQ(0, hdfsCreateDirectory(fs, pathDir.c_str()));
  EXPECT_EQ(0, hdfsExists(fs, pathDir.c_str()));
  std::string pathFile = connection.newFile(pathDir.c_str(), 1024);
  EXPECT_EQ(0, hdfsExists(fs, pathFile.c_str()));

  // Permission denied: restrict the directory to its owner (0700), then
  // query the file inside it through a connection opened as another user.
  EXPECT_EQ(0, hdfsChmod(fs, pathDir.c_str(), 0700));
  HdfsHandle connection2 = cluster.connect_c("OtherGuy");
  hdfsFS fs2 = connection2.handle();
  ASSERT_NE(nullptr, fs2);
  EXPECT_EQ(-1, hdfsExists(fs2, pathFile.c_str()));
  EXPECT_EQ(static_cast<int>(std::errc::permission_denied), errno);
}
// Testing replication and time modifications (hdfsSetReplication, hdfsUtime):
// missing path, successful update verified through hdfsGetPathInfo,
// out-of-range replication values, and access by a second user.
TEST_F(HdfsExtTest, TestReplAndTime) {
  HdfsHandle connection = cluster.connect_c();
  hdfsFS fs = connection.handle();
  // Fatal check: every call below passes fs to the C API.
  ASSERT_NE(nullptr, fs);

  std::string path = "/wrong/dir/";

  // Path not found
  EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 3));
  EXPECT_EQ(static_cast<int>(std::errc::no_such_file_or_directory), errno);
  EXPECT_EQ(-1, hdfsUtime(fs, path.c_str(), 1000000, 1000000));
  EXPECT_EQ(static_cast<int>(std::errc::no_such_file_or_directory), errno);

  // Correct operation: set the values, then read them back
  path = connection.newFile(1024);
  EXPECT_EQ(0, hdfsSetReplication(fs, path.c_str(), 7));
  EXPECT_EQ(0, hdfsUtime(fs, path.c_str(), 123456789, 987654321));
  hdfsFileInfo *file_info = hdfsGetPathInfo(fs, path.c_str());
  // Fatal check: file_info is dereferenced immediately below.
  ASSERT_NE(nullptr, file_info);
  EXPECT_EQ(7, file_info->mReplication);
  EXPECT_EQ(123456789, file_info->mLastMod);
  EXPECT_EQ(987654321, file_info->mLastAccess);
  hdfsFreeFileInfo(file_info, 1);

  // Wrong arguments: replication of 0 and of 513 must both be rejected
  EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 0));
  EXPECT_EQ(static_cast<int>(std::errc::invalid_argument), errno);
  EXPECT_EQ(-1, hdfsSetReplication(fs, path.c_str(), 513));
  EXPECT_EQ(static_cast<int>(std::errc::invalid_argument), errno);

  // Permission denied: restrict the file to its owner (0700), then try to
  // modify it through a connection opened as another user.
  EXPECT_EQ(0, hdfsChmod(fs, path.c_str(), 0700));
  HdfsHandle connection2 = cluster.connect_c("OtherGuy");
  hdfsFS fs2 = connection2.handle();
  ASSERT_NE(nullptr, fs2);
  EXPECT_EQ(-1, hdfsSetReplication(fs2, path.c_str(), 3));
  EXPECT_EQ(static_cast<int>(std::errc::permission_denied), errno);
  EXPECT_EQ(-1, hdfsUtime(fs2, path.c_str(), 111111111, 222222222));
  EXPECT_EQ(static_cast<int>(std::errc::permission_denied), errno);
}
// Testing getting the default block size: at an existing path (cross-checked
// against the file's own block size), at a non-existing path, and with no
// path at all.
TEST_F(HdfsExtTest, TestDefaultBlockSize) {
  HdfsHandle connection = cluster.connect_c();
  hdfsFS fs = connection.handle();
  // Fatal check: every call below passes fs to the C API.
  ASSERT_NE(nullptr, fs);

  // Correct operation (existing path).
  // The API returns tOffset; keep that type instead of `long`, which is
  // narrower than 64 bits on some platforms and could truncate the value.
  std::string path = connection.newFile(1024);
  tOffset block_size = hdfsGetDefaultBlockSizeAtPath(fs, path.c_str());
  EXPECT_GT(block_size, 0);
  hdfsFileInfo *file_info = hdfsGetPathInfo(fs, path.c_str());
  // Fatal check: file_info is dereferenced immediately below.
  ASSERT_NE(nullptr, file_info);
  EXPECT_EQ(block_size, file_info->mBlockSize);
  hdfsFreeFileInfo(file_info, 1);

  // Non-existing path: a positive default is still expected
  path = "/wrong/dir/";
  EXPECT_GT(hdfsGetDefaultBlockSizeAtPath(fs, path.c_str()), 0);

  // No path specified
  EXPECT_GT(hdfsGetDefaultBlockSize(fs), 0);
}
// Testing hdfsGetHosts / hdfsFreeHosts: freeing a null result, a missing
// file, an existing file, and invalid start/length arguments.
TEST_F(HdfsExtTest, TestHosts) {
  HdfsHandle connection = cluster.connect_c();
  hdfsFS fs = connection.handle();
  // Fatal check: every call below passes fs to the C API.
  ASSERT_NE(nullptr, fs);

  char ***hosts = nullptr;

  // Free a null pointer. errno is cleared first so that the check observes
  // this call only and not a value left behind by connection setup.
  errno = 0;
  hdfsFreeHosts(hosts);
  EXPECT_EQ(0, errno);

  // Test non-existent files
  EXPECT_EQ(nullptr, hdfsGetHosts(fs, "/wrong/file/", 0, std::numeric_limits<int64_t>::max()));
  EXPECT_EQ(static_cast<int>(std::errc::no_such_file_or_directory), errno);

  // Test an existent file
  std::string filename = connection.newFile(1024);
  hosts = hdfsGetHosts(fs, filename.c_str(), 0, std::numeric_limits<int64_t>::max());
  // Fatal check: hosts is dereferenced immediately below.
  ASSERT_NE(nullptr, hosts);

  // Make sure there is at least one host; only look at the first entry when
  // the first block's host list is present, so a failure cannot crash the run.
  EXPECT_NE(nullptr, *hosts);
  if (*hosts != nullptr) {
    EXPECT_NE(nullptr, **hosts);
  }

  errno = 0;
  hdfsFreeHosts(hosts);
  EXPECT_EQ(0, errno);

  // Test invalid length. min() is the value that max()+1 wraps to in practice,
  // but spelled without signed overflow, which is undefined behaviour.
  EXPECT_EQ(nullptr, hdfsGetHosts(fs, filename.c_str(), 0, std::numeric_limits<int64_t>::min()));
  EXPECT_EQ(static_cast<int>(std::errc::invalid_argument), errno);

  // Test invalid start offset (same reasoning as above)
  EXPECT_EQ(nullptr, hdfsGetHosts(fs, filename.c_str(), std::numeric_limits<int64_t>::min(), std::numeric_limits<int64_t>::max()));
  EXPECT_EQ(static_cast<int>(std::errc::invalid_argument), errno);
}
// Testing read statistics: the counters are checked on a freshly opened
// file, after one read, and again after hdfsFileClearReadStatistics.
TEST_F(HdfsExtTest, TestReadStats) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
// Filled in by each hdfsFileGetReadStatistics call and released with
// hdfsFileFreeReadStatistics after each check.
// NOTE(review): left uninitialized and dereferenced even when the getter
// fails, since the EXPECT_EQ on its return value is non-fatal.
struct hdfsReadStatistics *stats;
// Write a 256-byte, zero-filled file to read back from
int size = 256;
std::string path = "/readStatTest";
hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
EXPECT_NE(nullptr, file);
void * buf = malloc(size);
bzero(buf, size);
EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
free(buf);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
// Before reading: reopen read-only; nothing has been read yet, so the total is 0
// NOTE(review): the result of this hdfsOpenFile is not null-checked.
file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats));
EXPECT_EQ(0, stats->totalBytesRead);
hdfsFileFreeReadStatistics(stats);
// After reading sizeof(buffer) bytes: total and local counters are expected
// to equal the amount read, and the remote counter to be 0
char buffer[123];
//Read executes correctly, but causes a warning (captured in HDFS-10595)
EXPECT_EQ(sizeof(buffer), hdfsRead(fs, file, buffer, sizeof(buffer)));
EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats));
EXPECT_EQ(sizeof(buffer), stats->totalBytesRead);
EXPECT_EQ(sizeof(buffer), stats->totalLocalBytesRead);
EXPECT_EQ(0, hdfsReadStatisticsGetRemoteBytesRead(stats));
hdfsFileFreeReadStatistics(stats);
// After clearing: the total must be back to 0
EXPECT_EQ(0, hdfsFileClearReadStatistics(file));
EXPECT_EQ(0, hdfsFileGetReadStatistics(file, &stats));
EXPECT_EQ(0, stats->totalBytesRead);
hdfsFileFreeReadStatistics(stats);
EXPECT_EQ(0, hdfsCloseFile(fs, file)); EXPECT_EQ(0, hdfsCloseFile(fs, file));
EXPECT_EQ(0, errno); EXPECT_EQ(0, errno);
} }
// Testing working directory: sets a working directory, reads it back, and
// then calls a range of API functions with paths that are relative to it.
TEST_F(HdfsExtTest, TestWorkingDirectory) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
// Correct operation of setter and getter: the getter is expected to return
// exactly the string passed to the setter (including the trailing slash)
std::string pathDir = "/testWorkDir/";
EXPECT_EQ(0, hdfsCreateDirectory(fs, pathDir.c_str()));
std::string pathFile = connection.newFile(pathDir.c_str(), 1024);
EXPECT_EQ(0, hdfsSetWorkingDirectory(fs, pathDir.c_str()));
char array[100];
EXPECT_STREQ(pathDir.c_str(), hdfsGetWorkingDirectory(fs, array, 100));
// Get relative path: keep only what follows the last '/' of the new file's path
std::size_t slashPos = pathFile.find_last_of("/");
std::string fileName = pathFile.substr(slashPos + 1);
// Testing various functions with relative path:
//hdfsGetDefaultBlockSizeAtPath
EXPECT_GT(hdfsGetDefaultBlockSizeAtPath(fs, fileName.c_str()), 0);
//hdfsSetReplication
EXPECT_EQ(0, hdfsSetReplication(fs, fileName.c_str(), 7));
//hdfsUtime
EXPECT_EQ(0, hdfsUtime(fs, fileName.c_str(), 123456789, 987654321));
//hdfsExists
EXPECT_EQ(0, hdfsExists(fs, fileName.c_str()));
//hdfsGetPathInfo
hdfsFileInfo *file_info;
EXPECT_NE(nullptr, file_info = hdfsGetPathInfo(fs, fileName.c_str()));
hdfsFreeFileInfo(file_info, 1);
//hdfsOpenFile
// NOTE(review): the returned handle is not null-checked before being closed.
hdfsFile file;
file = hdfsOpenFile(fs, fileName.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
//hdfsCreateDirectory
EXPECT_EQ(0, hdfsCreateDirectory(fs, "newDir"));
// Add another file, so that the listing of newDir below has exactly one entry
// NOTE(review): pathDir already ends with '/', so the path built here is
// "/testWorkDir//newDir" (double slash).
std::string fileName2 = connection.newFile(pathDir + "/newDir", 1024);
//hdfsListDirectory
int numEntries;
hdfsFileInfo * dirList;
EXPECT_NE(nullptr, dirList = hdfsListDirectory(fs, "newDir", &numEntries));
EXPECT_EQ(1, numEntries);
hdfsFreeFileInfo(dirList, 1);
//hdfsChmod
EXPECT_EQ(0, hdfsChmod(fs, fileName.c_str(), 0777));
//hdfsChown
EXPECT_EQ(0, hdfsChown(fs, fileName.c_str(), "cool", "nice"));
//hdfsDisallowSnapshot
EXPECT_EQ(0, hdfsDisallowSnapshot(fs, "newDir"));
//hdfsAllowSnapshot
EXPECT_EQ(0, hdfsAllowSnapshot(fs, "newDir"));
//hdfsCreateSnapshot
EXPECT_EQ(0, hdfsCreateSnapshot(fs, "newDir", "Some"));
//hdfsDeleteSnapshot
EXPECT_EQ(0, hdfsDeleteSnapshot(fs, "newDir", "Some"));
//hdfsGetBlockLocations
// Note: this call is given the HdfsHandle `connection`, not the raw `fs`.
hdfsBlockLocations * blocks = nullptr;
EXPECT_EQ(0, hdfsGetBlockLocations(connection, fileName.c_str(), &blocks));
hdfsFreeBlockLocations(blocks);
//hdfsGetHosts
char *** hosts;
EXPECT_NE(nullptr, hosts = hdfsGetHosts(fs, fileName.c_str(), 0, std::numeric_limits<int64_t>::max()));
hdfsFreeHosts(hosts);
//hdfsRename
EXPECT_EQ(0, hdfsRename(fs, fileName.c_str(), "new_file_name"));
//hdfsDelete
EXPECT_EQ(0, hdfsDelete(fs, "new_file_name", 0));
} }
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock // The following line must be executed to initialize Google Mock

View File

@ -50,25 +50,25 @@ int hdfsFileIsOpenForWrite(hdfsFile file) {
return libhdfs_hdfsFileIsOpenForWrite(file->libhdfsRep); return libhdfs_hdfsFileIsOpenForWrite(file->libhdfsRep);
} }
int hdfsFileGetReadStatistics(hdfsFile file, int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
struct hdfsReadStatistics **stats) { //We do not track which bytes were remote or local, so we assume all are local
return libhdfs_hdfsFileGetReadStatistics int ret = libhdfspp_hdfsFileGetReadStatistics(file->libhdfsppRep, (struct libhdfspp_hdfsReadStatistics **)stats);
(file->libhdfsRep, (struct libhdfs_hdfsReadStatistics **)stats); if(!ret) {
(*stats)->totalLocalBytesRead = (*stats)->totalBytesRead;
}
return ret;
} }
int64_t hdfsReadStatisticsGetRemoteBytesRead( int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
const struct hdfsReadStatistics *stats) { return libhdfspp_hdfsReadStatisticsGetRemoteBytesRead((struct libhdfspp_hdfsReadStatistics *)stats);
return libhdfs_hdfsReadStatisticsGetRemoteBytesRead
((struct libhdfs_hdfsReadStatistics *)stats);
} }
int hdfsFileClearReadStatistics(hdfsFile file) { int hdfsFileClearReadStatistics(hdfsFile file) {
return libhdfs_hdfsFileClearReadStatistics(file->libhdfsRep); return libhdfspp_hdfsFileClearReadStatistics(file->libhdfsppRep);
} }
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) { void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
libhdfs_hdfsFileFreeReadStatistics( libhdfspp_hdfsFileFreeReadStatistics((struct libhdfspp_hdfsReadStatistics *)stats);
(struct libhdfs_hdfsReadStatistics *)stats);
} }
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
@ -208,15 +208,15 @@ int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
} }
int hdfsConfGetStr(const char *key, char **val) { int hdfsConfGetStr(const char *key, char **val) {
return libhdfs_hdfsConfGetStr(key, val); return libhdfspp_hdfsConfGetStr(key, val);
} }
int hdfsConfGetInt(const char *key, int32_t *val) { int hdfsConfGetInt(const char *key, int32_t *val) {
return libhdfs_hdfsConfGetInt(key, val); return libhdfspp_hdfsConfGetInt(key, val);
} }
void hdfsConfStrFree(char *val) { void hdfsConfStrFree(char *val) {
libhdfs_hdfsConfStrFree(val); libhdfspp_hdfsConfStrFree(val);
} }
int hdfsDisconnect(hdfsFS fs) { int hdfsDisconnect(hdfsFS fs) {
@ -269,15 +269,30 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
} }
int hdfsExists(hdfsFS fs, const char *path) { int hdfsExists(hdfsFS fs, const char *path) {
return libhdfs_hdfsExists(fs->libhdfsRep, path); return libhdfspp_hdfsExists(fs->libhdfsppRep, path);
} }
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
return libhdfs_hdfsSeek(fs->libhdfsRep, file->libhdfsRep, desiredPos); int ret1 = libhdfs_hdfsSeek(fs->libhdfsRep, file->libhdfsRep, desiredPos);
int ret2 = libhdfspp_hdfsSeek(fs->libhdfsppRep, file->libhdfsppRep, desiredPos);
if (ret1) {
return ret1;
} else if (ret2) {
return ret2;
} else {
return 0;
}
} }
tOffset hdfsTell(hdfsFS fs, hdfsFile file) { tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
return libhdfs_hdfsTell(fs->libhdfsRep, file->libhdfsRep); tOffset ret1 = libhdfs_hdfsTell(fs->libhdfsRep, file->libhdfsRep);
tOffset ret2 = libhdfspp_hdfsTell(fs->libhdfsppRep, file->libhdfsppRep);
if (ret1 != ret2) {
errno = EIO;
return -1;
} else {
return ret1;
}
} }
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
@ -320,7 +335,7 @@ int hdfsHSync(hdfsFS fs, hdfsFile file) {
} }
int hdfsAvailable(hdfsFS fs, hdfsFile file) { int hdfsAvailable(hdfsFS fs, hdfsFile file) {
return libhdfs_hdfsAvailable(fs->libhdfsRep, file->libhdfsRep); return libhdfspp_hdfsAvailable(fs->libhdfsppRep, file->libhdfsppRep);
} }
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
@ -340,11 +355,19 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
} }
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) { char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
return libhdfs_hdfsGetWorkingDirectory(fs->libhdfsRep, buffer, bufferSize); return libhdfspp_hdfsGetWorkingDirectory(fs->libhdfsppRep, buffer, bufferSize);
} }
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
return libhdfs_hdfsSetWorkingDirectory(fs->libhdfsRep, path); int ret1 = libhdfspp_hdfsSetWorkingDirectory(fs->libhdfsppRep, path);
int ret2 = libhdfs_hdfsSetWorkingDirectory(fs->libhdfsRep, path);
if (ret1) {
return ret1;
} else if (ret2) {
return ret2;
} else {
return 0;
}
} }
int hdfsCreateDirectory(hdfsFS fs, const char* path) { int hdfsCreateDirectory(hdfsFS fs, const char* path) {
@ -352,7 +375,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) {
} }
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
return libhdfs_hdfsSetReplication(fs->libhdfsRep, path, replication); return libhdfspp_hdfsSetReplication(fs->libhdfsppRep, path, replication);
} }
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
@ -376,19 +399,19 @@ int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) {
char*** hdfsGetHosts(hdfsFS fs, const char* path, char*** hdfsGetHosts(hdfsFS fs, const char* path,
tOffset start, tOffset length) { tOffset start, tOffset length) {
return libhdfs_hdfsGetHosts(fs->libhdfsRep, path, start, length); return libhdfspp_hdfsGetHosts(fs->libhdfsppRep, path, start, length);
} }
void hdfsFreeHosts(char ***blockHosts) { void hdfsFreeHosts(char ***blockHosts) {
return libhdfs_hdfsFreeHosts(blockHosts); return libhdfspp_hdfsFreeHosts(blockHosts);
} }
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
return libhdfs_hdfsGetDefaultBlockSize(fs->libhdfsRep); return libhdfspp_hdfsGetDefaultBlockSize(fs->libhdfsppRep);
} }
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) { tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
return libhdfs_hdfsGetDefaultBlockSizeAtPath(fs->libhdfsRep, path); return libhdfspp_hdfsGetDefaultBlockSizeAtPath(fs->libhdfsppRep, path);
} }
tOffset hdfsGetCapacity(hdfsFS fs) { tOffset hdfsGetCapacity(hdfsFS fs) {
@ -409,7 +432,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode) {
} }
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
return libhdfs_hdfsUtime(fs->libhdfsRep, path, mtime, atime); return libhdfspp_hdfsUtime(fs->libhdfsppRep, path, mtime, atime);
} }
struct hadoopRzOptions *hadoopRzOptionsAlloc(void) { struct hadoopRzOptions *hadoopRzOptionsAlloc(void) {