HDFS-10494: libhdfs++: Implement snapshot operations and GetFsStats. Contributed by Anatoli Shein.

This commit is contained in:
Bob Hansen 2016-06-10 10:56:52 -04:00 committed by James Clampffer
parent d64bbf983e
commit 9ac66ccaa3
17 changed files with 897 additions and 58 deletions

View File

@ -197,7 +197,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
}
if (ret != expected) {
fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
"it wrote %d\n", ret, expected);
"it wrote %d\n", expected, ret);
return EIO;
}
EXPECT_ZERO(hdfsFlush(fs, file));

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HDFSPP_FSINFO_H_
#define HDFSPP_FSINFO_H_
namespace hdfs {
/**
* Information that is assumed to be unchanging about a file system for the duration of
* the operations.
*/
struct FsInfo {
unsigned long int capacity;
unsigned long int used;
unsigned long int remaining;
unsigned long int under_replicated;
unsigned long int corrupt_blocks;
unsigned long int missing_blocks;
unsigned long int missing_repl_one_blocks;
unsigned long int blocks_in_future;
FsInfo()
: capacity(0),
used(0),
remaining(0),
under_replicated(0),
corrupt_blocks(0),
missing_blocks(0),
missing_repl_one_blocks(0),
blocks_in_future(0) {
}
};
}
#endif

View File

@ -275,6 +275,49 @@ LIBHDFS_EXTERNAL
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie);
/*****************************************************************************
* HDFS SNAPSHOT FUNCTIONS
****************************************************************************/
/**
* Creates a snapshot of a snapshottable directory specified by path
*
* @param fs The filesystem (required)
* @param path Path to the directory to be snapshotted (must be non-blank)
* @param name Name to be given to the created snapshot (may be NULL)
* @return 0 on success, corresponding errno on failure
**/
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name);
/**
* Deletes the directory snapshot specified by path and name
*
* @param fs The filesystem (required)
* @param path Path to the snapshotted directory (must be non-blank)
* @param name Name of the snapshot to be deleted (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name);
/**
* Allows snapshots to be made on the specified directory
*
* @param fs The filesystem (required)
* @param path Path to the directory to be made snapshottable (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
int hdfsAllowSnapshot(hdfsFS fs, const char* path);
/**
* Disallows snapshots to be made on the specified directory
*
* @param fs The filesystem (required)
* @param path Path to the directory to be made non-snapshottable (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
int hdfsDisallowSnapshot(hdfsFS fs, const char* path);
#ifdef __cplusplus
} /* end extern "C" */
#endif

View File

