HDFS-9616. libhdfs++: Add runtime hooks to allow a client application to add low level monitoring and tests. Contributed by Bob Hansen

This commit is contained in:
James 2016-03-24 00:18:59 -04:00 committed by James Clampffer
parent 0f1a278dd5
commit 427edae365
19 changed files with 731 additions and 42 deletions

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HDFSPP_EVENTS
#define HDFSPP_EVENTS
#include "hdfspp/status.h"
#include <functional>
namespace hdfs {
/*
* Supported event names. These names will stay consistent in libhdfs callbacks.
*
* Other events not listed here may be seen, but they are not stable and
* should not be counted on.
*/
static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect";
static constexpr const char * FS_NN_READ_EVENT = "NN::read";
static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
class event_response {
public:
// Create a response
enum event_response_type {
kOk = 0,
#ifndef NDEBUG
// Responses to be used in testing only
kTest_Error = 100
#endif
};
// The default ok response; libhdfspp should continue normally
static event_response ok() { return event_response(); }
event_response_type response() { return response_; }
private:
event_response() : response_(event_response_type::kOk) {};
event_response_type response_;
///////////////////////////////////////////////
//
// Testing support
//
// If running a debug build, the consumer can stimulate errors
// within libhdfdspp by returning a Status from the callback.
///////////////////////////////////////////////
#ifndef NDEBUG
public:
static event_response test_err(const Status &status) {
return event_response(status);
}
Status status() { return error_status_; }
private:
event_response(const Status & status) :
response_(event_response_type::kTest_Error), error_status_(status) {}
Status error_status_; // To be used with kTest_Error
#endif
};
/* callback signature */
typedef std::function<
event_response (const char * event,
const char * cluster,
int64_t value)>
fs_event_callback;
typedef std::function<
event_response (const char * event,
const char * cluster,
const char * file,
int64_t value)>
file_event_callback;
}
#endif

View File

@ -162,6 +162,61 @@ int hdfsDisableLoggingForComponent(int component);
LIBHDFS_EXTERNAL
int hdfsSetLoggingLevel(int component);
/*
* Supported event names. These names will stay consistent in libhdfs callbacks.
*
* Other events not listed here may be seen, but they are not stable and
* should not be counted on.
*/
extern const char * FS_NN_CONNECT_EVENT;
extern const char * FS_NN_READ_EVENT;
extern const char * FS_NN_WRITE_EVENT;
extern const char * FILE_DN_CONNECT_EVENT;
extern const char * FILE_DN_READ_EVENT;
extern const char * FILE_DN_WRITE_EVENT;
#define LIBHDFSPP_EVENT_OK (0)
#ifndef NDEBUG
#define DEBUG_SIMULATE_ERROR (-1)
#endif
typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster,
int64_t value, int64_t cookie);
typedef int (*libhdfspp_file_event_callback)(const char * event,
const char * cluster,
const char * file,
int64_t value, int64_t cookie);
/**
* Registers a callback for the next filesystem connect operation the current
* thread executes.
*
* @param handler A function pointer. Taken as a void* and internally
* cast into the appropriate type.
* @param cookie An opaque value that will be passed into the handler; can
* be used to correlate the handler with some object in the
* consumer's space.
**/
LIBHDFS_EXTERNAL
int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie);
/**
* Registers a callback for the next file open operation the current thread
* executes.
*
* @param fs The filesystem
* @param handler A function pointer. Taken as a void* and internally
* cast into the appropriate type.
* @param cookie An opaque value that will be passed into the handler; can
* be used to correlate the handler with some object in the
* consumer's space.
**/
LIBHDFS_EXTERNAL
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie);
#ifdef __cplusplus
} /* end extern "C" */

View File

