HDFS-13403: libhdfs++ Use hdfs::IoService object rather than asio::io_service. Contributed by James Clampffer.

This commit is contained in:
James Clampffer 2018-04-11 10:27:23 -04:00
parent 7eb783e263
commit eefe2a147c
36 changed files with 477 additions and 350 deletions

View File

@ -19,6 +19,7 @@
#define LIBHDFSPP_HDFSPP_H_ #define LIBHDFSPP_HDFSPP_H_
#include "hdfspp/options.h" #include "hdfspp/options.h"
#include "hdfspp/ioservice.h"
#include "hdfspp/status.h" #include "hdfspp/status.h"
#include "hdfspp/events.h" #include "hdfspp/events.h"
#include "hdfspp/block_location.h" #include "hdfspp/block_location.h"
@ -31,61 +32,9 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <set>
#include <iostream>
namespace hdfs { namespace hdfs {
/**
* An IoService manages a queue of asynchronous tasks. All libhdfs++
* operations are filed against a particular IoService.
*
* When an operation is queued into an IoService, the IoService will
* run the callback handler associated with the operation. Note that
* the IoService must be stopped before destructing the objects that
* post the operations.
*
* From an implementation point of view the hdfs::IoService provides
* a thin wrapper over an asio::io_service object so that additional
* instrumentation and functionality can be added.
**/
class IoService : public std::enable_shared_from_this<IoService>
{
public:
static IoService *New();
static std::shared_ptr<IoService> MakeShared();
virtual ~IoService();
/**
* Start up as many threads as there are logical processors.
* Return number of threads created.
**/
virtual unsigned int InitDefaultWorkers() = 0;
/**
* Initialize with thread_count handler threads.
* If thread count is less than one print a log message and default to one thread.
* Return number of threads created.
**/
virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
/**
* Place an item on the execution queue. Will be invoked from outside of the calling context.
**/
virtual void PostTask(std::function<void(void)>& asyncTask) = 0;
/**
* Run the asynchronous tasks associated with this IoService.
**/
virtual void Run() = 0;
/**
* Stop running asynchronous tasks associated with this IoService.
* All worker threads will return as soon as they finish executing their current task.
**/
virtual void Stop() = 0;
};
/** /**
* A node exclusion rule provides a simple way of testing if the * A node exclusion rule provides a simple way of testing if the
* client should attempt to connect to a node based on the node's * client should attempt to connect to a node based on the node's

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* An asio::io_service maintains a queue of asynchronous tasks and invokes them
* when they are ready to run. Async network IO handlers become runnable when
* the associated IO operation has completed. The hdfs::IoService is a thin
* wrapper over that object to make it easier to add logging and instrumentation
* to tasks that have been queued.
*
* Lifecycle management:
* -The IoService *shall* outlive any tasks it owns. Deleting a task
* before it has been run **will** result in dangling reference issues.
* -Dependencies (including transitive dependencies) of pending tasks
* *shall* outlive the task. Failure to ensure this **will** result in
* danging reference issues.
* -libhdfs++ uses shared_ptr/weak_ptr heavily as a mechanism to ensure
* liveness of dependencies.
* -refcounted pointers in lambda capture lists have a poor track record
* for ensuring liveness in this library; it's easy to omit them because
* the capture list isn't context aware. Developers are encouraged to
* write callable classes that explicitly list dependencies.
*
* Constraints on tasks:
* -Tasks and async callbacks *shall* never do blocking IO or sleep().
* At best this hurts performance by preventing worker threads from doing
* useful work. It may also cause situations that look like deadlocks
* if the worker thread is stalled for long enough.
* -Tasks and async callbacks *shall* not acquire locks that guard resources
* that might be unavailable for an unknown amount of time. Lock acquisition
* when accessing shared data structures is acceptable and is often required.
* -Tasks and async callbacks *should* not allow exceptions to escape their
* scope since tasks will be executed on a different stack then where they
* were created. The exception will be caught by the IoService rather than
* being forwarded to the next task.
* -Tasks and async callbacks *should* not rely on thread local storage for
* ancillary context. The IoService does not support any sort of thread
* affinity that would guarantee tasks Post()ed from one thread will always
* be executed on the same thread. Applications that only use a single
* worker thread may use TLS but developers should be mindful that throughput
* can no longer be scaled by adding threads.
**/
#ifndef INCLUDE_HDFSPP_IOSERVICE_H_
#define INCLUDE_HDFSPP_IOSERVICE_H_
#include <memory>
// forward decl
namespace asio {
class io_service;
}
namespace hdfs {
// (Un)comment this to determine if issues are due to concurrency or logic faults
// If tests still fail with concurrency disabled it's most likely a logic bug
#define DISABLE_CONCURRENT_WORKERS
class IoService : public std::enable_shared_from_this<IoService>
{
public:
static IoService *New();
static std::shared_ptr<IoService> MakeShared();
virtual ~IoService();
/**
* Start up as many threads as there are logical processors.
* Return number of threads created.
**/
virtual unsigned int InitDefaultWorkers() = 0;
/**
* Initialize with thread_count handler threads.
* If thread count is less than one print a log message and default to one thread.
* Return number of threads created.
**/
virtual unsigned int InitWorkers(unsigned int thread_count) = 0;
/**
* Add a worker thread to existing pool.
* Return true on success, false otherwise.
**/
virtual bool AddWorkerThread() = 0;
/**
* Return the number of worker threads in use.
**/
virtual unsigned int GetWorkerThreadCount() = 0;
/**
* Enqueue an item for deferred execution. Non-blocking.
* Task will be invoked from outside of the calling context.
**/
virtual void PostTask(std::function<void(void)> asyncTask) = 0;
/**
* Provide type erasure for lambdas defined inside the argument list.
**/
template <typename LambdaInstance>
inline void PostLambda(LambdaInstance&& func)
{
std::function<void(void)> typeEraser = func;
this->PostTask(func);
}
/**
* Run the asynchronous tasks associated with this IoService.
**/
virtual void Run() = 0;
/**
* Stop running asynchronous tasks associated with this IoService.
* All worker threads will return as soon as they finish executing their current task.
**/
virtual void Stop() = 0;
/**
* Access underlying io_service object. Only to be used in asio library calls.
* After HDFS-11884 is complete only tests should need direct access to the asio::io_service.
**/
virtual asio::io_service& GetRaw() = 0;
};
} // namespace hdfs
#endif // include guard

View File

@ -17,18 +17,17 @@
*/ */
#include "hdfspp/hdfspp.h" #include "hdfspp/hdfspp.h"
#include "hdfspp/hdfs_ext.h"
#include "fs/filesystem.h"
#include "common/hdfs_configuration.h" #include "common/hdfs_configuration.h"
#include "common/configuration_loader.h" #include "common/configuration_loader.h"
#include "common/logging.h" #include "common/logging.h"
#include "fs/filesystem.h"
#include "fs/filehandle.h"
#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
#include <libgen.h> #include <libgen.h>
#include "limits.h" #include "limits.h"
#include <string> #include <string>
#include <cstring> #include <cstring>
#include <iostream> #include <iostream>

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl) set(LIB_DL dl)
endif() endif()
add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc config_parser.cc) add_library(common_obj OBJECT status.cc sasl_digest_md5.cc ioservice_impl.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc config_parser.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>) add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL}) target_link_libraries(common ${LIB_DL})

View File

@ -19,12 +19,15 @@
#ifndef LIB_COMMON_ASYNC_STREAM_H_ #ifndef LIB_COMMON_ASYNC_STREAM_H_
#define LIB_COMMON_ASYNC_STREAM_H_ #define LIB_COMMON_ASYNC_STREAM_H_
#include <asio.hpp> #include <asio/buffer.hpp>
#include <asio/error_code.hpp>
#include <functional>
namespace hdfs { namespace hdfs {
typedef asio::mutable_buffers_1 MutableBuffers; // Contiguous buffer types
typedef asio::const_buffers_1 ConstBuffers; typedef asio::mutable_buffers_1 MutableBuffer;
typedef asio::const_buffers_1 ConstBuffer;
/* /*
* asio-compatible stream implementation. * asio-compatible stream implementation.
@ -35,11 +38,11 @@ typedef asio::const_buffers_1 ConstBuffers;
*/ */
class AsyncStream { class AsyncStream {
public: public:
virtual void async_read_some(const MutableBuffers &buf, virtual void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) = 0; std::size_t bytes_transferred) > handler) = 0;
virtual void async_write_some(const ConstBuffers &buf, virtual void async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) = 0; std::size_t bytes_transferred) > handler) = 0;
}; };

View File