@ -23,7 +23,7 @@
#include "hdfspp/events.h"
#include "hdfspp/block_location.h"
#include "hdfspp/statinfo.h"
#include "hdfspp/fsinfo.h"
#include <functional>
#include <memory>
@ -179,6 +179,16 @@ class FileSystem {
const std::function<void(const Status &, const StatInfo &)> &handler) = 0;
virtual Status GetFileInfo(const std::string &path, StatInfo & stat_info) = 0;
/**
* Retrieves the file system information as a whole, such as the total raw size of all files in the filesystem
* and the raw capacity of the filesystem
*
* @param FsInfo struct to be populated by GetFsStats
**/
virtual void GetFsStats(
const std::function<void(const Status &, const FsInfo &)> &handler) = 0;
virtual Status GetFsStats(FsInfo & fs_info) = 0;
/**
* Retrieves the files contained in a directory and returns the metadata
* for each of them.
@ -207,6 +217,50 @@ class FileSystem {
virtual Status GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * locations) = 0;
/*****************************************************************************
* FILE SYSTEM SNAPSHOT FUNCTIONS
****************************************************************************/
/**
* Creates a snapshot of a snapshottable directory specified by path
*
* @param path Path to the directory to be snapshotted (must be non-empty)
* @param name Name to be given to the created snapshot (may be empty)
**/
virtual void CreateSnapshot(const std::string &path, const std::string &name,
const std::function<void(const Status &)> &handler) = 0;
virtual Status CreateSnapshot(const std::string &path,
const std::string &name) = 0;
/**
* Deletes the directory snapshot specified by path and name
*
* @param path Path to the snapshotted directory (must be non-empty)
* @param name Name of the snapshot to be deleted (must be non-empty)
**/
virtual void DeleteSnapshot(const std::string &path, const std::string &name,
const std::function<void(const Status &)> &handler) = 0;
virtual Status DeleteSnapshot(const std::string &path,
const std::string &name) = 0;
/**
* Allows snapshots to be made on the specified directory
*
* @param path Path to the directory to be made snapshottable (must be non-empty)
**/
virtual void AllowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) = 0;
virtual Status AllowSnapshot(const std::string &path) = 0;
/**
* Disallows snapshots to be made on the specified directory
*
* @param path Path to the directory to be made non-snapshottable (must be non-empty)
**/
virtual void DisallowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) = 0;
virtual Status DisallowSnapshot(const std::string &path) = 0;
/**
* Note that it is an error to destroy the filesystem from within a filesystem
* callback. It will lead to a deadlock and the termination of the process.

View File

@ -58,6 +58,7 @@ class Status {
kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
kPathNotFound = static_cast<unsigned>(std::errc::no_such_file_or_directory),
kNotADirectory = static_cast<unsigned>(std::errc::not_a_directory),
kException = 256,
kAuthenticationFailed = 257,
};

View File

@ -153,6 +153,10 @@ static int Error(const Status &stat) {
errnum = ENOENT;
default_message = "No such file or directory";
break;
case Status::Code::kNotADirectory:
errnum = ENOTDIR;
default_message = "Not a directory";
break;
default:
errnum = ENOSYS;
default_message = "Error: unrecognised code";
@ -323,6 +327,52 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
}
}
tOffset hdfsGetCapacity(hdfsFS fs) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
hdfs::FsInfo fs_info;
Status stat = fs->get_impl()->GetFsStats(fs_info);
if (!stat.ok()) {
Error(stat);
return -1;
}
return fs_info.capacity;
} catch (const std::exception & e) {
ReportException(e);
return -1;
} catch (...) {
ReportCaughtNonException();
return -1;
}
}
tOffset hdfsGetUsed(hdfsFS fs) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
hdfs::FsInfo fs_info;
Status stat = fs->get_impl()->GetFsStats(fs_info);
if (!stat.ok()) {
Error(stat);
return -1;
}
return fs_info.used;
} catch (const std::exception & e) {
ReportException(e);
return -1;
} catch (...) {
ReportCaughtNonException();
return -1;
}
}
void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
const hdfs::StatInfo & stat_info) {
/* file or directory */
@ -365,10 +415,10 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
file_info->mGroup = new char[stat_info.group.size() + 1];
strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1);
/* the permissions associated with the file */
/* the permissions associated with the file encoded as an octal number (0777)*/
file_info->mPermissions = (short) stat_info.permissions;
/* the last access time for the file in seconds */
/* the last access time for the file in seconds since the epoch*/
file_info->mLastAccess = stat_info.access_time;
}
@ -444,6 +494,101 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
delete[] hdfsFileInfo;
}
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
}
Status stat;
if(!name){
stat = fs->get_impl()->CreateSnapshot(path, "");
} else {
stat = fs->get_impl()->CreateSnapshot(path, name);
}
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
}
if (!name) {
return Error(Status::InvalidArgument("Argument 'name' cannot be NULL"));
}
Status stat;
stat = fs->get_impl()->DeleteSnapshot(path, name);
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
}
Status stat;
stat = fs->get_impl()->AllowSnapshot(path);
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
try {
errno = 0;
if (!CheckSystem(fs)) {
return -1;
}
if (!path) {
return Error(Status::InvalidArgument("Argument 'path' cannot be NULL"));
}
Status stat;
stat = fs->get_impl()->DisallowSnapshot(path);
if (!stat.ok()) {
return Error(stat);
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) {
try

View File

@ -25,47 +25,49 @@
namespace hdfs {
std::string Base64Encode(const std::string &src) {
static const char kDictionary[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
//encoded size is (sizeof(buf) + 2) / 3 * 4
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
std::string ret;
int i = 0;
int j = 0;
unsigned char char_array_3[3];
unsigned char char_array_4[4];
unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
unsigned int in_len = src.size();
int encoded_size = (src.size() + 2) / 3 * 4;
std::string dst;
dst.reserve(encoded_size);
while (in_len--) {
char_array_3[i++] = *(bytes_to_encode++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
size_t i = 0;
while (i + 3 < src.length()) {
const char *s = &src[i];
const int32_t r[4] = {s[0] >> 2, ((s[0] << 4) | (s[1] >> 4)) & 0x3f,
((s[1] << 2) | (s[2] >> 6)) & 0x3f, s[2] & 0x3f};
std::transform(r, r + sizeof(r) / sizeof(int32_t), std::back_inserter(dst),
[&r](unsigned char v) { return kDictionary[v]; });
i += 3;
for(i = 0; (i <4) ; i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}
size_t remained = src.length() - i;
const char *s = &src[i];
if (i) {
for(j = i; j < 3; j++)
char_array_3[j] = '\0';
switch (remained) {
case 0:
break;
case 1: {
char padding[4] = {kDictionary[s[0] >> 2], kDictionary[(s[0] << 4) & 0x3f],
'=', '='};
dst.append(padding, sizeof(padding));
} break;
case 2: {
char padding[4] = {kDictionary[src[i] >> 2],
kDictionary[((s[0] << 4) | (s[1] >> 4)) & 0x3f],
kDictionary[(s[1] << 2) & 0x3f], '='};
dst.append(padding, sizeof(padding));
} break;
default:
assert("Unreachable");
break;
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];
while((i++ < 3))
ret += '=';
}
return dst;
return ret;
}
}

View File

@ -28,6 +28,8 @@ const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessC
const char * kStatusSaslException = "javax.security.sasl.SaslException";
const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException";
const char * kPathNotFoundException2 = "java.io.FileNotFoundException";
const char * kPathIsNotDirectoryException = "org.apache.hadoop.fs.PathIsNotDirectoryException";
const char * kSnapshotException = "org.apache.hadoop.hdfs.protocol.SnapshotException";
Status::Status(int code, const char *msg1) : code_(code) {
if(msg1) {
@ -76,6 +78,10 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes
return Status(kPathNotFound, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0))
return Status(kPathNotFound, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kPathIsNotDirectoryException) == 0))
return Status(kNotADirectory, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kSnapshotException) == 0))
return Status(kInvalidArgument, error_message);
else
return Status(kException, exception_class_name, error_message);
}

View File

@ -55,13 +55,24 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL
std::string GetRandomClientName() {
unsigned char buf[6];
/**
* The server is requesting a 16-byte UUID:
* https://github.com/c9n/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
*
* This function generates a 16-byte UUID (version 4):
* https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29
**/
unsigned char buf[16];
RAND_pseudo_bytes(buf, sizeof(buf));
//clear the first four bits of byte 6 then set the second bit
buf[6] = (buf[6] & 0x0f) | 0x40;
//clear the second bit of byte 8 and set the first bit
buf[8] = (buf[8] & 0xbf) | 0x80;
std::stringstream ss;
ss << "libhdfs++_"
<< Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
ss << std::string(reinterpret_cast<char *>(buf), sizeof(buf));
return ss.str();
}

View File

@ -79,16 +79,16 @@ const std::string get_effective_user_name(const std::string &user_name) {
return "unknown_user";
}
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name,
const Options &options)
: options_(options),
io_service_(static_cast<IoServiceImpl *>(io_service)),
nn_(&io_service_->io_service(), options,
GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>())
{
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
options_(options), client_name_(GetRandomClientName()), io_service_(
static_cast<IoServiceImpl *>(io_service)),
nn_(
&io_service_->io_service(), options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion
), bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>()) {
LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ") called");
@ -396,6 +396,43 @@ Status FileSystemImpl::GetFileInfo(const std::string &path,
return stat;
}
void FileSystemImpl::GetFsStats(
const std::function<void(const Status &, const FsInfo &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::GetFsStats(" << FMT_THIS_ADDR << ") called");
nn_.GetFsStats([handler](const Status &stat, const FsInfo &fs_info) {
handler(stat, fs_info);
});
}
Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
/* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
auto h = [callstate](const Status &s, const FsInfo &si) {
callstate->set_value(std::make_tuple(s, si));
};
GetFsStats(h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FsInfo info = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
fs_info = info;
return stat;
}
/**
* Helper function for recursive GetListing calls.
*
@ -475,6 +512,157 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::
return stat;
}
void FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name,
const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
nn_.CreateSnapshot(path, name, [handler](const Status &stat) {
handler(stat);
});
}
Status FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
std::future<std::tuple<Status>> future(callstate->get_future());
/* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(std::make_tuple(s));
};
CreateSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
return stat;
}
void FileSystemImpl::DeleteSnapshot(const std::string &path,
const std::string &name,
const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
if (name.empty()) {
handler(Status::InvalidArgument("Argument 'name' cannot be empty"));
return;
}
nn_.DeleteSnapshot(path, name, [handler](const Status &stat) {
handler(stat);
});
}
Status FileSystemImpl::DeleteSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
std::future<std::tuple<Status>> future(callstate->get_future());
/* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(std::make_tuple(s));
};
DeleteSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
return stat;
}
void FileSystemImpl::AllowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
nn_.AllowSnapshot(path, [handler](const Status &stat) {
handler(stat);
});
}
Status FileSystemImpl::AllowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
std::future<std::tuple<Status>> future(callstate->get_future());
/* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(std::make_tuple(s));
};
AllowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
return stat;
}
void FileSystemImpl::DisallowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
nn_.DisallowSnapshot(path, [handler](const Status &stat) {
handler(stat);
});
}
Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status>>>();
std::future<std::tuple<Status>> future(callstate->get_future());
/* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(std::make_tuple(s));
};
DisallowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
return stat;
}
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
// It is far too easy to destroy the filesystem (and thus the threadpool)

View File

@ -71,6 +71,17 @@ public:
Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
/**
* Retrieves the file system information such as the total raw size of all files in the filesystem
* and the raw capacity of the filesystem
*
* @param FsInfo struct to be populated by GetFsStats
**/
void GetFsStats(
const std::function<void(const Status &, const FsInfo &)> &handler) override;
Status GetFsStats(FsInfo & fs_info) override;
void GetListing(
const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override;
@ -83,6 +94,46 @@ public:
std::shared_ptr<FileBlockLocation> * locations) override;
/*****************************************************************************
* FILE SYSTEM SNAPSHOT FUNCTIONS
****************************************************************************/
/**
* Creates a snapshot of a snapshottable directory specified by path
*
* @param path Path to the directory to be snapshotted (must be non-empty)
* @param name Name to be given to the created snapshot (may be empty)
**/
void CreateSnapshot(const std::string &path, const std::string &name,
const std::function<void(const Status &)> &handler) override;
Status CreateSnapshot(const std::string &path, const std::string &name) override;
/**
* Deletes the directory snapshot specified by path and name
*
* @param path Path to the snapshotted directory (must be non-empty)
* @param name Name of the snapshot to be deleted (must be non-empty)
**/
void DeleteSnapshot(const std::string &path, const std::string &name,
const std::function<void(const Status &)> &handler) override;
Status DeleteSnapshot(const std::string &path, const std::string &name) override;
/**
* Allows snapshots to be made on the specified directory
*
* @param path Path to the directory to be made snapshottable (must be non-empty)
**/
void AllowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
Status AllowSnapshot(const std::string &path) override;
/**
* Disallows snapshots to be made on the specified directory
*
* @param path Path to the directory to be made non-snapshottable (must be non-empty)
**/
void DisallowSnapshot(const std::string &path, const std::function<void(const Status &)> &handler) override;
Status DisallowSnapshot(const std::string &path) override;
void SetFsEventCallback(fs_event_callback callback) override;
/* add a new thread to handle asio requests, return number of threads in pool
@ -97,7 +148,7 @@ public:
private:
const Options options_;
const std::string client_name_;
std::string cluster_name_;
/**
* The IoService must be the first member variable to ensure that it gets
@ -106,7 +157,6 @@ private:
**/
std::unique_ptr<IoServiceImpl> io_service_;
NameNodeOperations nn_;
const std::string client_name_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
struct WorkerDeleter {

View File

@ -131,6 +131,28 @@ void NameNodeOperations::GetFileInfo(const std::string & path,
});
}
void NameNodeOperations::GetFsStats(
std::function<void(const Status &, const FsInfo &)> handler) {
using ::hadoop::hdfs::GetFsStatusRequestProto;
using ::hadoop::hdfs::GetFsStatsResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::GetFsStats(" << FMT_THIS_ADDR << ") called");
GetFsStatusRequestProto req;
auto resp = std::make_shared<GetFsStatsResponseProto>();
namenode_.GetFsStats(&req, resp, [resp, handler](const Status &stat) {
if (stat.ok()) {
struct FsInfo fs_info;
GetFsStatsResponseProtoToFsInfo(fs_info, resp);
handler(stat, fs_info);
} else {
handler(stat, FsInfo());
}
});
}
void NameNodeOperations::GetListing(
const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
@ -176,6 +198,108 @@ void NameNodeOperations::GetListing(
});
}
void NameNodeOperations::CreateSnapshot(const std::string & path,
const std::string & name, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::CreateSnapshotRequestProto;
using ::hadoop::hdfs::CreateSnapshotResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
CreateSnapshotRequestProto req;
req.set_snapshotroot(path);
if (!name.empty()) {
req.set_snapshotname(name);
}
auto resp = std::make_shared<CreateSnapshotResponseProto>();
namenode_.CreateSnapshot(&req, resp,
[resp, handler, path](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::DeleteSnapshot(const std::string & path,
const std::string & name, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::DeleteSnapshotRequestProto;
using ::hadoop::hdfs::DeleteSnapshotResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
if (name.empty()) {
handler(Status::InvalidArgument("Argument 'name' cannot be empty"));
return;
}
DeleteSnapshotRequestProto req;
req.set_snapshotroot(path);
req.set_snapshotname(name);
auto resp = std::make_shared<DeleteSnapshotResponseProto>();
namenode_.DeleteSnapshot(&req, resp,
[resp, handler, path](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::AllowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::AllowSnapshotRequestProto;
using ::hadoop::hdfs::AllowSnapshotResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
AllowSnapshotRequestProto req;
req.set_snapshotroot(path);
auto resp = std::make_shared<AllowSnapshotResponseProto>();
namenode_.AllowSnapshot(&req, resp,
[resp, handler, path](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::DisallowSnapshot(const std::string & path, std::function<void(const Status &)> handler) {
using ::hadoop::hdfs::DisallowSnapshotRequestProto;
using ::hadoop::hdfs::DisallowSnapshotResponseProto;
LOG_TRACE(kFileSystem,
<< "NameNodeOperations::DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
if (path.empty()) {
handler(Status::InvalidArgument("Argument 'path' cannot be empty"));
return;
}
DisallowSnapshotRequestProto req;
req.set_snapshotroot(path);
auto resp = std::make_shared<DisallowSnapshotResponseProto>();
namenode_.DisallowSnapshot(&req, resp,
[resp, handler, path](const Status &stat) {
handler(stat);
});
}
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
engine_.SetFsEventCallback(callback);
}
@ -197,4 +321,19 @@ void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
stat_info.children_num = fs.childrennum();
}
void NameNodeOperations::GetFsStatsResponseProtoToFsInfo(
hdfs::FsInfo & fs_info,
const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs) {
fs_info.capacity = fs->capacity();
fs_info.used = fs->used();
fs_info.remaining = fs->remaining();
fs_info.under_replicated = fs->under_replicated();
fs_info.corrupt_blocks = fs->corrupt_blocks();
fs_info.missing_blocks = fs->missing_blocks();
fs_info.missing_repl_one_blocks = fs->missing_repl_one_blocks();
if(fs->has_blocks_in_future()){
fs_info.blocks_in_future = fs->blocks_in_future();
}
}
}

View File

@ -20,6 +20,7 @@
#include "rpc/rpc_engine.h"
#include "hdfspp/statinfo.h"
#include "hdfspp/fsinfo.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"
@ -35,6 +36,7 @@ namespace hdfs {
* Threading model: thread-safe; all operations can be called concurrently
* Lifetime: owned by a FileSystemImpl
*/
class NameNodeOperations {
public:
MEMCHECKED_CLASS(NameNodeOperations);
@ -56,16 +58,31 @@ public:
void GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler);
void GetFsStats(std::function<void(const Status &, const FsInfo &)> handler);
// start_after="" for initial call
void GetListing(const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
const std::string & start_after = "");
void CreateSnapshot(const std::string & path, const std::string & name,
std::function<void(const Status &)> handler);
void DeleteSnapshot(const std::string & path, const std::string & name,
std::function<void(const Status &)> handler);
void AllowSnapshot(const std::string & path,
std::function<void(const Status &)> handler);
void DisallowSnapshot(const std::string & path,
std::function<void(const Status &)> handler);
void SetFsEventCallback(fs_event_callback callback);
private:
static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs);
::asio::io_service * io_service_;
RpcEngine engine_;

View File

@ -20,7 +20,7 @@
#include "hdfspp_mini_dfs.h"
#include "hdfspp/hdfs_ext.h"
#include <chrono>
namespace hdfs {
@ -66,6 +66,113 @@ TEST_F(HdfsExtTest, TestGetBlockLocations) {
}
// Writing a file to the filesystem and checking the used space
TEST_F(HdfsExtTest, TestGetUsed) {
using namespace std::chrono;
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
// File system's used space before writing
tOffset used_before_write;
EXPECT_GE(used_before_write = hdfsGetUsed(fs), 0);
// Write to a file
tOffset fileSize = 1024;
std::string filename = connection.newFile(fileSize);
//Need to run hdfsGetUsed() in a loop until the refreshInterval
//is passed on the filesystem and the used space is updated
//Time-out is 3 minutes
tOffset used_after_write;
tOffset difference;
minutes beginTime = duration_cast<minutes>(
system_clock::now().time_since_epoch());
minutes currentTime;
do{
EXPECT_GE(used_after_write = hdfsGetUsed(fs), 0);
difference = used_after_write - used_before_write;
currentTime = duration_cast<minutes>(
system_clock::now().time_since_epoch());
} while (difference == 0 && currentTime.count() - beginTime.count() < 3);
//There should be at least fileSize bytes added to the used space
EXPECT_GT(difference, fileSize);
//There could be additional metadata added to the used space,
//but no more than double the fileSize
EXPECT_LT(difference, fileSize * 2);
}
//Testing allow, disallow, create, and delete snapshot
TEST_F(HdfsExtTest, TestSnapshotOperations) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
//argument 'path' is NULL
EXPECT_EQ(-1, hdfsAllowSnapshot(fs, nullptr));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsCreateSnapshot(fs, nullptr, "Bad"));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsDeleteSnapshot(fs, nullptr, "Bad"));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
EXPECT_EQ(-1, hdfsDisallowSnapshot(fs, nullptr));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
//argument 'name' is NULL for deletion
EXPECT_EQ(-1, hdfsDeleteSnapshot(fs, "/dir/", nullptr));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
//Path not found
std::string path = "/wrong/dir/";
EXPECT_EQ(-1, hdfsAllowSnapshot(fs, path.c_str()));
EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
EXPECT_EQ(-1, hdfsCreateSnapshot(fs, path.c_str(), "Bad"));
EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
EXPECT_EQ(-1, hdfsDeleteSnapshot(fs, path.c_str(), "Bad"));
EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
EXPECT_EQ(-1, hdfsDisallowSnapshot(fs, path.c_str()));
EXPECT_EQ((int) std::errc::no_such_file_or_directory, errno);
//Not a directory
path = connection.newFile(1024); //1024 byte file
EXPECT_EQ(-1, hdfsAllowSnapshot(fs, path.c_str()));
EXPECT_EQ((int) std::errc::not_a_directory, errno);
EXPECT_EQ(-1, hdfsCreateSnapshot(fs, path.c_str(), "Bad"));
EXPECT_EQ((int) std::errc::not_a_directory, errno);
EXPECT_EQ(-1, hdfsDeleteSnapshot(fs, path.c_str(), "Bad"));
EXPECT_EQ((int) std::errc::not_a_directory, errno);
EXPECT_EQ(-1, hdfsDisallowSnapshot(fs, path.c_str()));
EXPECT_EQ((int) std::errc::not_a_directory, errno);
//Not snapshottable directory
std::string dirName = connection.newDir();
EXPECT_EQ(0, hdfsDisallowSnapshot(fs, dirName.c_str()));
EXPECT_EQ(-1, hdfsCreateSnapshot(fs, dirName.c_str(), "Bad"));
EXPECT_EQ((int) std::errc::invalid_argument, errno);
//Verify snapshot created
EXPECT_EQ(0, hdfsAllowSnapshot(fs, dirName.c_str()));
EXPECT_EQ(0, hdfsCreateSnapshot(fs, dirName.c_str(), "Good"));
std::string snapDir = dirName + ".snapshot/";
int size;
hdfsFileInfo *file_infos;
EXPECT_NE(nullptr, file_infos = hdfsListDirectory(fs, snapDir.c_str(), &size));
EXPECT_EQ(1, size);
EXPECT_STREQ("Good", file_infos[0].mName);
hdfsFreeFileInfo(file_infos, 1);
//Verify snapshot deleted
EXPECT_EQ(0, hdfsDeleteSnapshot(fs, dirName.c_str(), "Good"));
EXPECT_EQ(nullptr, file_infos = hdfsListDirectory(fs, snapDir.c_str(), &size));
EXPECT_EQ(0, size);
hdfsFreeFileInfo(file_infos, 0);
}
}

View File

@ -328,11 +328,11 @@ tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
}
tOffset hdfsGetCapacity(hdfsFS fs) {
return libhdfs_hdfsGetCapacity(fs->libhdfsRep);
return libhdfspp_hdfsGetCapacity(fs->libhdfsppRep);
}
tOffset hdfsGetUsed(hdfsFS fs) {
return libhdfs_hdfsGetUsed(fs->libhdfsRep);
return libhdfspp_hdfsGetUsed(fs->libhdfsppRep);
}
int hdfsChown(hdfsFS fs, const char* path, const char *owner,
@ -404,3 +404,19 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations
int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations) {
return libhdfspp_hdfsFreeBlockLocations(locations);
}
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
return libhdfspp_hdfsCreateSnapshot(fs->libhdfsppRep, path, name);
}
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
return libhdfspp_hdfsDeleteSnapshot(fs->libhdfsppRep, path, name);
}
int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
return libhdfspp_hdfsAllowSnapshot(fs->libhdfsppRep, path);
}
int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
return libhdfspp_hdfsDisallowSnapshot(fs->libhdfsppRep, path);
}

View File

@ -94,3 +94,7 @@
#undef hdfsCancel
#undef hdfsGetBlockLocations
#undef hdfsFreeBlockLocations
#undef hdfsCreateSnapshot
#undef hdfsDeleteSnapshot
#undef hdfsAllowSnapshot
#undef hdfsDisallowSnapshot

View File

@ -94,3 +94,7 @@
#define hdfsCancel libhdfspp_hdfsCancel
#define hdfsGetBlockLocations libhdfspp_hdfsGetBlockLocations
#define hdfsFreeBlockLocations libhdfspp_hdfsFreeBlockLocations
#define hdfsCreateSnapshot libhdfspp_hdfsCreateSnapshot
#define hdfsDeleteSnapshot libhdfspp_hdfsDeleteSnapshot
#define hdfsAllowSnapshot libhdfspp_hdfsAllowSnapshot
#define hdfsDisallowSnapshot libhdfspp_hdfsDisallowSnapshot