@ -20,6 +20,7 @@
#include "hdfspp/options.h"
#include "hdfspp/status.h"
#include "hdfspp/events.h"
#include <functional>
#include <memory>
@ -108,6 +109,18 @@ public:
**/
static bool ShouldExclude(const Status &status);
/**
* Sets an event callback for file-level event notifications (such as connecting
* to the DataNode, communications errors, etc.)
*
* Many events are defined in hdfspp/events.h; the consumer should also expect
* to be called with many private events, which can be ignored.
*
* @param callback The function to call when a reporting event occurs.
*/
virtual void SetFileEventCallback(file_event_callback callback) = 0;
virtual ~FileHandle();
};
@ -161,6 +174,17 @@ class FileSystem {
*/
virtual ~FileSystem() {};
/**
* Sets an event callback for fs-level event notifications (such as connecting
* to the NameNode, communications errors with the NN, etc.)
*
* Many events are defined in hdfspp/events.h; the consumer should also expect
* to be called with many private events, which can be ignored.
*
* @param callback The function to call when a reporting event occurs.
*/
virtual void SetFsEventCallback(fs_event_callback callback) = 0;
};
}

View File

@ -30,9 +30,11 @@
#include <cstring>
#include <iostream>
#include <algorithm>
#include <functional>
using namespace hdfs;
using std::experimental::nullopt;
using namespace std::placeholders;
static constexpr tPort kDefaultPort = 8020;
@ -81,6 +83,10 @@ void hdfsGetLastError(char *buf, int len) {
buf[copylen] = 0;
}
/* Event callbacks for next open calls */
thread_local std::experimental::optional<fs_event_callback> fsEventCallback;
thread_local std::experimental::optional<file_event_callback> fileEventCallback;
struct hdfsBuilder {
hdfsBuilder();
hdfsBuilder(const char * directory);
@ -197,6 +203,10 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st
return nullptr;
}
if (fsEventCallback) {
fs->SetFsEventCallback(fsEventCallback.value());
}
Status status;
if (nn || port) {
if (!port) {
@ -399,6 +409,72 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
}
}
/*******************************************************************
* EVENT CALLBACKS
*******************************************************************/
const char * FS_NN_CONNECT_EVENT = hdfs::FS_NN_CONNECT_EVENT;
const char * FS_NN_READ_EVENT = hdfs::FS_NN_READ_EVENT;
const char * FS_NN_WRITE_EVENT = hdfs::FS_NN_WRITE_EVENT;
const char * FILE_DN_CONNECT_EVENT = hdfs::FILE_DN_CONNECT_EVENT;
const char * FILE_DN_READ_EVENT = hdfs::FILE_DN_READ_EVENT;
const char * FILE_DN_WRITE_EVENT = hdfs::FILE_DN_WRITE_EVENT;
event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
int64_t cookie,
const char * event,
const char * cluster,
int64_t value) {
int result = handler(event, cluster, value, cookie);
if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok();
}
#ifndef NDEBUG
if (result == DEBUG_SIMULATE_ERROR) {
return event_response::test_err(Status::Error("Simulated error"));
}
#endif
return event_response::ok();
}
event_response file_callback_glue(libhdfspp_file_event_callback handler,
int64_t cookie,
const char * event,
const char * cluster,
const char * file,
int64_t value) {
int result = handler(event, cluster, file, value, cookie);
if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok();
}
#ifndef NDEBUG
if (result == DEBUG_SIMULATE_ERROR) {
return event_response::test_err(Status::Error("Simulated error"));
}
#endif
return event_response::ok();
}
int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
{
fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
fsEventCallback = callback;
return 0;
}
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
{
file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
fileEventCallback = callback;
return 0;
}
/*******************************************************************
* BUILDER INTERFACE
*******************************************************************/

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc)
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL})

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "libhdfs_events_impl.h"
namespace hdfs {
/**
* Default no-op callback implementations
**/
LibhdfsEvents::LibhdfsEvents() : fs_callback(std::experimental::nullopt),
file_callback(std::experimental::nullopt)
{}
LibhdfsEvents::~LibhdfsEvents() {}
void LibhdfsEvents::set_fs_callback(const fs_event_callback & callback) {
fs_callback = callback;
}
void LibhdfsEvents::set_file_callback(const file_event_callback & callback) {
file_callback = callback;
}
void LibhdfsEvents::clear_fs_callback() {
fs_callback = std::experimental::nullopt;
}
void LibhdfsEvents::clear_file_callback() {
file_callback = std::experimental::nullopt;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
#define LIBHDFSPP_COMMON_LIBHDFSEVENTS_IMPL
#include "hdfspp/events.h"
#include <optional.hpp>
#include <functional>
namespace hdfs {
/**
* Users can specify event handlers. Default is a no-op handler.
**/
class LibhdfsEvents {
public:
LibhdfsEvents();
virtual ~LibhdfsEvents();
void set_fs_callback(const fs_event_callback & callback);
void set_file_callback(const file_event_callback & callback);
void clear_fs_callback();
void clear_file_callback();
event_response call(const char * event,
const char * cluster,
int64_t value) {
if (fs_callback) {
return fs_callback->operator ()(event, cluster, value);
} else {
return event_response::ok();
}
}
event_response call(const char * event,
const char * cluster,
const char * file,
int64_t value) {
if (file_callback) {
return file_callback->operator ()(event, cluster, file, value);
} else {
return event_response::ok();
}
}
private:
// Called when fs events occur
std::experimental::optional<fs_event_callback> fs_callback;
// Called when file events occur
std::experimental::optional<file_event_callback> file_callback;
};
}
#endif

View File

@ -26,7 +26,8 @@ DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
const hadoop::common::TokenProto *token)
const hadoop::common::TokenProto *token,
LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
{
using namespace ::asio::ip;

View File

@ -21,6 +21,7 @@
#include "common/hdfs_public_api.h"
#include "common/async_stream.h"
#include "ClientNamenodeProtocol.pb.h"
#include "common/libhdfs_events_impl.h"
#include "asio.hpp"
@ -42,10 +43,12 @@ public:
std::unique_ptr<asio::ip::tcp::socket> conn_;
std::array<asio::ip::tcp::endpoint, 1> endpoints_;
std::string uuid_;
LibhdfsEvents *event_handlers_;
virtual ~DataNodeConnectionImpl();
DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
const hadoop::common::TokenProto *token);
const hadoop::common::TokenProto *token,
LibhdfsEvents *event_handlers);
void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
@ -54,12 +57,17 @@ public:
void async_read_some(const MutableBuffers &buf,
std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override {
event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
conn_->async_read_some(buf, handler);
};
void async_write_some(const ConstBuffers &buf,
std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override {
event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
conn_->async_write_some(buf, handler);
}
};

View File

@ -21,6 +21,7 @@
#include "common/logging.h"
#include "connection/datanodeconnection.h"
#include "reader/block_reader.h"
#include "hdfspp/events.h"
#include <future>
#include <tuple>
@ -33,13 +34,17 @@ using ::hadoop::hdfs::LocatedBlocksProto;
FileHandle::~FileHandle() {}
FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
const std::string & path,
::asio::io_service *io_service, const std::string &client_name,
const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
: io_service_(io_service), client_name_(client_name), file_info_(file_info),
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) {
std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers)
: cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
<< FMT_THIS_ADDR << ", ...) called");
}
void FileHandleImpl::PositionRead(
@ -228,14 +233,34 @@ void FileHandleImpl::AsyncPreadSome(
std::shared_ptr<BlockReader> reader;
reader = CreateBlockReader(BlockReaderOptions(), dn);
// Lambdas cannot capture copies of member variables so we'll make explicit
// copies for it
auto event_handlers = event_handlers_;
auto path = path_;
auto cluster_name = cluster_name_;
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
#ifndef NDEBUG
if (event_resp.response() == event_response::kTest_Error) {
handler(event_resp.status(), dn_id, transferred);
return;
}
#endif
auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
handler(status, dn_id, transferred);
};
dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
(Status status, std::shared_ptr<DataNodeConnection> dn) {
(void)dn;
auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
#ifndef NDEBUG
if (event_resp.response() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif
if (status.ok()) {
reader->AsyncReadBlock(
client_name, *block, offset_within_block,
@ -243,7 +268,9 @@ void FileHandleImpl::AsyncPreadSome(
} else {
handler(status, dn_id, 0);
}
});
};
dn->Connect(connect_handler);
return;
}
@ -267,7 +294,11 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
const hadoop::common::TokenProto * token) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
<< FMT_THIS_ADDR << ", ...) called");
return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token, event_handlers_.get());
}
std::shared_ptr<LibhdfsEvents> FileHandleImpl::get_event_handlers() {
return event_handlers_;
}
void FileHandleImpl::CancelOperations() {
@ -283,6 +314,18 @@ void FileHandleImpl::CancelOperations() {
}
}
void FileHandleImpl::SetFileEventCallback(file_event_callback callback) {
std::shared_ptr<LibhdfsEvents> new_event_handlers;
if (event_handlers_) {
new_event_handlers = std::make_shared<LibhdfsEvents>(*event_handlers_);
} else {
new_event_handlers = std::make_shared<LibhdfsEvents>();
}
new_event_handlers->set_file_callback(callback);
event_handlers_ = new_event_handlers;
}
bool FileHandle::ShouldExclude(const Status &s) {
if (s.ok()) {

View File

@ -21,6 +21,7 @@
#include "common/hdfs_public_api.h"
#include "common/async_stream.h"
#include "common/cancel_tracker.h"
#include "common/libhdfs_events_impl.h"
#include "reader/fileinfo.h"
#include "reader/readergroup.h"
@ -48,9 +49,12 @@ class DataNodeConnection;
*/
class FileHandleImpl : public FileHandle {
public:
FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
FileHandleImpl(const std::string & cluster_name,
const std::string & path,
::asio::io_service *io_service, const std::string &client_name,
const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes);
std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers);
/*
* [Some day reliably] Reads a particular offset into the data file.
@ -96,7 +100,6 @@ public:
const std::function<void(const Status &status,
const std::string &dn_id, size_t bytes_read)> handler);
/**
* Cancels all operations instantiated from this FileHandle.
* Will set a flag to abort continuation pipelines when they try to move to the next step.
@ -104,6 +107,14 @@ public:
**/
virtual void CancelOperations(void) override;
virtual void SetFileEventCallback(file_event_callback callback) override;
/**
* Ephemeral objects created by the filehandle will need to get the event
* handler registry owned by the FileSystem.
**/
std::shared_ptr<LibhdfsEvents> get_event_handlers();
protected:
virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn);
@ -112,6 +123,8 @@ protected:
const ::hadoop::hdfs::DatanodeInfoProto & dn,
const hadoop::common::TokenProto * token);
private:
const std::string cluster_name_;
const std::string path_;
::asio::io_service * const io_service_;
const std::string client_name_;
const std::shared_ptr<const struct FileInfo> file_info_;
@ -120,6 +133,7 @@ private:
off_t offset_;
CancelHandle cancel_state_;
ReaderGroup readers_;
std::shared_ptr<LibhdfsEvents> event_handlers_;
};
}

View File

@ -47,7 +47,8 @@ static constexpr uint16_t kDefaultPort = 8020;
* NAMENODE OPERATIONS
****************************************************************************/
void NameNodeOperations::Connect(const std::string &server,
void NameNodeOperations::Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
std::function<void(const Status &)> &&handler) {
using namespace asio_continuation;
@ -55,8 +56,8 @@ void NameNodeOperations::Connect(const std::string &server,
auto m = Pipeline<State>::Create();
m->Push(Resolve(io_service_, server, service,
std::back_inserter(m->state())))
.Push(Bind([this, m](const Continuation::Next &next) {
engine_.Connect(m->state(), next);
.Push(Bind([this, m, cluster_name](const Continuation::Next &next) {
engine_.Connect(cluster_name, m->state(), next);
}));
m->Run([this, handler](const Status &status, const State &) {
handler(status);
@ -113,6 +114,10 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
}
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
engine_.SetFsEventCallback(callback);
}
/*****************************************************************************
* FILESYSTEM BASE CLASS
****************************************************************************/
@ -162,7 +167,8 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
nn_(&io_service_->io_service(), options,
GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
bad_node_tracker_(std::make_shared<BadDataNodeTracker>()),
event_handlers_(std::make_shared<LibhdfsEvents>())
{
LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ") called");
@ -201,7 +207,9 @@ void FileSystemImpl::Connect(const std::string &server,
handler (Status::Error("Null IoService"), this);
}
nn_.Connect(server, service, [this, handler](const Status & s) {
cluster_name_ = server + ":" + service;
nn_.Connect(cluster_name_, server, service, [this, handler](const Status & s) {
handler(s, this);
});
}
@ -288,8 +296,8 @@ void FileSystemImpl::Open(
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
nn_.GetBlockLocations(path, [this, path, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_)
: nullptr);
});
}
@ -340,4 +348,18 @@ void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
delete t;
}
void FileSystemImpl::SetFsEventCallback(fs_event_callback callback) {
if (event_handlers_) {
event_handlers_->set_fs_callback(callback);
nn_.SetFsEventCallback(callback);
}
}
std::shared_ptr<LibhdfsEvents> FileSystemImpl::get_event_handlers() {
return event_handlers_;
}
}

View File

@ -19,6 +19,7 @@
#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#include "filehandle.h"
#include "common/libhdfs_events_impl.h"
#include "common/hdfs_public_api.h"
#include "common/async_stream.h"
#include "hdfspp/hdfspp.h"
@ -53,13 +54,15 @@ public:
engine_(io_service, options, client_name, user_name, protocol_name, protocol_version),
namenode_(& engine_) {}
void Connect(const std::string &server,
void Connect(const std::string &cluster_name,
const std::string &server,
const std::string &service,
std::function<void(const Status &)> &&handler);
void GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
void SetFsEventCallback(fs_event_callback callback);
private:
::asio::io_service * io_service_;
RpcEngine engine_;
@ -100,6 +103,8 @@ public:
Status Open(const std::string &path, FileHandle **handle) override;
void SetFsEventCallback(fs_event_callback callback) override;
/* add a new thread to handle asio requests, return number of threads in pool
*/
int AddWorkerThread();
@ -107,9 +112,13 @@ public:
/* how many worker threads are servicing asio requests */
int WorkerThreadCount() { return worker_threads_.size(); }
/* all monitored events will need to lookup handlers */
std::shared_ptr<LibhdfsEvents> get_event_handlers();
private:
const Options options_;
std::string cluster_name_;
/**
* The IoService must be the first member variable to ensure that it gets
* destroyed last. This allows other members to dequeue things from the
@ -126,6 +135,12 @@ private:
typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
std::vector<WorkerPtr> worker_threads_;
/**
* Runtime event monitoring handlers.
* Note: This is really handy to have for advanced usage but
* exposes implementation details that may change at any time.
**/
std::shared_ptr<LibhdfsEvents> event_handlers_;
};
}

View File

@ -321,6 +321,16 @@ void RpcConnection::PreEnqueueRequests(
// Don't start sending yet; will flush when connected
}
void RpcConnection::SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers) {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
event_handlers_ = event_handlers;
}
void RpcConnection::SetClusterName(std::string cluster_name) {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
cluster_name_ = cluster_name;
}
void RpcConnection::CommsError(const Status &status) {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling

View File

@ -22,6 +22,7 @@
#include "common/logging.h"
#include "common/util.h"
#include "common/libhdfs_events_impl.h"
#include <asio/connect.hpp>
#include <asio/read.hpp>
@ -111,6 +112,15 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
Status status = ToStatus(ec);
if(event_handlers_) {
auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
#ifndef NDEBUG
if (event_resp.response() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif
}
if (status.ok()) {
StartReading();
Handshake([shared_this, this](const Status & s) {
@ -241,7 +251,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec,
size_t) {
using std::placeholders::_1;
using std::placeholders::_2;
@ -251,6 +261,16 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
::asio::error_code ec = asio_ec;
if(event_handlers_) {
auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef NDEBUG
if (event_resp.response() == event_response::kTest_Error) {
ec = std::make_error_code(std::errc::network_down);
}
#endif
}
switch (ec.value()) {
case 0:
// No errors

View File

@ -39,18 +39,21 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
protocol_version_(protocol_version),
retry_policy_(std::move(MakeRetryPolicy(options))),
call_id_(0),
retry_timer(*io_service) {
retry_timer(*io_service),
event_handlers_(std::make_shared<LibhdfsEvents>()) {
LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
}
void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
void RpcEngine::Connect(const std::string &cluster_name,
const std::vector<::asio::ip::tcp::endpoint> &server,
RpcCallback &handler) {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
LOG_DEBUG(kRPC, << "RpcEngine::Connect called");
last_endpoints_ = server;
cluster_name_ = cluster_name;
conn_ = NewConnection();
conn_ = InitializeConnection();
conn_->Connect(last_endpoints_, handler);
}
@ -85,7 +88,7 @@ void RpcEngine::AsyncRpc(
LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
if (!conn_) {
conn_ = NewConnection();
conn_ = InitializeConnection();
conn_->ConnectAndFlush(last_endpoints_);
}
conn_->AsyncRpc(method_name, req, resp, handler);
@ -111,6 +114,14 @@ std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(this);
}
std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
{
std::shared_ptr<RpcConnection> result = NewConnection();
result->SetEventHandlers(event_handlers_);
result->SetClusterName(cluster_name_);
return result;
}
Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp) {
@ -120,7 +131,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
{
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
if (!conn_) {
conn_ = NewConnection();
conn_ = InitializeConnection();
conn_->ConnectAndFlush(last_endpoints_);
}
conn = conn_;
@ -185,7 +196,7 @@ void RpcEngine::RpcCommsError(
// the NN
if (!pendingRequests.empty() &&
head_action && head_action->action != RetryAction::FAIL) {
conn_ = NewConnection();
conn_ = InitializeConnection();
conn_->PreEnqueueRequests(pendingRequests);
if (head_action->delayMillis > 0) {
@ -203,4 +214,10 @@ void RpcEngine::RpcCommsError(
}
}
void RpcEngine::SetFsEventCallback(fs_event_callback callback) {
event_handlers_->set_fs_callback(callback);
}
}

View File

@ -22,6 +22,7 @@
#include "hdfspp/status.h"
#include "common/retry_policy.h"
#include "common/libhdfs_events_impl.h"
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
@ -131,6 +132,9 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
// on connect
void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);
void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
void SetClusterName(std::string cluster_name);
LockFreeRpcEngine *engine() { return engine_; }
::asio::io_service &io_service();
@ -186,6 +190,10 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
// Requests that are waiting for responses
typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
RequestOnFlyMap requests_on_fly_;
std::shared_ptr<LibhdfsEvents> event_handlers_;
std::string cluster_name_;
// Lock for mutable parts of this class that need to be thread safe
std::mutex connection_state_lock_;
};
@ -234,7 +242,9 @@ class RpcEngine : public LockFreeRpcEngine {
const std::string &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version);
void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler);
void Connect(const std::string & cluster_name,
const std::vector<::asio::ip::tcp::endpoint> &server,
RpcCallback &handler);
void AsyncRpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
@ -272,13 +282,17 @@ class RpcEngine : public LockFreeRpcEngine {
::asio::io_service &io_service() override { return *io_service_; }
const Options &options() const override { return options_; }
static std::string GetRandomClientName();
void SetFsEventCallback(fs_event_callback callback);
protected:
std::shared_ptr<RpcConnection> conn_;
std::shared_ptr<RpcConnection> InitializeConnection();
virtual std::shared_ptr<RpcConnection> NewConnection();
virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
// Remember all of the last endpoints in case we need to reconnect and retry
std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
private:
::asio::io_service * const io_service_;
const Options options_;
@ -287,9 +301,12 @@ private:
const std::string protocol_name_;
const int protocol_version_;
const std::unique_ptr<const RetryPolicy> retry_policy_; //null --> no retry
std::string cluster_name_;
std::atomic_int call_id_;
::asio::deadline_timer retry_timer;
std::shared_ptr<LibhdfsEvents> event_handlers_;
std::mutex engine_state_lock_;
};

View File

@ -18,6 +18,7 @@
#include "fs/filesystem.h"
#include "fs/bad_datanode_tracker.h"
#include "common/libhdfs_events_impl.h"
#include "common/util.h"
@ -129,9 +130,10 @@ TEST(BadDataNodeTest, TestNoNodes) {
};
IoServiceImpl io_service;
auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>();
bad_node_tracker->AddBadNode("foo");
PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker, monitors);
Status stat;
size_t read = 0;
@ -147,6 +149,69 @@ TEST(BadDataNodeTest, TestNoNodes) {
ASSERT_EQ(0UL, read);
}
TEST(BadDataNodeTest, NNEventCallback) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->blocks_.push_back(LocatedBlockProto());
LocatedBlockProto & block = file_info->blocks_[0];
ExtendedBlockProto *b = block.mutable_b();
b->set_poolid("");
b->set_blockid(1);
b->set_generationstamp(1);
b->set_numbytes(4096);
// Set up the one block to have one datanodes holding it
DatanodeInfoProto *di = block.add_locs();
DatanodeIDProto *dnid = di->mutable_id();
dnid->set_datanodeuuid("dn1");
char buf[4096] = {
0,
};
IoServiceImpl io_service;
auto tracker = std::make_shared<BadDataNodeTracker>();
// Set up event callbacks
int calls = 0;
std::vector<std::string> callbacks;
auto monitors = std::make_shared<LibhdfsEvents>();
monitors->set_file_callback([&calls, &callbacks] (const char * event,
const char * cluster,
const char * file,
int64_t value) {
(void)cluster; (void) file; (void)value;
callbacks.push_back(event);
// Allow connect call to succeed by fail on read
if (calls++ == 1)
return event_response::test_err(Status::Error("Test"));
return event_response::ok();
});
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors);
Status stat;
size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
// Will return OK, but our callback will subvert it
.WillOnce(InvokeArgument<4>(
Status::OK(), 0));
is.AsyncPreadSome(
0, asio::buffer(buf, sizeof(buf)), nullptr,
[&stat, &read](const Status &status, const std::string &,
size_t transferred) {
stat = status;
read = transferred;
});
ASSERT_FALSE(stat.ok());
ASSERT_EQ(2, callbacks.size());
ASSERT_EQ(FILE_DN_CONNECT_EVENT, callbacks[0]);
ASSERT_EQ(FILE_DN_READ_EVENT, callbacks[1]);
}
TEST(BadDataNodeTest, RecoverableError) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->blocks_.push_back(LocatedBlockProto());
@ -167,7 +232,8 @@ TEST(BadDataNodeTest, RecoverableError) {
};
IoServiceImpl io_service;
auto tracker = std::make_shared<BadDataNodeTracker>();
PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors);
Status stat;
size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
@ -216,7 +282,8 @@ TEST(BadDataNodeTest, InternalError) {
};
IoServiceImpl io_service;
auto tracker = std::make_shared<BadDataNodeTracker>();
PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors);
Status stat;
size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))