@ -20,13 +20,8 @@
#include "continuation.h" #include "continuation.h"
#include "common/util.h" #include "common/util.h"
#include "hdfspp/status.h" #include "hdfspp/status.h"
#include <asio/connect.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp> #include <asio/write.hpp>
#include <asio/ip/tcp.hpp>
#include <memory> #include <memory>
namespace hdfs { namespace hdfs {

View File

@ -16,14 +16,16 @@
* limitations under the License. * limitations under the License.
*/ */
#include "hdfs_ioservice.h" #include "ioservice_impl.h"
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include "common/util.h"
#include "common/logging.h" #include "common/logging.h"
namespace hdfs { namespace hdfs {
IoService::~IoService() {} IoService::~IoService() {}
@ -99,7 +101,7 @@ void IoServiceImpl::ThreadExitHook() {
LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting"); LOG_DEBUG(kAsyncRuntime, << "Worker thread #" << std::this_thread::get_id() << " for IoServiceImpl@" << this << " exiting");
} }
void IoServiceImpl::PostTask(std::function<void(void)>& asyncTask) { void IoServiceImpl::PostTask(std::function<void(void)> asyncTask) {
io_service_.post(asyncTask); io_service_.post(asyncTask);
} }
@ -133,14 +135,25 @@ void IoServiceImpl::Run() {
} catch (const std::exception & e) { } catch (const std::exception & e) {
LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what()); LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what());
} catch (...) { } catch (...) {
LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread"); LOG_WARN(kFileSystem, << "Caught unexpected value not derived from std::exception in libhdfspp worker thread");
} }
} }
} }
unsigned int IoServiceImpl::get_worker_thread_count() { void IoServiceImpl::Stop() {
mutex_guard state_lock(state_lock_); // Note: This doesn't wait for running operations to stop.
return worker_threads_.size(); io_service_.stop();
} }
asio::io_service& IoServiceImpl::GetRaw() {
return io_service_;
} }
unsigned int IoServiceImpl::GetWorkerThreadCount() {
mutex_guard state_lock(state_lock_);
return worker_threads_.size();
}
} // namespace hdfs

View File

