HDFS-9699: libhdfs++: Add appropriate catch blocks for asio operations that throw. Contributed by Bob Hansen

This commit is contained in:
James 2016-03-03 10:37:33 -05:00 committed by James Clampffer
parent 1600e6bba8
commit 6a96f978eb
3 changed files with 297 additions and 135 deletions

View File

@ -150,6 +150,16 @@ static int Error(const Status &stat) {
return -1; return -1;
} }
static int ReportException(const std::exception & e)
{
return Error(Status::Exception("Uncaught exception", e.what()));
}
static int ReportCaughtNonException()
{
return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
}
/* return false on failure */ /* return false on failure */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
if (!fs) { if (!fs) {
@ -180,135 +190,202 @@ hdfsFS hdfsConnect(const char *nn, tPort port) {
} }
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
std::string port_as_string = std::to_string(port); try
IoService * io_service = IoService::New(); {
std::string user_name; std::string port_as_string = std::to_string(port);
if (user) { IoService * io_service = IoService::New();
user_name = user; std::string user_name;
} if (user) {
user_name = user;
}
FileSystem *fs = FileSystem::New(io_service, user_name, Options()); FileSystem *fs = FileSystem::New(io_service, user_name, Options());
if (!fs) { if (!fs) {
ReportError(ENODEV, "Could not create FileSystem object"); ReportError(ENODEV, "Could not create FileSystem object");
return nullptr;
}
if (!fs->Connect(nn, port_as_string).ok()) {
ReportError(ENODEV, "Unable to connect to NameNode.");
// FileSystem's ctor might take ownership of the io_service; if it does,
// it will null out the pointer
if (io_service)
delete io_service;
delete fs;
return nullptr;
}
return new hdfs_internal(fs);
} catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr; return nullptr;
} }
if (!fs->Connect(nn, port_as_string).ok()) {
ReportError(ENODEV, "Unable to connect to NameNode.");
// FileSystem's ctor might take ownership of the io_service; if it does,
// it will null out the pointer
if (io_service)
delete io_service;
delete fs;
return nullptr;
}
return new hdfs_internal(fs);
} }
int hdfsDisconnect(hdfsFS fs) { int hdfsDisconnect(hdfsFS fs) {
if (!fs) { try
ReportError(ENODEV, "Cannot disconnect null FS handle."); {
return -1; if (!fs) {
} ReportError(ENODEV, "Cannot disconnect null FS handle.");
return -1;
}
delete fs; delete fs;
return 0; return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
short replication, tSize blocksize) { short replication, tSize blocksize) {
(void)flags; try
(void)bufferSize; {
(void)replication; (void)flags;
(void)blocksize; (void)bufferSize;
if (!fs) { (void)replication;
ReportError(ENODEV, "Cannot perform FS operations with null FS handle."); (void)blocksize;
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return nullptr;
}
FileHandle *f = nullptr;
Status stat = fs->get_impl()->Open(path, &f);
if (!stat.ok()) {
Error(stat);
return nullptr;
}
return new hdfsFile_internal(f);
} catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr; return nullptr;
} }
FileHandle *f = nullptr;
Status stat = fs->get_impl()->Open(path, &f);
if (!stat.ok()) {
Error(stat);
return nullptr;
}
return new hdfsFile_internal(f);
} }
int hdfsCloseFile(hdfsFS fs, hdfsFile file) { int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
if (!CheckSystemAndHandle(fs, file)) { try
return -1; {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
delete file;
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
delete file;
return 0;
} }
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) { tSize length) {
if (!CheckSystemAndHandle(fs, file)) { try
return -1; {
} if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
size_t len = length; size_t len = length;
Status stat = file->get_impl()->PositionRead(buffer, &len, position); Status stat = file->get_impl()->PositionRead(buffer, &len, position);
if(!stat.ok()) { if(!stat.ok()) {
return Error(stat); return Error(stat);
}
return (tSize)len;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
return (tSize)len;
} }
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) { tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
try
{
if (!CheckSystemAndHandle(fs, file)) { if (!CheckSystemAndHandle(fs, file)) {
return -1; return -1;
} }
size_t len = length; size_t len = length;
Status stat = file->get_impl()->Read(buffer, &len); Status stat = file->get_impl()->Read(buffer, &len);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
return (tSize)len; return (tSize)len;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
/* 0 on success, -1 on error*/ /* 0 on success, -1 on error*/
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
if (!CheckSystemAndHandle(fs, file)) { try
return -1; {
} if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
off_t desired = desiredPos; off_t desired = desiredPos;
Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg); Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
return 0; return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
tOffset hdfsTell(hdfsFS fs, hdfsFile file) { tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
if (!CheckSystemAndHandle(fs, file)) { try
return -1; {
} if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
ssize_t offset = 0; ssize_t offset = 0;
Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur); Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
if (!stat.ok()) { if (!stat.ok()) {
return Error(stat); return Error(stat);
} }
return offset; return offset;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
/* extended API */ /* extended API */
int hdfsCancel(hdfsFS fs, hdfsFile file) { int hdfsCancel(hdfsFS fs, hdfsFile file) {
if (!CheckSystemAndHandle(fs, file)) { try
return -1; {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations();
return 0;
} }
/******************************************************************* /*******************************************************************
@ -341,7 +418,16 @@ hdfsBuilder::hdfsBuilder(const char * directory) :
struct hdfsBuilder *hdfsNewBuilder(void) struct hdfsBuilder *hdfsNewBuilder(void)
{ {
return new struct hdfsBuilder(); try
{
return new struct hdfsBuilder();
} catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr;
}
} }
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
@ -366,22 +452,36 @@ void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
void hdfsFreeBuilder(struct hdfsBuilder *bld) void hdfsFreeBuilder(struct hdfsBuilder *bld)
{ {
delete bld; try
{
delete bld;
} catch (const std::exception & e) {
ReportException(e);
} catch (...) {
ReportCaughtNonException();
}
} }
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
const char *val) const char *val)
{ {
optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val); try
if (newConfig)
{ {
bld->config = newConfig.value(); optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
return 0; if (newConfig)
} {
else bld->config = newConfig.value();
{ return 0;
ReportError(EINVAL, "Could not change Builder value"); }
return 1; else
{
ReportError(EINVAL, "Could not change Builder value");
return 1;
}
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
} }
@ -391,38 +491,60 @@ void hdfsConfStrFree(char *val)
} }
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) { hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
try
if (!bld->overrideHost.empty())
{ {
// TODO: pass rest of config once we get that done (HDFS-9556) if (!bld->overrideHost.empty())
tPort port = bld->overridePort;
if (port == hdfsBuilder::kUseDefaultPort)
{ {
port = hdfsBuilder::kDefaultPort; // TODO: pass rest of config once we get that done (HDFS-9556)
tPort port = bld->overridePort;
if (port == hdfsBuilder::kUseDefaultPort)
{
port = hdfsBuilder::kDefaultPort;
}
if (bld->user.empty())
return hdfsConnect(bld->overrideHost.c_str(), port);
else
return hdfsConnectAsUser(bld->overrideHost.c_str(), port, bld->user.c_str());
} }
if (bld->user.empty())
return hdfsConnect(bld->overrideHost.c_str(), port);
else else
return hdfsConnectAsUser(bld->overrideHost.c_str(), port, bld->user.c_str()); {
} //TODO: allow construction from default port once that is done (HDFS-9556)
else ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
{ return nullptr;
//TODO: allow construction from default port once that is done (HDFS-9556) }
ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect"); } catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr; return nullptr;
} }
} }
int hdfsConfGetStr(const char *key, char **val) int hdfsConfGetStr(const char *key, char **val)
{ {
hdfsBuilder builder; try
return hdfsBuilderConfGetStr(&builder, key, val); {
hdfsBuilder builder;
return hdfsBuilderConfGetStr(&builder, key, val);
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
int hdfsConfGetInt(const char *key, int32_t *val) int hdfsConfGetInt(const char *key, int32_t *val)
{ {
hdfsBuilder builder; try
return hdfsBuilderConfGetInt(&builder, key, val); {
hdfsBuilder builder;
return hdfsBuilderConfGetInt(&builder, key, val);
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
} }
// //
@ -430,24 +552,40 @@ int hdfsConfGetInt(const char *key, int32_t *val)
// //
struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory) struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
{ {
return new struct hdfsBuilder(configDirectory); try
{
return new struct hdfsBuilder(configDirectory);
} catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr;
}
} }
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
char **val) char **val)
{ {
optional<std::string> value = bld->config.Get(key); try
if (value)
{ {
size_t len = value->length() + 1; optional<std::string> value = bld->config.Get(key);
*val = static_cast<char *>(malloc(len)); if (value)
strncpy(*val, value->c_str(), len); {
size_t len = value->length() + 1;
*val = static_cast<char *>(malloc(len));
strncpy(*val, value->c_str(), len);
}
else
{
*val = nullptr;
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
else
{
*val = nullptr;
}
return 0;
} }
// If we're running on a 32-bit platform, we might get 64-bit values that // If we're running on a 32-bit platform, we might get 64-bit values that
@ -460,16 +598,23 @@ bool isValidInt(int64_t value)
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val) int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
{ {
// Pull from default configuration try
optional<int64_t> value = bld->config.GetInt(key);
if (value)
{ {
if (!isValidInt(*value)) // Pull from default configuration
return 1; optional<int64_t> value = bld->config.GetInt(key);
if (value)
{
if (!isValidInt(*value))
return 1;
*val = *value; *val = *value;
}
// If not found, don't change val
ReportError(EINVAL, "Could not get Builder value");
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
} }
// If not found, don't change val
ReportError(EINVAL, "Could not get Builder value");
return 0;
} }

View File

@ -18,10 +18,30 @@
#include "hdfs_public_api.h" #include "hdfs_public_api.h"
#include "common/logging.h"
namespace hdfs { namespace hdfs {
IoService::~IoService() {} IoService::~IoService() {}
IoService *IoService::New() { return new IoServiceImpl(); } IoService *IoService::New() { return new IoServiceImpl(); }
void IoServiceImpl::Run() {
// As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
asio::io_service::work work(io_service_);
for(;;)
{
try
{
io_service_.run();
break;
} catch (const std::exception & e) {
LOG_WARN() << "Unexpected exception in libhdfspp worker thread: " << e.what();
} catch (...) {
LOG_WARN() << "Unexpected value not derived from std::exception in libhdfspp worker thread";
}
}
}
} }

View File

@ -27,10 +27,7 @@ namespace hdfs {
class IoServiceImpl : public IoService { class IoServiceImpl : public IoService {
public: public:
virtual void Run() override { virtual void Run() override;
asio::io_service::work work(io_service_);
io_service_.run();
}
virtual void Stop() override { io_service_.stop(); } virtual void Stop() override { io_service_.stop(); }
::asio::io_service &io_service() { return io_service_; } ::asio::io_service &io_service() { return io_service_; }
private: private: