libhdfs++: Get rid of lock in RpcConnectionImpl destructor. Contributed by James Clampffer

This commit is contained in:
James 2016-10-14 10:13:24 -04:00 committed by James Clampffer
parent 7ebecaeede
commit 6dd47cae86
2 changed files with 41 additions and 40 deletions

View File

@ -34,9 +34,11 @@
namespace hdfs { namespace hdfs {
template <class NextLayer> template <class Socket>
class RpcConnectionImpl : public RpcConnection { class RpcConnectionImpl : public RpcConnection {
public: public:
MEMCHECKED_CLASS(RpcConnectionImpl);
RpcConnectionImpl(RpcEngine *engine); RpcConnectionImpl(RpcEngine *engine);
virtual ~RpcConnectionImpl() override; virtual ~RpcConnectionImpl() override;
@ -55,7 +57,7 @@ public:
virtual void FlushPendingRequests() override; virtual void FlushPendingRequests() override;
NextLayer &next_layer() { return next_layer_; } Socket &TEST_get_mutable_socket() { return socket_; }
void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; } void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
@ -63,35 +65,34 @@ public:
const Options options_; const Options options_;
::asio::ip::tcp::endpoint current_endpoint_; ::asio::ip::tcp::endpoint current_endpoint_;
std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
NextLayer next_layer_; Socket socket_;
::asio::deadline_timer connect_timer_; ::asio::deadline_timer connect_timer_;
void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
}; };
template <class NextLayer> template <class Socket>
RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
: RpcConnection(engine), : RpcConnection(engine),
options_(engine->options()), options_(engine->options()),
next_layer_(engine->io_service()), socket_(engine->io_service()),
connect_timer_(engine->io_service()) connect_timer_(engine->io_service())
{ {
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
} }
template <class NextLayer> template <class Socket>
RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() { RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
if (pending_requests_.size() > 0) if (pending_requests_.size() > 0)
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
if (requests_on_fly_.size() > 0) if (requests_on_fly_.size() > 0)
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::Connect( void RpcConnectionImpl<Socket>::Connect(
const std::vector<::asio::ip::tcp::endpoint> &server, const std::vector<::asio::ip::tcp::endpoint> &server,
const AuthInfo & auth_info, const AuthInfo & auth_info,
RpcCallback &handler) { RpcCallback &handler) {
@ -109,8 +110,8 @@ void RpcConnectionImpl<NextLayer>::Connect(
this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF this->ConnectAndFlush(server); // need "this" so compiler can infer type of CAF
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::ConnectAndFlush( void RpcConnectionImpl<Socket>::ConnectAndFlush(
const std::vector<::asio::ip::tcp::endpoint> &server) { const std::vector<::asio::ip::tcp::endpoint> &server) {
LOG_INFO(kRPC, << "ConnectAndFlush called"); LOG_INFO(kRPC, << "ConnectAndFlush called");
@ -139,7 +140,7 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
current_endpoint_ = first_endpoint; current_endpoint_ = first_endpoint;
auto shared_this = shared_from_this(); auto shared_this = shared_from_this();
next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
ConnectComplete(ec, first_endpoint); ConnectComplete(ec, first_endpoint);
}); });
@ -155,9 +156,9 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
}); });
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this(); auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
std::lock_guard<std::mutex> state_lock(connection_state_lock_); std::lock_guard<std::mutex> state_lock(connection_state_lock_);
connect_timer_.cancel(); connect_timer_.cancel();
@ -190,7 +191,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
}); });
} else { } else {
LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());; LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());;
std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_)); std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
if(!err.empty()) { if(!err.empty()) {
LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
} }
@ -202,7 +203,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
additional_endpoints_.erase(additional_endpoints_.begin()); additional_endpoints_.erase(additional_endpoints_.begin());
current_endpoint_ = next_endpoint; current_endpoint_ = next_endpoint;
next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
ConnectComplete(ec, next_endpoint); ConnectComplete(ec, next_endpoint);
}); });
connect_timer_.expires_from_now( connect_timer_.expires_from_now(
@ -219,8 +220,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
} }
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) { void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
@ -228,7 +229,7 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
auto shared_this = shared_from_this(); auto shared_this = shared_from_this();
auto handshake_packet = PrepareHandshakePacket(); auto handshake_packet = PrepareHandshakePacket();
::asio::async_write(next_layer_, asio::buffer(*handshake_packet), ::asio::async_write(socket_, asio::buffer(*handshake_packet),
[handshake_packet, handler, shared_this, this]( [handshake_packet, handler, shared_this, this](
const ::asio::error_code &ec, size_t) { const ::asio::error_code &ec, size_t) {
Status status = ToStatus(ec); Status status = ToStatus(ec);
@ -236,15 +237,15 @@ void RpcConnectionImpl<NextLayer>::SendHandshake(RpcCallback &handler) {
}); });
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) { void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called"); LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");
auto shared_this = shared_from_this(); auto shared_this = shared_from_this();
auto context_packet = PrepareContextPacket(); auto context_packet = PrepareContextPacket();
::asio::async_write(next_layer_, asio::buffer(*context_packet), ::asio::async_write(socket_, asio::buffer(*context_packet),
[context_packet, handler, shared_this, this]( [context_packet, handler, shared_this, this](
const ::asio::error_code &ec, size_t) { const ::asio::error_code &ec, size_t) {
Status status = ToStatus(ec); Status status = ToStatus(ec);
@ -252,8 +253,8 @@ void RpcConnectionImpl<NextLayer>::SendContext(RpcCallback &handler) {
}); });
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
size_t) { size_t) {
using std::placeholders::_1; using std::placeholders::_1;
using std::placeholders::_2; using std::placeholders::_2;
@ -271,8 +272,8 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
FlushPendingRequests(); FlushPendingRequests();
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::FlushPendingRequests() { void RpcConnectionImpl<Socket>::FlushPendingRequests() {
using namespace ::std::placeholders; using namespace ::std::placeholders;
// Lock should be held // Lock should be held
@ -335,7 +336,7 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
this->HandleRpcTimeout(timeout_req, ec); this->HandleRpcTimeout(timeout_req, ec);
}); });
asio::async_write(next_layer_, asio::buffer(*payload), asio::async_write(socket_, asio::buffer(*payload),
[shared_this, this, payload](const ::asio::error_code &ec, [shared_this, this, payload](const ::asio::error_code &ec,
size_t size) { size_t size) {
OnSendCompleted(ec, size); OnSendCompleted(ec, size);
@ -352,8 +353,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec, void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
size_t) { size_t) {
using std::placeholders::_1; using std::placeholders::_1;
using std::placeholders::_2; using std::placeholders::_2;
@ -396,7 +397,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_), auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
sizeof(current_response_state_->length_)); sizeof(current_response_state_->length_));
asio::async_read( asio::async_read(
next_layer_, buf, socket_, buf,
[shared_this, this](const ::asio::error_code &ec, size_t size) { [shared_this, this](const ::asio::error_code &ec, size_t size) {
OnRecvCompleted(ec, size); OnRecvCompleted(ec, size);
}); });
@ -405,7 +406,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
current_response_state_->length_ = ntohl(current_response_state_->length_); current_response_state_->length_ = ntohl(current_response_state_->length_);
current_response_state_->data_.resize(current_response_state_->length_); current_response_state_->data_.resize(current_response_state_->length_);
asio::async_read( asio::async_read(
next_layer_, ::asio::buffer(current_response_state_->data_), socket_, ::asio::buffer(current_response_state_->data_),
[shared_this, this](const ::asio::error_code &ec, size_t size) { [shared_this, this](const ::asio::error_code &ec, size_t size) {
OnRecvCompleted(ec, size); OnRecvCompleted(ec, size);
}); });
@ -425,8 +426,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
} }
} }
template <class NextLayer> template <class Socket>
void RpcConnectionImpl<NextLayer>::Disconnect() { void RpcConnectionImpl<Socket>::Disconnect() {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
@ -434,7 +435,7 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
request_over_the_wire_.reset(); request_over_the_wire_.reset();
if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
// Don't print out errors, we were expecting a disconnect here // Don't print out errors, we were expecting a disconnect here
SafeDisconnect(get_asio_socket_ptr(&next_layer_)); SafeDisconnect(get_asio_socket_ptr(&socket_));
} }
connected_ = kDisconnected; connected_ = kDisconnected;
} }

View File

@ -118,7 +118,7 @@ TEST(RpcEngineTest, TestRoundTrip) {
RpcResponseHeaderProto h; RpcResponseHeaderProto h;
h.set_callid(1); h.set_callid(1);
h.set_status(RpcResponseHeaderProto::SUCCESS); h.set_status(RpcResponseHeaderProto::SUCCESS);
EXPECT_CALL(conn->next_layer(), Produce()) EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
.WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString())));
std::shared_ptr<RpcConnection> conn_ptr(conn); std::shared_ptr<RpcConnection> conn_ptr(conn);
@ -153,7 +153,7 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
RpcResponseHeaderProto h; RpcResponseHeaderProto h;
h.set_callid(1); h.set_callid(1);
h.set_status(RpcResponseHeaderProto::SUCCESS); h.set_status(RpcResponseHeaderProto::SUCCESS);
EXPECT_CALL(conn->next_layer(), Produce()) EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
.WillOnce(Return(RpcResponse( .WillOnce(Return(RpcResponse(
h, "", make_error_code(::asio::error::connection_reset)))); h, "", make_error_code(::asio::error::connection_reset))));
@ -455,7 +455,7 @@ TEST(RpcEngineTest, TestTimeout) {
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
conn->StartReading(); conn->StartReading();
EXPECT_CALL(conn->next_layer(), Produce()) EXPECT_CALL(conn->TEST_get_mutable_socket(), Produce())
.WillOnce(Return(std::make_pair(::asio::error::would_block, ""))); .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
std::shared_ptr<RpcConnection> conn_ptr(conn); std::shared_ptr<RpcConnection> conn_ptr(conn);