@ -19,20 +19,16 @@
#ifndef COMMON_HDFS_IOSERVICE_H_ #ifndef COMMON_HDFS_IOSERVICE_H_
#define COMMON_HDFS_IOSERVICE_H_ #define COMMON_HDFS_IOSERVICE_H_
#include "hdfspp/hdfspp.h" #include "hdfspp/ioservice.h"
#include <asio/io_service.hpp> #include <asio/io_service.hpp>
#include "common/util.h" #include "common/new_delete.h"
#include <mutex> #include <mutex>
#include <thread> #include <thread>
namespace hdfs { namespace hdfs {
// Uncomment this to determine if issues are due to concurrency or logic faults
// If tests still fail with concurrency disabled it's most likely a logic bug
#define DISABLE_CONCURRENT_WORKERS
/* /*
* A thin wrapper over the asio::io_service with a few extras * A thin wrapper over the asio::io_service with a few extras
* -manages it's own worker threads * -manages it's own worker threads
@ -41,23 +37,24 @@ namespace hdfs {
class IoServiceImpl : public IoService { class IoServiceImpl : public IoService {
public: public:
MEMCHECKED_CLASS(IoServiceImpl)
IoServiceImpl() {} IoServiceImpl() {}
virtual unsigned int InitDefaultWorkers() override; unsigned int InitDefaultWorkers() override;
virtual unsigned int InitWorkers(unsigned int thread_count) override; unsigned int InitWorkers(unsigned int thread_count) override;
virtual void PostTask(std::function<void(void)>& asyncTask) override; void PostTask(std::function<void(void)> asyncTask) override;
virtual void Run() override; void Run() override;
virtual void Stop() override { io_service_.stop(); } void Stop() override;
asio::io_service& GetRaw() override;
// Add a single worker thread, in the common case try to avoid this in favor // Add a single worker thread, in the common case try to avoid this in favor
// of Init[Default]Workers. Public for use by tests and rare cases where a // of Init[Default]Workers. Public for use by tests and rare cases where a
// client wants very explicit control of threading for performance reasons // client wants very explicit control of threading for performance reasons
// e.g. pinning threads to NUMA nodes. // e.g. pinning threads to NUMA nodes.
bool AddWorkerThread(); bool AddWorkerThread() override;
unsigned int GetWorkerThreadCount() override;
// Be very careful about using this: HDFS-10241
::asio::io_service &io_service() { return io_service_; }
unsigned int get_worker_thread_count();
private: private:
std::mutex state_lock_; std::mutex state_lock_;
::asio::io_service io_service_; ::asio::io_service io_service_;

View File

@ -23,14 +23,11 @@
#include "hdfspp/log.h" #include "hdfspp/log.h"
#include <iostream>
#include <sstream> #include <sstream>
#include <mutex> #include <mutex>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <asio/ip/tcp.hpp>
namespace hdfs { namespace hdfs {
/** /**

View File

@ -20,11 +20,12 @@
#include "common/util.h" #include "common/util.h"
#include "common/logging.h" #include "common/logging.h"
#include "hdfspp/ioservice.h"
#include <sstream> #include <sstream>
#include <utility> #include <utility>
#include <future> #include <future>
#include <memory>
namespace hdfs { namespace hdfs {
@ -35,8 +36,6 @@ ResolvedNamenodeInfo& ResolvedNamenodeInfo::operator=(const NamenodeInfo &info)
return *this; return *this;
} }
std::string ResolvedNamenodeInfo::str() const { std::string ResolvedNamenodeInfo::str() const {
std::stringstream ss; std::stringstream ss;
ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str(); ss << "ResolvedNamenodeInfo {nameservice: " << nameservice << ", name: " << name << ", uri: " << uri.str();
@ -58,7 +57,7 @@ std::string ResolvedNamenodeInfo::str() const {
} }
bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) { bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo &info) {
// this isn't very memory friendly, but if it needs to be called often there are bigger issues at hand // this isn't very memory friendly, but if it needs to be called often there are bigger issues at hand
info.endpoints.clear(); info.endpoints.clear();
std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info}); std::vector<ResolvedNamenodeInfo> resolved = BulkResolve(ioservice, {info});
@ -76,7 +75,7 @@ typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
// RAII wrapper // RAII wrapper
class ScopedResolver { class ScopedResolver {
private: private:
::asio::io_service *io_service_; std::shared_ptr<IoService> io_service_;
std::string host_; std::string host_;
std::string port_; std::string port_;
::asio::ip::tcp::resolver::query query_; ::asio::ip::tcp::resolver::query query_;
@ -86,8 +85,8 @@ class ScopedResolver {
// Caller blocks on access if resolution isn't finished // Caller blocks on access if resolution isn't finished
std::shared_ptr<std::promise<Status>> result_status_; std::shared_ptr<std::promise<Status>> result_status_;
public: public:
ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) : ScopedResolver(std::shared_ptr<IoService> service, const std::string &host, const std::string &port) :
io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_) io_service_(service), host_(host), port_(port), query_(host, port), resolver_(io_service_->GetRaw())
{ {
if(!io_service_) if(!io_service_)
LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service"); LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service");
@ -140,7 +139,7 @@ class ScopedResolver {
} }
}; };
std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) { std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> ioservice, const std::vector<NamenodeInfo> &nodes) {
std::vector< std::unique_ptr<ScopedResolver> > resolvers; std::vector< std::unique_ptr<ScopedResolver> > resolvers;
resolvers.reserve(nodes.size()); resolvers.reserve(nodes.size());

View File

@ -20,6 +20,7 @@
#define COMMON_HDFS_NAMENODE_INFO_H_ #define COMMON_HDFS_NAMENODE_INFO_H_
#include <asio.hpp> #include <asio.hpp>
#include <hdfspp/options.h> #include <hdfspp/options.h>
#include <string> #include <string>
@ -27,6 +28,9 @@
namespace hdfs { namespace hdfs {
// Forward decl
class IoService;
// Internal representation of namenode info that keeps track // Internal representation of namenode info that keeps track
// of its endpoints. // of its endpoints.
struct ResolvedNamenodeInfo : public NamenodeInfo { struct ResolvedNamenodeInfo : public NamenodeInfo {
@ -38,11 +42,11 @@ struct ResolvedNamenodeInfo : public NamenodeInfo {
// Clear endpoints if set and resolve all of them in parallel. // Clear endpoints if set and resolve all of them in parallel.
// Only successful lookups will be placed in the result set. // Only successful lookups will be placed in the result set.
std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes); std::vector<ResolvedNamenodeInfo> BulkResolve(std::shared_ptr<IoService> ioservice, const std::vector<NamenodeInfo> &nodes);
// Clear endpoints, if any, and resolve them again // Clear endpoints, if any, and resolve them again
// Return true if endpoints were resolved // Return true if endpoints were resolved
bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info); bool ResolveInPlace(std::shared_ptr<IoService> ioservice, ResolvedNamenodeInfo &info);
} }

View File

@ -19,17 +19,25 @@
#include "common/util.h" #include "common/util.h"
#include "common/util_c.h" #include "common/util_c.h"
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <exception> #include <exception>
#include <sstream> #include <sstream>
#include <iostream>
#include <iomanip> #include <iomanip>
#include <thread> #include <thread>
namespace hdfs { namespace hdfs {
Status ToStatus(const ::asio::error_code &ec) {
if (ec) {
return Status(ec.value(), ec.message().c_str());
} else {
return Status::OK();
}
}
bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in, bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
::google::protobuf::MessageLite *msg) { ::google::protobuf::MessageLite *msg) {
uint32_t size = 0; uint32_t size = 0;
@ -60,6 +68,10 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL
return buf; return buf;
} }
int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
size_t size = msg->ByteSize();
return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
}
std::string GetRandomClientName() { std::string GetRandomClientName() {
std::vector<unsigned char>buf(8); std::vector<unsigned char>buf(8);

View File

@ -21,16 +21,19 @@
#include "hdfspp/status.h" #include "hdfspp/status.h"
#include "common/logging.h" #include "common/logging.h"
#include <sstream>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <asio/error_code.hpp> #include <asio/error_code.hpp>
#include <openssl/rand.h> #include <openssl/rand.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/coded_stream.h>
#include <asio.hpp>
namespace google {
namespace protobuf {
class MessageLite;
}
}
namespace hdfs { namespace hdfs {
@ -38,20 +41,11 @@ namespace hdfs {
typedef std::lock_guard<std::mutex> mutex_guard; typedef std::lock_guard<std::mutex> mutex_guard;
static inline Status ToStatus(const ::asio::error_code &ec) { Status ToStatus(const ::asio::error_code &ec);
if (ec) {
return Status(ec.value(), ec.message().c_str());
} else {
return Status::OK();
}
}
// Determine size of buffer that needs to be allocated in order to serialize msg // Determine size of buffer that needs to be allocated in order to serialize msg
// in delimited format // in delimited format
static inline int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) { int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg);
size_t size = msg->ByteSize();
return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
}
// Construct msg from the input held in the CodedInputStream // Construct msg from the input held in the CodedInputStream
// return false on failure, otherwise return true // return false on failure, otherwise return true
@ -84,7 +78,6 @@ bool lock_held(T & mutex) {
std::string SafeDisconnect(asio::ip::tcp::socket *sock); std::string SafeDisconnect(asio::ip::tcp::socket *sock);
// The following helper function is used for classes that look like the following: // The following helper function is used for classes that look like the following:
// //
// template <typename socket_like_object> // template <typename socket_like_object>

View File

@ -24,14 +24,14 @@ namespace hdfs {
DataNodeConnection::~DataNodeConnection(){} DataNodeConnection::~DataNodeConnection(){}
DataNodeConnectionImpl::~DataNodeConnectionImpl(){} DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service, DataNodeConnectionImpl::DataNodeConnectionImpl(std::shared_ptr<IoService> io_service,
const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
const hadoop::common::TokenProto *token, const hadoop::common::TokenProto *token,
LibhdfsEvents *event_handlers) : event_handlers_(event_handlers) LibhdfsEvents *event_handlers) : event_handlers_(event_handlers)
{ {
using namespace ::asio::ip; using namespace ::asio::ip;
conn_.reset(new tcp::socket(*io_service)); conn_.reset(new tcp::socket(io_service->GetRaw()));
auto datanode_addr = dn_proto.id(); auto datanode_addr = dn_proto.id();
endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()), endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
datanode_addr.xferport()); datanode_addr.xferport());
@ -68,5 +68,22 @@ void DataNodeConnectionImpl::Cancel() {
} }
} }
void DataNodeConnectionImpl::async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
{
event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
mutex_guard state_lock(state_lock_);
conn_->async_read_some(buf, handler);
}
void DataNodeConnectionImpl::async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
{
event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
mutex_guard state_lock(state_lock_);
conn_->async_write_some(buf, handler);
}
} }

View File

@ -18,7 +18,7 @@
#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ #ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ #define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
#include "common/hdfs_ioservice.h" #include "hdfspp/ioservice.h"
#include "common/async_stream.h" #include "common/async_stream.h"
#include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.pb.h"
#include "common/libhdfs_events_impl.h" #include "common/libhdfs_events_impl.h"
@ -58,13 +58,14 @@ private:
// held (briefly) while posting async ops to the asio task queue // held (briefly) while posting async ops to the asio task queue
std::mutex state_lock_; std::mutex state_lock_;
public: public:
MEMCHECKED_CLASS(DataNodeConnectionImpl)
std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_; std::unique_ptr<asio::ip::tcp::socket, SocketDeleter> conn_;
std::array<asio::ip::tcp::endpoint, 1> endpoints_; std::array<asio::ip::tcp::endpoint, 1> endpoints_;
std::string uuid_; std::string uuid_;
LibhdfsEvents *event_handlers_; LibhdfsEvents *event_handlers_;
virtual ~DataNodeConnectionImpl(); virtual ~DataNodeConnectionImpl();
DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, DataNodeConnectionImpl(std::shared_ptr<IoService> io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
const hadoop::common::TokenProto *token, const hadoop::common::TokenProto *token,
LibhdfsEvents *event_handlers); LibhdfsEvents *event_handlers);
@ -72,24 +73,11 @@ public:
void Cancel() override; void Cancel() override;
void async_read_some(const MutableBuffers &buf, void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override;
override {
event_handlers_->call("DN_read_req", "", "", buf.end() - buf.begin());
void async_write_some(const ConstBuffer &buf,
mutex_guard state_lock(state_lock_); std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override;
conn_->async_read_some(buf, handler);
};
void async_write_some(const ConstBuffers &buf,
std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler)
override {
event_handlers_->call("DN_write_req", "", "", buf.end() - buf.begin());
mutex_guard state_lock(state_lock_);
conn_->async_write_some(buf, handler);
}
}; };
} }

View File

@ -36,10 +36,10 @@ FileHandle::~FileHandle() {}
FileHandleImpl::FileHandleImpl(const std::string & cluster_name, FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
const std::string & path, const std::string & path,
::asio::io_service *io_service, const std::string &client_name, std::shared_ptr<IoService> io_service, const std::string &client_name,
const std::shared_ptr<const struct FileInfo> file_info, const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes, std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers) std::shared_ptr<LibhdfsEvents> event_handlers)
: cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info), : cluster_name_(cluster_name), path_(path), io_service_(io_service), client_name_(client_name), file_info_(file_info),
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) { bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()), event_handlers_(event_handlers), bytes_read_(0) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl(" LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
@ -167,7 +167,7 @@ bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
* on the FileHandle * on the FileHandle
*/ */
void FileHandleImpl::AsyncPreadSome( void FileHandleImpl::AsyncPreadSome(
size_t offset, const MutableBuffers &buffers, size_t offset, const MutableBuffer &buffer,
std::shared_ptr<NodeExclusionRule> excluded_nodes, std::shared_ptr<NodeExclusionRule> excluded_nodes,
const std::function<void(const Status &, const std::string &, size_t)> handler) { const std::function<void(const Status &, const std::string &, size_t)> handler) {
using ::hadoop::hdfs::DatanodeInfoProto; using ::hadoop::hdfs::DatanodeInfoProto;
@ -233,7 +233,7 @@ void FileHandleImpl::AsyncPreadSome(
uint64_t offset_within_block = offset - block->offset(); uint64_t offset_within_block = offset - block->offset();
uint64_t size_within_block = std::min<uint64_t>( uint64_t size_within_block = std::min<uint64_t>(
block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); block->b().numbytes() - offset_within_block, asio::buffer_size(buffer));
LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr << FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
@ -268,7 +268,7 @@ void FileHandleImpl::AsyncPreadSome(
handler(status, dn_id, transferred); handler(status, dn_id, transferred);
}; };
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffer, reader, dn_id, client_name]
(Status status, std::shared_ptr<DataNodeConnection> dn) { (Status status, std::shared_ptr<DataNodeConnection> dn) {
(void)dn; (void)dn;
event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
@ -281,7 +281,7 @@ void FileHandleImpl::AsyncPreadSome(
if (status.ok()) { if (status.ok()) {
reader->AsyncReadBlock( reader->AsyncReadBlock(
client_name, *block, offset_within_block, client_name, *block, offset_within_block,
asio::buffer(buffers, size_within_block), read_handler); asio::buffer(buffer, size_within_block), read_handler);
} else { } else {
handler(status, dn_id, 0); handler(status, dn_id, 0);
} }
@ -307,7 +307,7 @@ std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReader
} }
std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
::asio::io_service * io_service, std::shared_ptr<IoService> io_service,
const ::hadoop::hdfs::DatanodeInfoProto & dn, const ::hadoop::hdfs::DatanodeInfoProto & dn,
const hadoop::common::TokenProto * token) { const hadoop::common::TokenProto * token) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection(" LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("

View File

@ -18,7 +18,7 @@
#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ #ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ #define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
#include "common/hdfs_ioservice.h" #include "hdfspp/ioservice.h"
#include "common/async_stream.h" #include "common/async_stream.h"
#include "common/cancel_tracker.h" #include "common/cancel_tracker.h"
#include "common/libhdfs_events_impl.h" #include "common/libhdfs_events_impl.h"
@ -26,12 +26,10 @@
#include "reader/fileinfo.h" #include "reader/fileinfo.h"
#include "reader/readergroup.h" #include "reader/readergroup.h"
#include "asio.hpp"
#include "bad_datanode_tracker.h" #include "bad_datanode_tracker.h"
#include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.pb.h"
#include <mutex> #include <mutex>
#include <iostream>
namespace hdfs { namespace hdfs {
@ -53,7 +51,7 @@ public:
MEMCHECKED_CLASS(FileHandleImpl) MEMCHECKED_CLASS(FileHandleImpl)
FileHandleImpl(const std::string & cluster_name, FileHandleImpl(const std::string & cluster_name,
const std::string & path, const std::string & path,
::asio::io_service *io_service, const std::string &client_name, std::shared_ptr<IoService> io_service, const std::string &client_name,
const std::shared_ptr<const struct FileInfo> file_info, const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes, std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers); std::shared_ptr<LibhdfsEvents> event_handlers);
@ -93,7 +91,7 @@ public:
* If trying to begin a read past the EOF, status will be Status::InvalidOffset. * If trying to begin a read past the EOF, status will be Status::InvalidOffset.
* *
*/ */
void AsyncPreadSome(size_t offset, const MutableBuffers &buffers, void AsyncPreadSome(size_t offset, const MutableBuffer &buffer,
std::shared_ptr<NodeExclusionRule> excluded_nodes, std::shared_ptr<NodeExclusionRule> excluded_nodes,
const std::function<void(const Status &status, const std::function<void(const Status &status,
const std::string &dn_id, size_t bytes_read)> handler); const std::string &dn_id, size_t bytes_read)> handler);
@ -124,13 +122,13 @@ protected:
std::shared_ptr<DataNodeConnection> dn, std::shared_ptr<DataNodeConnection> dn,
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers); std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
::asio::io_service *io_service, std::shared_ptr<IoService> io_service,
const ::hadoop::hdfs::DatanodeInfoProto & dn, const ::hadoop::hdfs::DatanodeInfoProto & dn,
const hadoop::common::TokenProto * token); const hadoop::common::TokenProto * token);
private: private:
const std::string cluster_name_; const std::string cluster_name_;
const std::string path_; const std::string path_;
::asio::io_service * const io_service_; std::shared_ptr<IoService> io_service_;
const std::string client_name_; const std::string client_name_;
const std::shared_ptr<const struct FileInfo> file_info_; const std::shared_ptr<const struct FileInfo> file_info_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;

View File

@ -18,6 +18,7 @@
#include "filesystem.h" #include "filesystem.h"
#include "filehandle.h"
#include "common/namenode_info.h" #include "common/namenode_info.h"
#include <functional> #include <functional>
@ -104,6 +105,54 @@ FileSystem *FileSystem::New() {
* FILESYSTEM IMPLEMENTATION * FILESYSTEM IMPLEMENTATION
****************************************************************************/ ****************************************************************************/
struct FileSystemImpl::FindSharedState {
//Name pattern (can have wild-cards) to find
const std::string name;
//Maximum depth to recurse after the end of path is reached.
//Can be set to 0 for pure path globbing and ignoring name pattern entirely.
const uint32_t maxdepth;
//Vector of all sub-directories from the path argument (each can have wild-cards)
std::vector<std::string> dirs;
//Callback from Find
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
//outstanding_requests is incremented once for every GetListing call.
std::atomic<uint64_t> outstanding_requests;
//Boolean needed to abort all recursion on error or on user command
std::atomic<bool> aborted;
//Shared variables will need protection with a lock
std::mutex lock;
FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
uint64_t outstanding_recuests_, bool aborted_)
: name(name_),
maxdepth(maxdepth_),
handler(handler_),
outstanding_requests(outstanding_recuests_),
aborted(aborted_),
lock() {
//Constructing the list of sub-directories
std::stringstream ss(path_);
if(path_.back() != '/'){
ss << "/";
}
for (std::string token; std::getline(ss, token, '/'); ) {
dirs.push_back(token);
}
}
};
struct FileSystemImpl::FindOperationalState {
const std::string path;
const uint32_t depth;
const bool search_path;
FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
: path(path_),
depth(depth_),
search_path(search_path_) {
}
};
const std::string get_effective_user_name(const std::string &user_name) { const std::string get_effective_user_name(const std::string &user_name) {
if (!user_name.empty()) if (!user_name.empty())
return user_name; return user_name;
@ -134,10 +183,10 @@ const std::string get_effective_user_name(const std::string &user_name) {
} }
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) : FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name, const Options &options) :
io_service_(static_cast<IoServiceImpl *>(io_service)), options_(options), io_service_(io_service), options_(options),
client_name_(GetRandomClientName()), client_name_(GetRandomClientName()),
nn_( nn_(
&io_service_->io_service(), options, client_name_, io_service_, options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol, get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion kNamenodeProtocolVersion
), ),
@ -166,10 +215,10 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
} }
FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) : FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std::string& user_name, const Options &options) :
io_service_(std::static_pointer_cast<IoServiceImpl>(io_service)), options_(options), io_service_(io_service), options_(options),
client_name_(GetRandomClientName()), client_name_(GetRandomClientName()),
nn_( nn_(
&io_service_->io_service(), options, client_name_, io_service_, options, client_name_,
get_effective_user_name(user_name), kNamenodeProtocol, get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion kNamenodeProtocolVersion
), ),
@ -178,7 +227,7 @@ FileSystemImpl::FileSystemImpl(std::shared_ptr<IoService> io_service, const std:
{ {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl(" LOG_DEBUG(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called"); << FMT_THIS_ADDR << ", shared IoService@" << io_service_.get() << ") called");
int worker_thread_count = io_service_->get_worker_thread_count(); int worker_thread_count = io_service_->GetWorkerThreadCount();
if(worker_thread_count < 1) { if(worker_thread_count < 1) {
LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. " LOG_WARN(kFileSystem, << "FileSystemImpl::FileSystemImpl IoService provided doesn't have any worker threads. "
<< "It needs at least 1 worker to connect to an HDFS cluster.") << "It needs at least 1 worker to connect to an HDFS cluster.")
@ -217,7 +266,7 @@ void FileSystemImpl::Connect(const std::string &server,
auto name_service = options_.services.find(server); auto name_service = options_.services.find(server);
if(name_service != options_.services.end()) { if(name_service != options_.services.end()) {
cluster_name_ = name_service->first; cluster_name_ = name_service->first;
resolved_namenodes = BulkResolve(&io_service_->io_service(), name_service->second); resolved_namenodes = BulkResolve(io_service_, name_service->second);
} else { } else {
cluster_name_ = server + ":" + service; cluster_name_ = server + ":" + service;
@ -230,7 +279,7 @@ void FileSystemImpl::Connect(const std::string &server,
handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this); handler(Status::Error(("Invalid namenode " + cluster_name_ + " in config").c_str()), this);
} }
resolved_namenodes = BulkResolve(&io_service_->io_service(), {tmp_info}); resolved_namenodes = BulkResolve(io_service_, {tmp_info});
} }
for(unsigned int i=0;i<resolved_namenodes.size();i++) { for(unsigned int i=0;i<resolved_namenodes.size();i++) {
@ -282,7 +331,7 @@ int FileSystemImpl::WorkerThreadCount() {
if(!io_service_) { if(!io_service_) {
return -1; return -1;
} else { } else {
return io_service_->get_worker_thread_count(); return io_service_->GetWorkerThreadCount();
} }
} }
@ -339,7 +388,7 @@ void FileSystemImpl::Open(
LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode"); LOG_DEBUG(kFileSystem, << "Operation not allowed on standby datanode");
} }
} }
handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, &io_service_->io_service(), client_name_, file_info, bad_node_tracker_, event_handlers_) handler(stat, stat.ok() ? new FileHandleImpl(cluster_name_, path, io_service_, client_name_, file_info, bad_node_tracker_, event_handlers_)
: nullptr); : nullptr);
}); });
} }