View File

@ -266,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
complete = true;
io_service.stop();
ASSERT_FALSE(stat.ok());
@ -294,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
complete = true;
io_service.stop();
ASSERT_FALSE(stat.ok());
@ -322,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
.WillOnce(Return(std::make_pair(::asio::error_code(), "")))
.WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
complete = true;
io_service.stop();
ASSERT_TRUE(stat.ok());
@ -331,6 +331,72 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
ASSERT_TRUE(complete);
}
TEST(RpcEngineTest, TestEventCallbacks)
{
::asio::io_service io_service;
Options options;
options.max_rpc_retries = 99;
options.rpc_retry_delay_ms = 0;
SharedConnectionEngine engine(&io_service, options, "foo", "", "protocol", 1);
// Set up event callbacks
int calls = 0;
std::vector<std::string> callbacks;
engine.SetFsEventCallback([&calls, &callbacks] (const char * event,
const char * cluster,
int64_t value) {
(void)cluster; (void)value;
callbacks.push_back(event);
// Allow connect and fail first read
calls++;
if (calls == 1 || calls == 3) // First connect and first read
return event_response::test_err(Status::Error("Test"));
return event_response::ok();
});
EchoResponseProto server_resp;
server_resp.set_message("foo");
auto producer = std::make_shared<SharedConnectionData>();
producer->checkProducerForConnect = true;
RpcResponseHeaderProto h;
h.set_callid(1);
h.set_status(RpcResponseHeaderProto::SUCCESS);
EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(::asio::error_code(), ""))) // subverted by callback
.WillOnce(Return(std::make_pair(::asio::error_code(), "")))
.WillOnce(Return(RpcResponse(h, "b"))) // subverted by callback
.WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
SharedMockConnection::SetSharedConnectionData(producer);
EchoRequestProto req;
req.set_message("foo");
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
bool complete = false;
engine.AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) {
complete = true;
io_service.stop();
ASSERT_TRUE(stat.ok());
});
io_service.run();
ASSERT_TRUE(complete);
ASSERT_EQ(7, callbacks.size());
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
for (int i=4; i < 7; i++)
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
}
TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
{
// Error and async recover
@ -351,7 +417,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
.WillOnce(Return(std::make_pair(::asio::error_code(), "")))
.WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
engine.Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) {
complete = true;
io_service.stop();
ASSERT_TRUE(stat.ok());