View File

@ -18,19 +18,18 @@
#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#include "filehandle.h" #include "namenode_operations.h"
#include "hdfspp/hdfspp.h"
#include "fs/bad_datanode_tracker.h" #include "fs/bad_datanode_tracker.h"
#include "reader/block_reader.h" #include "hdfspp/hdfspp.h"
#include "reader/fileinfo.h" #include "reader/fileinfo.h"
#include "asio.hpp"
#include <thread> #include <thread>
#include "namenode_operations.h"
namespace hdfs { namespace hdfs {
class FileHandle;
/* /*
* FileSystem: The consumer's main point of interaction with the cluster as * FileSystem: The consumer's main point of interaction with the cluster as
* a whole. * a whole.
@ -48,6 +47,7 @@ public:
MEMCHECKED_CLASS(FileSystemImpl) MEMCHECKED_CLASS(FileSystemImpl)
typedef std::function<void(const Status &, FileSystem *)> ConnectCallback; typedef std::function<void(const Status &, FileSystem *)> ConnectCallback;
// Note: Longer term it'd be cleaner to take a rvalue reference to a shared_ptr to get ownership
explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options); explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options); explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
~FileSystemImpl() override; ~FileSystemImpl() override;
@ -215,7 +215,7 @@ private:
* A side effect of this is that requests may outlive the RpcEngine they * A side effect of this is that requests may outlive the RpcEngine they
* reference. * reference.
**/ **/
std::shared_ptr<IoServiceImpl> io_service_; std::shared_ptr<IoService> io_service_;
const Options options_; const Options options_;
const std::string client_name_; const std::string client_name_;
std::string cluster_name_; std::string cluster_name_;
@ -234,53 +234,11 @@ private:
void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more,
std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler); std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler);
/**
struct FindSharedState { * Helper struct to store state for recursive find
//Name pattern (can have wild-cards) to find */
const std::string name; struct FindSharedState;
//Maximum depth to recurse after the end of path is reached. struct FindOperationalState;
//Can be set to 0 for pure path globbing and ignoring name pattern entirely.
const uint32_t maxdepth;
//Vector of all sub-directories from the path argument (each can have wild-cards)
std::vector<std::string> dirs;
//Callback from Find
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler;
//outstanding_requests is incremented once for every GetListing call.
std::atomic<uint64_t> outstanding_requests;
//Boolean needed to abort all recursion on error or on user command
std::atomic<bool> aborted;
//Shared variables will need protection with a lock
std::mutex lock;
FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_,
const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_,
uint64_t outstanding_recuests_, bool aborted_)
: name(name_),
maxdepth(maxdepth_),
handler(handler_),
outstanding_requests(outstanding_recuests_),
aborted(aborted_),
lock() {
//Constructing the list of sub-directories
std::stringstream ss(path_);
if(path_.back() != '/'){
ss << "/";
}
for (std::string token; std::getline(ss, token, '/'); ) {
dirs.push_back(token);
}
}
};
struct FindOperationalState {
const std::string path;
const uint32_t depth;
const bool search_path;
FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_)
: path(path_),
depth(depth_),
search_path(search_path_) {
}
};
void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos,
bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state); bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state);

View File

@ -42,7 +42,7 @@ namespace hdfs {
class NameNodeOperations { class NameNodeOperations {
public: public:
MEMCHECKED_CLASS(NameNodeOperations) MEMCHECKED_CLASS(NameNodeOperations)
NameNodeOperations(::asio::io_service *io_service, const Options &options, NameNodeOperations(std::shared_ptr<IoService> io_service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::string &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version) : const char *protocol_name, int protocol_version) :
io_service_(io_service), io_service_(io_service),
@ -119,7 +119,7 @@ private:
static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl); static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs); static void GetFsStatsResponseProtoToFsInfo(hdfs::FsInfo & fs_info, const std::shared_ptr<::hadoop::hdfs::GetFsStatsResponseProto> & fs);
::asio::io_service * io_service_; std::shared_ptr<IoService> io_service_;
// This is the only permanent owner of the RpcEngine, however the RPC layer // This is the only permanent owner of the RpcEngine, however the RPC layer
// needs to reference count it prevent races during FileSystem destruction. // needs to reference count it prevent races during FileSystem destruction.

View File

@ -431,7 +431,7 @@ private:
std::shared_ptr<DataNodeConnection> shared_conn_; std::shared_ptr<DataNodeConnection> shared_conn_;
}; };
void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers, void BlockReaderImpl::AsyncReadPacket(const MutableBuffer &buffer,
const std::function<void(const Status &, size_t bytes_transferred)> &handler) const std::function<void(const Status &, size_t bytes_transferred)> &handler)
{ {
assert(state_ != kOpen && "Not connected"); assert(state_ != kOpen && "Not connected");
@ -450,7 +450,7 @@ void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
.Push(new ReadChecksum(this)) .Push(new ReadChecksum(this))
.Push(new ReadPadding(this)) .Push(new ReadPadding(this))
.Push(new ReadData( .Push(new ReadData(
this, m->state().bytes_transferred, buffers)) this, m->state().bytes_transferred, buffer))
.Push(new AckRead(this)); .Push(new AckRead(this));
auto self = this->shared_from_this(); auto self = this->shared_from_this();
@ -460,14 +460,14 @@ void BlockReaderImpl::AsyncReadPacket(const MutableBuffers &buffers,
} }
size_t BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) size_t BlockReaderImpl::ReadPacket(const MutableBuffer &buffer, Status *status)
{ {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called"); LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
size_t transferred = 0; size_t transferred = 0;
auto done = std::make_shared<std::promise<void>>(); auto done = std::make_shared<std::promise<void>>();
auto future = done->get_future(); auto future = done->get_future();
AsyncReadPacket(buffers, AsyncReadPacket(buffer,
[status, &transferred, done](const Status &stat, size_t t) { [status, &transferred, done](const Status &stat, size_t t) {
*status = stat; *status = stat;
transferred = t; transferred = t;
@ -504,7 +504,7 @@ private:
struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
{ {
ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, size_t *transferred) ReadBlockContinuation(BlockReader *reader, MutableBuffer buffer, size_t *transferred)
: reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {} : reader_(reader), buffer_(buffer), buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {}
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
@ -517,7 +517,7 @@ struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation
private: private:
BlockReader *reader_; BlockReader *reader_;
const MutableBuffers buffer_; const MutableBuffer buffer_;
const size_t buffer_size_; const size_t buffer_size_;
size_t *transferred_; size_t *transferred_;
std::function<void(const Status &)> next_; std::function<void(const Status &)> next_;
@ -542,7 +542,7 @@ void BlockReaderImpl::AsyncReadBlock(
const std::string & client_name, const std::string & client_name,
const hadoop::hdfs::LocatedBlockProto &block, const hadoop::hdfs::LocatedBlockProto &block,
size_t offset, size_t offset,
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t)> handler) const std::function<void(const Status &, size_t)> handler)
{ {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
@ -551,10 +551,10 @@ void BlockReaderImpl::AsyncReadBlock(
auto m = continuation::Pipeline<size_t>::Create(cancel_state_); auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
size_t * bytesTransferred = &m->state(); size_t * bytesTransferred = &m->state();
size_t size = asio::buffer_size(buffers); size_t size = asio::buffer_size(buffer);
m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset)) m->Push(new RequestBlockContinuation(this, client_name, &block.b(), size, offset))
.Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); .Push(new ReadBlockContinuation(this, buffer, bytesTransferred));
m->Run([handler] (const Status &status, const size_t totalBytesTransferred) { m->Run([handler] (const Status &status, const size_t totalBytesTransferred) {
handler(status, totalBytesTransferred); handler(status, totalBytesTransferred);

View File

@ -72,11 +72,11 @@ public:
virtual void AsyncReadBlock( virtual void AsyncReadBlock(
const std::string & client_name, const std::string & client_name,
const hadoop::hdfs::LocatedBlockProto &block, size_t offset, const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t)> handler) = 0; const std::function<void(const Status &, size_t)> handler) = 0;
virtual void AsyncReadPacket( virtual void AsyncReadPacket(
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0; const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
virtual void AsyncRequestBlock( virtual void AsyncRequestBlock(
@ -98,7 +98,7 @@ public:
chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {} chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
virtual void AsyncReadPacket( virtual void AsyncReadPacket(
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t bytes_transferred)> &handler) override; const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
virtual void AsyncRequestBlock( virtual void AsyncRequestBlock(
@ -111,12 +111,12 @@ public:
virtual void AsyncReadBlock( virtual void AsyncReadBlock(
const std::string & client_name, const std::string & client_name,
const hadoop::hdfs::LocatedBlockProto &block, size_t offset, const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t)> handler) override; const std::function<void(const Status &, size_t)> handler) override;
virtual void CancelOperation() override; virtual void CancelOperation() override;
size_t ReadPacket(const MutableBuffers &buffers, Status *status); size_t ReadPacket(const MutableBuffer &buffer, Status *status);
Status RequestBlock( Status RequestBlock(
const std::string &client_name, const std::string &client_name,

View File

@ -44,13 +44,13 @@ public:
template <class Handler> void Handshake(const Handler &next); template <class Handler> void Handshake(const Handler &next);
void async_read_some(const MutableBuffers &buf, void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
stream_->async_read_some(buf, handler); stream_->async_read_some(buf, handler);
} }
void async_write_some(const ConstBuffers &buf, void async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
stream_->async_write_some(buf, handler); stream_->async_write_some(buf, handler);

View File

@ -35,7 +35,7 @@ static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint>
} }
HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
::asio::io_service *ioservice, std::shared_ptr<IoService> ioservice,
std::shared_ptr<LibhdfsEvents> event_handlers) std::shared_ptr<LibhdfsEvents> event_handlers)
: enabled_(false), resolved_(false), : enabled_(false), resolved_(false),
ioservice_(ioservice), event_handlers_(event_handlers) ioservice_(ioservice), event_handlers_(event_handlers)

View File

@ -40,7 +40,7 @@ namespace hdfs {
class HANamenodeTracker { class HANamenodeTracker {
public: public:
HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
::asio::io_service *ioservice, std::shared_ptr<IoService> ioservice,
std::shared_ptr<LibhdfsEvents> event_handlers_); std::shared_ptr<LibhdfsEvents> event_handlers_);
virtual ~HANamenodeTracker(); virtual ~HANamenodeTracker();
@ -66,7 +66,7 @@ class HANamenodeTracker {
bool resolved_; bool resolved_;
// Keep service in case a second round of DNS lookup is required // Keep service in case a second round of DNS lookup is required
::asio::io_service *ioservice_; std::shared_ptr<IoService> ioservice_;
// Event handlers, for now this is the simplest place to catch all failover events // Event handlers, for now this is the simplest place to catch all failover events
// and push info out to client application. Possibly move into RPCEngine. // and push info out to client application. Possibly move into RPCEngine.

View File

@ -20,6 +20,7 @@
#include "request.h" #include "request.h"
#include "rpc_engine.h" #include "rpc_engine.h"
#include "sasl_protocol.h" #include "sasl_protocol.h"
#include "hdfspp/ioservice.h"
#include "RpcHeader.pb.h" #include "RpcHeader.pb.h"
#include "ProtobufRpcEngine.pb.h" #include "ProtobufRpcEngine.pb.h"
@ -118,7 +119,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &m
: engine_(engine), : engine_(engine),
method_name_(method_name), method_name_(method_name),
call_id_(call_id), call_id_(call_id),
timer_(engine->io_service()), timer_(engine->io_service()->GetRaw()),
handler_(std::move(handler)), handler_(std::move(handler)),
retry_count_(engine->retry_policy() ? 0 : kNoRetry), retry_count_(engine->retry_policy() ? 0 : kNoRetry),
failover_count_(0) failover_count_(0)
@ -129,7 +130,7 @@ Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &m
Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler) Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler)
: engine_(engine), : engine_(engine),
call_id_(-1/*Handshake ID*/), call_id_(-1/*Handshake ID*/),
timer_(engine->io_service()), timer_(engine->io_service()->GetRaw()),
handler_(std::move(handler)), handler_(std::move(handler)),
retry_count_(engine->retry_policy() ? 0 : kNoRetry), retry_count_(engine->retry_policy() ? 0 : kNoRetry),
failover_count_(0) { failover_count_(0) {

View File

@ -83,7 +83,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
void SetAuthInfo(const AuthInfo& auth_info); void SetAuthInfo(const AuthInfo& auth_info);
std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; } std::weak_ptr<LockFreeRpcEngine> engine() { return engine_; }
::asio::io_service *GetIoService(); std::shared_ptr<IoService> GetIoService();
protected: protected:
struct Response { struct Response {

View File

@ -70,27 +70,27 @@ RpcConnection::RpcConnection(std::shared_ptr<LockFreeRpcEngine> engine)
: engine_(engine), : engine_(engine),
connected_(kNotYetConnected) {} connected_(kNotYetConnected) {}
::asio::io_service *RpcConnection::GetIoService() { std::shared_ptr<IoService> RpcConnection::GetIoService() {
std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock(); std::shared_ptr<LockFreeRpcEngine> pinnedEngine = engine_.lock();
if(!pinnedEngine) { if(!pinnedEngine) {
LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine"); LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid RpcEngine");
return nullptr; return nullptr;
} }
return &pinnedEngine->io_service(); return pinnedEngine->io_service();
} }
void RpcConnection::StartReading() { void RpcConnection::StartReading() {
auto shared_this = shared_from_this(); auto shared_this = shared_from_this();
::asio::io_service *service = GetIoService(); std::shared_ptr<IoService> service = GetIoService();
if(!service) { if(!service) {
LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
return; return;
} }
service->post([shared_this, this] () { service->PostLambda(
OnRecvCompleted(::asio::error_code(), 0); [shared_this, this] () { OnRecvCompleted(::asio::error_code(), 0); }
}); );
} }
void RpcConnection::HandshakeComplete(const Status &s) { void RpcConnection::HandshakeComplete(const Status &s) {
@ -164,13 +164,14 @@ void RpcConnection::ContextComplete(const Status &s) {
void RpcConnection::AsyncFlushPendingRequests() { void RpcConnection::AsyncFlushPendingRequests() {
std::shared_ptr<RpcConnection> shared_this = shared_from_this(); std::shared_ptr<RpcConnection> shared_this = shared_from_this();
::asio::io_service *service = GetIoService(); std::shared_ptr<IoService> service = GetIoService();
if(!service) { if(!service) {
LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
return; return;
} }
service->post([shared_this, this]() { std::function<void()> task = [shared_this, this]()
{
std::lock_guard<std::mutex> state_lock(connection_state_lock_); std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")"); LOG_TRACE(kRPC, << "RpcConnection::AsyncFlushPendingRequests called (connected=" << ToString(connected_) << ")");
@ -178,7 +179,10 @@ void RpcConnection::AsyncFlushPendingRequests() {
if (!outgoing_request_) { if (!outgoing_request_) {
FlushPendingRequests(); FlushPendingRequests();
} }
}); };
service->PostTask(task);
} }
Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) { Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
@ -228,15 +232,17 @@ Status RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
return status; return status;
} }
::asio::io_service *service = GetIoService(); std::shared_ptr<IoService> service = GetIoService();
if(!service) { if(!service) {
LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService"); LOG_ERROR(kRPC, << "RpcConnection@" << this << " attempted to access invalid IoService");
return Status::Error("RpcConnection attempted to access invalid IoService"); return Status::Error("RpcConnection attempted to access invalid IoService");
} }
service->post([req, response, status]() { service->PostLambda(
req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock [req, response, status]() {
}); req->OnResponseArrived(response->in.get(), status); // Never call back while holding a lock
}
);
return Status::OK(); return Status::OK();
} }

View File

@ -26,6 +26,7 @@
#include "common/logging.h" #include "common/logging.h"
#include "common/util.h" #include "common/util.h"
#include "common/libhdfs_events_impl.h" #include "common/libhdfs_events_impl.h"
#include "hdfspp/ioservice.h"
#include <asio/connect.hpp> #include <asio/connect.hpp>
#include <asio/read.hpp> #include <asio/read.hpp>
@ -76,8 +77,8 @@ template <class Socket>
RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine) RpcConnectionImpl<Socket>::RpcConnectionImpl(std::shared_ptr<RpcEngine> engine)
: RpcConnection(engine), : RpcConnection(engine),
options_(engine->options()), options_(engine->options()),
socket_(engine->io_service()), socket_(engine->io_service()->GetRaw()),
connect_timer_(engine->io_service()) connect_timer_(engine->io_service()->GetRaw())
{ {
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
} }
@ -353,7 +354,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() {
OnSendCompleted(ec, size); OnSendCompleted(ec, size);
}); });
} else { // Nothing to send for this request, inform the handler immediately } else { // Nothing to send for this request, inform the handler immediately
::asio::io_service *service = GetIoService(); std::shared_ptr<IoService> service = GetIoService();
if(!service) { if(!service) {
LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access null IoService"); LOG_ERROR(kRPC, << "RpcConnectionImpl@" << this << " attempted to access null IoService");
// No easy way to bail out of this context, but the only way to get here is when // No easy way to bail out of this context, but the only way to get here is when
@ -361,7 +362,7 @@ void RpcConnectionImpl<Socket>::FlushPendingRequests() {
return; return;
} }
service->post( service->PostTask(
// Never hold locks when calling a callback // Never hold locks when calling a callback
[req]() { req->OnResponseArrived(nullptr, Status::OK()); } [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
); );

View File

@ -30,7 +30,7 @@ template <class T>
using optional = std::experimental::optional<T>; using optional = std::experimental::optional<T>;
RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, RpcEngine::RpcEngine(std::shared_ptr<IoService> io_service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::string &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version) const char *protocol_name, int protocol_version)
: io_service_(io_service), : io_service_(io_service),
@ -40,7 +40,7 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
protocol_name_(protocol_name), protocol_name_(protocol_name),
protocol_version_(protocol_version), protocol_version_(protocol_version),
call_id_(0), call_id_(0),
retry_timer(*io_service), retry_timer(io_service->GetRaw()),
event_handlers_(std::make_shared<LibhdfsEvents>()), event_handlers_(std::make_shared<LibhdfsEvents>()),
connect_canceled_(false) connect_canceled_(false)
{ {
@ -86,7 +86,7 @@ bool RpcEngine::CancelPendingConnect() {
void RpcEngine::Shutdown() { void RpcEngine::Shutdown() {
LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called"); LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
io_service_->post([this]() { io_service_->PostLambda([this]() {
std::lock_guard<std::mutex> state_lock(engine_state_lock_); std::lock_guard<std::mutex> state_lock(engine_state_lock_);
conn_.reset(); conn_.reset();
}); });
@ -154,7 +154,7 @@ void RpcEngine::AsyncRpc(
// In case user-side code isn't checking the status of Connect before doing RPC // In case user-side code isn't checking the status of Connect before doing RPC
if(connect_canceled_) { if(connect_canceled_) {
io_service_->post( io_service_->PostLambda(
[handler](){ handler(Status::Canceled()); } [handler](){ handler(Status::Canceled()); }
); );
return; return;
@ -190,7 +190,7 @@ void RpcEngine::AsyncRpcCommsError(
std::vector<std::shared_ptr<Request>> pendingRequests) { std::vector<std::shared_ptr<Request>> pendingRequests) {
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size())); LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size()));
io_service().post([this, status, failedConnection, pendingRequests]() { io_service_->PostLambda([this, status, failedConnection, pendingRequests]() {
RpcCommsError(status, failedConnection, pendingRequests); RpcCommsError(status, failedConnection, pendingRequests);
}); });
} }
@ -238,7 +238,7 @@ void RpcEngine::RpcCommsError(
// on. There might be a good argument for caching the first error // on. There might be a good argument for caching the first error
// rather than the last one, that gets messy // rather than the last one, that gets messy
io_service().post([req, status]() { io_service()->PostLambda([req, status]() {
req->OnResponseArrived(nullptr, status); // Never call back while holding a lock req->OnResponseArrived(nullptr, status); // Never call back while holding a lock
}); });
it = pendingRequests.erase(it); it = pendingRequests.erase(it);
@ -283,7 +283,7 @@ void RpcEngine::RpcCommsError(
for(unsigned int i=0; i<pendingRequests.size(); i++) { for(unsigned int i=0; i<pendingRequests.size(); i++) {
std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i]; std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i];
io_service().post([sharedCurrentRequest, badEndpointStatus]() { io_service()->PostLambda([sharedCurrentRequest, badEndpointStatus]() {
sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock
}); });
} }

View File

@ -60,6 +60,7 @@ class RpcConnection;
class SaslProtocol; class SaslProtocol;
class RpcConnection; class RpcConnection;
class Request; class Request;
class IoService;
/* /*
* These methods of the RpcEngine will never acquire locks, and are safe for * These methods of the RpcEngine will never acquire locks, and are safe for
@ -83,7 +84,7 @@ public:
virtual const std::string &user_name() = 0; virtual const std::string &user_name() = 0;
virtual const std::string &protocol_name() = 0; virtual const std::string &protocol_name() = 0;
virtual int protocol_version() = 0; virtual int protocol_version() = 0;
virtual ::asio::io_service &io_service() = 0; virtual std::shared_ptr<IoService> io_service() const = 0;
virtual const Options &options() = 0; virtual const Options &options() = 0;
}; };
@ -107,7 +108,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this<
kCallIdSasl = -33 kCallIdSasl = -33
}; };
RpcEngine(::asio::io_service *io_service, const Options &options, RpcEngine(std::shared_ptr<IoService> service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::string &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version); const char *protocol_name, int protocol_version);
@ -145,7 +146,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this<
const std::string &user_name() override { return auth_info_.getUser(); } const std::string &user_name() override { return auth_info_.getUser(); }
const std::string &protocol_name() override { return protocol_name_; } const std::string &protocol_name() override { return protocol_name_; }
int protocol_version() override { return protocol_version_; } int protocol_version() override { return protocol_version_; }
::asio::io_service &io_service() override { return *io_service_; } std::shared_ptr<IoService> io_service() const override { return io_service_; }
const Options &options() override { return options_; } const Options &options() override { return options_; }
static std::string GetRandomClientName(); static std::string GetRandomClientName();
@ -162,7 +163,7 @@ protected:
std::vector<::asio::ip::tcp::endpoint> last_endpoints_; std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
private: private:
::asio::io_service * const io_service_; mutable std::shared_ptr<IoService> io_service_;
const Options options_; const Options options_;
const std::string client_name_; const std::string client_name_;
const std::string client_id_; const std::string client_id_;

View File

@ -16,11 +16,12 @@
* limitations under the License. * limitations under the License.
*/ */
#include "fs/filesystem.h"
#include "fs/bad_datanode_tracker.h"
#include "common/libhdfs_events_impl.h" #include "common/libhdfs_events_impl.h"
#include "common/util.h" #include "common/util.h"
#include "fs/filesystem.h"
#include "fs/filehandle.h"
#include "fs/bad_datanode_tracker.h"
#include "reader/block_reader.h"
#include <gmock/gmock.h> #include <gmock/gmock.h>
@ -54,7 +55,7 @@ public:
const std::string & client_name, const std::string & client_name,
const hadoop::hdfs::LocatedBlockProto &block, const hadoop::hdfs::LocatedBlockProto &block,
size_t offset, size_t offset,
const MutableBuffers &buffers, const MutableBuffer &buffer,
const std::function<void(const Status &, size_t)> handler)); const std::function<void(const Status &, size_t)> handler));
virtual void CancelOperation() override { virtual void CancelOperation() override {
@ -67,14 +68,14 @@ class MockDNConnection : public DataNodeConnection, public std::enable_shared_fr
handler(Status::OK(), shared_from_this()); handler(Status::OK(), shared_from_this());
} }
void async_read_some(const MutableBuffers &buf, void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
(void)buf; (void)buf;
handler(asio::error::fault, 0); handler(asio::error::fault, 0);
} }
void async_write_some(const ConstBuffers &buf, void async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
(void)buf; (void)buf;
@ -101,7 +102,7 @@ protected:
return mock_reader_; return mock_reader_;
} }
std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
::asio::io_service *io_service, std::shared_ptr<IoService> io_service,
const ::hadoop::hdfs::DatanodeInfoProto & dn, const ::hadoop::hdfs::DatanodeInfoProto & dn,
const hadoop::common::TokenProto * token) override { const hadoop::common::TokenProto * token) override {
(void) io_service; (void) dn; (void) token; (void) io_service; (void) dn; (void) token;
@ -130,12 +131,12 @@ TEST(BadDataNodeTest, TestNoNodes) {
char buf[4096] = { char buf[4096] = {
0, 0,
}; };
IoServiceImpl io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto bad_node_tracker = std::make_shared<BadDataNodeTracker>(); auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
bad_node_tracker->AddBadNode("foo"); bad_node_tracker->AddBadNode("foo");
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker, monitors); PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, bad_node_tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
@ -170,7 +171,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
char buf[4096] = { char buf[4096] = {
0, 0,
}; };
IoServiceImpl io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto tracker = std::make_shared<BadDataNodeTracker>(); auto tracker = std::make_shared<BadDataNodeTracker>();
@ -191,7 +192,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
return event_response::make_ok(); return event_response::make_ok();
}); });
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
@ -234,10 +235,10 @@ TEST(BadDataNodeTest, RecoverableError) {
char buf[4096] = { char buf[4096] = {
0, 0,
}; };
IoServiceImpl io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto tracker = std::make_shared<BadDataNodeTracker>(); auto tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
@ -285,10 +286,10 @@ TEST(BadDataNodeTest, InternalError) {
char buf[4096] = { char buf[4096] = {
0, 0,
}; };
IoServiceImpl io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto tracker = std::make_shared<BadDataNodeTracker>(); auto tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", &io_service.io_service(), GetRandomClientName(), file_info, tracker, monitors); PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))

View File

@ -16,13 +16,15 @@
* limitations under the License. * limitations under the License.
*/ */
#include "common/hdfs_ioservice.h" #include "hdfspp/ioservice.h"
#include <future> #include <future>
#include <functional> #include <functional>
#include <thread> #include <thread>
#include <string> #include <string>
#include <google/protobuf/stubs/common.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
using ::testing::_; using ::testing::_;
@ -34,7 +36,7 @@ using namespace hdfs;
// Make sure IoService spins up specified number of threads // Make sure IoService spins up specified number of threads
TEST(IoServiceTest, InitThreads) { TEST(IoServiceTest, InitThreads) {
#ifndef DISABLE_CONCURRENT_WORKERS #ifndef DISABLE_CONCURRENT_WORKERS
std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); std::shared_ptr<IoService> service = IoService::MakeShared();
EXPECT_NE(service, nullptr); EXPECT_NE(service, nullptr);
unsigned int thread_count = 4; unsigned int thread_count = 4;
@ -50,7 +52,7 @@ TEST(IoServiceTest, InitThreads) {
// Make sure IoService defaults to logical thread count // Make sure IoService defaults to logical thread count
TEST(IoServiceTest, InitDefaultThreads) { TEST(IoServiceTest, InitDefaultThreads) {
#ifndef DISABLE_CONCURRENT_WORKERS #ifndef DISABLE_CONCURRENT_WORKERS
std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); std::shared_ptr<IoService> service = IoService::MakeShared();
EXPECT_NE(service, nullptr); EXPECT_NE(service, nullptr);
unsigned int thread_count = std::thread::hardware_concurrency(); unsigned int thread_count = std::thread::hardware_concurrency();
@ -66,7 +68,7 @@ TEST(IoServiceTest, InitDefaultThreads) {
// Check IoService::PostTask // Check IoService::PostTask
TEST(IoServiceTest, SimplePost) { TEST(IoServiceTest, SimplePost) {
std::shared_ptr<IoServiceImpl> service = std::static_pointer_cast<IoServiceImpl>(IoService::MakeShared()); std::shared_ptr<IoService> service = IoService::MakeShared();
EXPECT_NE(service, nullptr); EXPECT_NE(service, nullptr);
unsigned int thread_count = std::thread::hardware_concurrency(); unsigned int thread_count = std::thread::hardware_concurrency();

View File

@ -49,7 +49,7 @@ public:
virtual ~MockConnectionBase(); virtual ~MockConnectionBase();
typedef std::pair<asio::error_code, std::string> ProducerResult; typedef std::pair<asio::error_code, std::string> ProducerResult;
void async_read_some(const MutableBuffers &buf, void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
if (produced_.size() == 0) { if (produced_.size() == 0) {
@ -72,7 +72,7 @@ public:
io_service_->post(std::bind(handler, asio::error_code(), len)); io_service_->post(std::bind(handler, asio::error_code(), len));
} }
void async_write_some(const ConstBuffers &buf, void async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
// CompletionResult res = OnWrite(buf); // CompletionResult res = OnWrite(buf);

View File

@ -69,14 +69,14 @@ public:
/* event handler to trigger side effects */ /* event handler to trigger side effects */
std::function<void(void)> OnRead; std::function<void(void)> OnRead;
void async_read_some(const MutableBuffers &buf, void async_read_some(const MutableBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
this->OnRead(); this->OnRead();
this->MockConnectionBase::async_read_some(buf, handler); this->MockConnectionBase::async_read_some(buf, handler);
} }
void async_write_some(const ConstBuffers &buf, void async_write_some(const ConstBuffer &buf,
std::function<void (const asio::error_code & error, std::function<void (const asio::error_code & error,
std::size_t bytes_transferred) > handler) override { std::size_t bytes_transferred) > handler) override {
this->MockConnectionBase::async_write_some(buf, handler); this->MockConnectionBase::async_write_some(buf, handler);

View File

@ -16,6 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */
#include "hdfspp/ioservice.h"
#include "mock_connection.h" #include "mock_connection.h"
#include "test.pb.h" #include "test.pb.h"
#include "RpcHeader.pb.h" #include "RpcHeader.pb.h"
@ -23,7 +25,6 @@
#include "common/namenode_info.h" #include "common/namenode_info.h"
#include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/coded_stream.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
using ::hadoop::common::RpcResponseHeaderProto; using ::hadoop::common::RpcResponseHeaderProto;
@ -104,9 +105,10 @@ static inline std::pair<error_code, string> RpcResponse(
using namespace hdfs; using namespace hdfs;
TEST(RpcEngineTest, TestRoundTrip) { TEST(RpcEngineTest, TestRoundTrip) {
::asio::io_service io_service;
std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
@ -129,20 +131,20 @@ TEST(RpcEngineTest, TestRoundTrip) {
EchoRequestProto req; EchoRequestProto req;
req.set_message("foo"); req.set_message("foo");
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [resp, &complete,io_service](const Status &stat) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ("foo", resp->message()); ASSERT_EQ("foo", resp->message());
complete = true; complete = true;
io_service.stop(); io_service->Stop();
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
TEST(RpcEngineTest, TestConnectionResetAndFail) { TEST(RpcEngineTest, TestConnectionResetAndFail) {
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
@ -164,23 +166,23 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
req.set_message("foo"); req.set_message("foo");
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_FALSE(stat.ok()); ASSERT_FALSE(stat.ok());
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
TEST(RpcEngineTest, TestConnectionResetAndRecover) { TEST(RpcEngineTest, TestConnectionResetAndRecover) {
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine std::shared_ptr<SharedConnectionEngine> engine
= std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -206,22 +208,22 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
req.set_message("foo"); req.set_message("foo");
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) { TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 1; options.rpc_retry_delay_ms = 1;
std::shared_ptr<SharedConnectionEngine> engine = std::shared_ptr<SharedConnectionEngine> engine =
std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -246,17 +248,17 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
req.set_message("foo"); req.set_message("foo");
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
}); });
::asio::deadline_timer timer(io_service); ::asio::deadline_timer timer(io_service->GetRaw());
timer.expires_from_now(std::chrono::hours(100)); timer.expires_from_now(std::chrono::hours(100));
timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
@ -267,7 +269,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
SharedMockConnection::SetSharedConnectionData(producer); SharedMockConnection::SetSharedConnectionData(producer);
// Error and no retry // Error and no retry
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
bool complete = false; bool complete = false;
@ -275,16 +277,16 @@ TEST(RpcEngineTest, TestConnectionFailure)
options.max_rpc_retries = 0; options.max_rpc_retries = 0;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine std::shared_ptr<SharedConnectionEngine> engine
= std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); = std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_FALSE(stat.ok()); ASSERT_FALSE(stat.ok());
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
@ -294,7 +296,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
producer->checkProducerForConnect = true; producer->checkProducerForConnect = true;
SharedMockConnection::SetSharedConnectionData(producer); SharedMockConnection::SetSharedConnectionData(producer);
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
bool complete = false; bool complete = false;
@ -302,18 +304,18 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
options.max_rpc_retries = 2; options.max_rpc_retries = 2;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = std::shared_ptr<SharedConnectionEngine> engine =
std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))); .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_FALSE(stat.ok()); ASSERT_FALSE(stat.ok());
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
@ -323,7 +325,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
producer->checkProducerForConnect = true; producer->checkProducerForConnect = true;
SharedMockConnection::SetSharedConnectionData(producer); SharedMockConnection::SetSharedConnectionData(producer);
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
bool complete = false; bool complete = false;
@ -331,29 +333,30 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = std::shared_ptr<SharedConnectionEngine> engine =
std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
.WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
}); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
TEST(RpcEngineTest, TestEventCallbacks) TEST(RpcEngineTest, TestEventCallbacks)
{ {
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
options.max_rpc_retries = 99; options.max_rpc_retries = 99;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = std::shared_ptr<SharedConnectionEngine> engine =
std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -399,17 +402,18 @@ TEST(RpcEngineTest, TestEventCallbacks)
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
bool complete = false; bool complete = false;
engine->AsyncRpc("test", &req, resp, [&complete, &io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
}); });
// If you're adding event hooks you'll most likely need to update this. // If you're adding event hooks you'll most likely need to update this.
// It's a brittle test but makes it hard to miss control flow changes in RPC retry. // It's a brittle test but makes it hard to miss control flow changes in RPC retry.
for(const auto& m : callbacks) for(const auto& m : callbacks) {
std::cerr << m << std::endl; std::cerr << m << std::endl;
io_service.run(); }
io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
ASSERT_EQ(9, callbacks.size()); ASSERT_EQ(9, callbacks.size());
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
@ -430,7 +434,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
producer->checkProducerForConnect = true; producer->checkProducerForConnect = true;
SharedMockConnection::SetSharedConnectionData(producer); SharedMockConnection::SetSharedConnectionData(producer);
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
bool complete = false; bool complete = false;
@ -438,31 +442,31 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 1; options.rpc_retry_delay_ms = 1;
std::shared_ptr<SharedConnectionEngine> engine = std::shared_ptr<SharedConnectionEngine> engine =
std::make_shared<SharedConnectionEngine>(&io_service, options, "foo", "", "protocol", 1); std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(::asio::error_code(), ""))) .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
.WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
engine->Connect("", make_endpoint(), [&complete, &io_service](const Status &stat) { engine->Connect("", make_endpoint(), [&complete, io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
}); });
::asio::deadline_timer timer(io_service); ::asio::deadline_timer timer(io_service->GetRaw());
timer.expires_from_now(std::chrono::hours(100)); timer.expires_from_now(std::chrono::hours(100));
timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }
TEST(RpcEngineTest, TestTimeout) { TEST(RpcEngineTest, TestTimeout) {
::asio::io_service io_service; std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
options.rpc_timeout = 1; options.rpc_timeout = 1;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(&io_service, options, "foo", "", "protocol", 1); std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
@ -481,15 +485,15 @@ TEST(RpcEngineTest, TestTimeout) {
std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto());
engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) { engine->AsyncRpc("test", &req, resp, [resp, &complete,&io_service](const Status &stat) {
complete = true; complete = true;
io_service.stop(); io_service->Stop();
ASSERT_FALSE(stat.ok()); ASSERT_FALSE(stat.ok());
}); });
::asio::deadline_timer timer(io_service); ::asio::deadline_timer timer(io_service->GetRaw());
timer.expires_from_now(std::chrono::hours(100)); timer.expires_from_now(std::chrono::hours(100));
timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); }); timer.async_wait([](const asio::error_code & err){(void)err; ASSERT_FALSE("Timed out"); });
io_service.run(); io_service->Run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
} }