HDFS-10231: libhdfs++: Fix race conditions in RPC layer. Contributed by Bob Hansen.

This commit is contained in:
Bob Hansen 2016-04-04 08:53:52 -07:00 committed by James Clampffer
parent 015d93a715
commit ed77d8d5df
5 changed files with 158 additions and 96 deletions

View File

@ -158,15 +158,17 @@ void Request::OnResponseArrived(pbio::CodedInputStream *is,
RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
: engine_(engine),
connected_(false) {}
connected_(kNotYetConnected) {}
::asio::io_service &RpcConnection::io_service() {
return engine_->io_service();
}
void RpcConnection::StartReading() {
io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
::asio::error_code(), 0));
auto shared_this = shared_from_this();
io_service().post([shared_this, this] () {
OnRecvCompleted(::asio::error_code(), 0);
});
}
void RpcConnection::AsyncFlushPendingRequests() {
@ -174,6 +176,8 @@ void RpcConnection::AsyncFlushPendingRequests() {
io_service().post([shared_this, this]() {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc called (connected=" << ToString(connected_) << ")");
if (!request_over_the_wire_) {
FlushPendingRequests();
}
@ -281,40 +285,53 @@ void RpcConnection::AsyncRpc(
auto r = std::make_shared<Request>(engine_, method_name, req,
std::move(wrapped_handler));
pending_requests_.push_back(r);
FlushPendingRequests();
}
void RpcConnection::AsyncRawRpc(const std::string &method_name,
const std::string &req,
std::shared_ptr<std::string> resp,
RpcCallback &&handler) {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
if (connected_ == kDisconnected) {
// Oops. The connection failed _just_ before the engine got a chance
// to send it. Register it as a failure
Status status = Status::ResourceUnavailable("RpcConnection closed before send.");
auto r_vector = std::vector<std::shared_ptr<Request> > (1, r);
assert(r_vector[0].get() != nullptr);
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
auto wrapped_handler = [shared_this, this, resp, handler](
pbio::CodedInputStream *is, const Status &status) {
if (status.ok()) {
uint32_t size = 0;
is->ReadVarint32(&size);
auto limit = is->PushLimit(size);
is->ReadString(resp.get(), limit);
is->PopLimit(limit);
engine_->AsyncRpcCommsError(status, shared_from_this(), r_vector);
} else {
pending_requests_.push_back(r);
if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking
FlushPendingRequests();
}
handler(status);
};
auto r = std::make_shared<Request>(engine_, method_name, req,
std::move(wrapped_handler));
pending_requests_.push_back(r);
FlushPendingRequests();
}
}
void RpcConnection::AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests) {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnection::AsyncRpc[] called; connected=" << ToString(connected_));
if (connected_ == kDisconnected) {
// Oops. The connection failed _just_ before the engine got a chance
// to send it. Register it as a failure
Status status = Status::ResourceUnavailable("RpcConnection closed before send.");
engine_->AsyncRpcCommsError(status, shared_from_this(), requests);
} else {
pending_requests_.reserve(pending_requests_.size() + requests.size());
for (auto r: requests) {
pending_requests_.push_back(r);
}
if (connected_ == kConnected) { // Dont flush if we're waiting or handshaking
FlushPendingRequests();
}
}
}
void RpcConnection::PreEnqueueRequests(
std::vector<std::shared_ptr<Request>> requests) {
// Public method - acquire lock
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
assert(!connected_);
LOG_DEBUG(kRPC, << "RpcConnection::PreEnqueueRequests called");
assert(connected_ == kNotYetConnected);
pending_requests_.insert(pending_requests_.end(), requests.begin(),
requests.end());
@ -349,7 +366,7 @@ void RpcConnection::CommsError(const Status &status) {
std::make_move_iterator(pending_requests_.end()));
pending_requests_.clear();
engine_->AsyncRpcCommsError(status, requestsToReturn);
engine_->AsyncRpcCommsError(status, shared_from_this(), requestsToReturn);
}
void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) {
@ -379,4 +396,15 @@ std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
requests_on_fly_.erase(it);
return req;
}
std::string RpcConnection::ToString(ConnectedState connected) {
switch(connected) {
case kNotYetConnected: return "NotYetConnected";
case kConnecting: return "Connecting";
case kConnected: return "Connected";
case kDisconnected: return "Disconnected";
default: return "Invalid ConnectedState";
}
}
}

View File

@ -34,6 +34,8 @@ template <class NextLayer>
class RpcConnectionImpl : public RpcConnection {
public:
RpcConnectionImpl(RpcEngine *engine);
virtual ~RpcConnectionImpl() override;
virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
RpcCallback &handler);
virtual void ConnectAndFlush(
@ -49,7 +51,7 @@ public:
NextLayer &next_layer() { return next_layer_; }
void TEST_set_connected(bool new_value) { connected_ = new_value; }
void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
private:
const Options options_;
@ -66,7 +68,19 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
options_(engine->options()),
next_layer_(engine->io_service()) {
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
}
}
template <class NextLayer>
RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
if (pending_requests_.size() > 0)
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
if (requests_on_fly_.size() > 0)
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Connect(
@ -93,6 +107,16 @@ void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
return;
}
if (connected_ == kConnected) {
FlushPendingRequests();
return;
}
if (connected_ != kNotYetConnected) {
LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
return;
}
connected_ = kConnecting;
// Take the first endpoint, but remember the alternatives for later
additional_endpoints_ = server;
::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
@ -169,8 +193,8 @@ void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) {
[handshake_packet, handler, shared_this, this](
const ::asio::error_code &ec, size_t) {
Status status = ToStatus(ec);
if (status.ok()) {
connected_ = true;
if (status.ok() && connected_ == kConnecting) {
connected_ = kConnected;
}
handler(status);
});
@ -208,9 +232,13 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
return;
}
if (!connected_) {
if (connected_ == kDisconnected) {
LOG_WARN(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a disconnected connection");
return;
}
if (connected_ != kConnected) {
LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
}
// Don't send if we don't need to
if (request_over_the_wire_) {
@ -218,19 +246,25 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
}
std::shared_ptr<Request> req = pending_requests_.front();
auto weak_req = std::weak_ptr<Request>(req);
pending_requests_.erase(pending_requests_.begin());
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
std::shared_ptr<std::string> payload = std::make_shared<std::string>();
req->GetPacket(payload.get());
if (!payload->empty()) {
assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
requests_on_fly_[req->call_id()] = req;
request_over_the_wire_ = req;
req->timer().expires_from_now(
std::chrono::milliseconds(options_.rpc_timeout));
req->timer().async_wait([shared_this, this, req](const ::asio::error_code &ec) {
this->HandleRpcTimeout(req, ec);
req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
auto timeout_this = weak_this.lock();
auto timeout_req = weak_req.lock();
if (timeout_this && timeout_req)
this->HandleRpcTimeout(timeout_req, ec);
});
asio::async_write(next_layer_, asio::buffer(*payload),
@ -320,11 +354,11 @@ void RpcConnectionImpl<NextLayer>::Disconnect() {
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
request_over_the_wire_.reset();
if (connected_) {
if (connected_ == kConnecting || connected_ == kConnected) {
next_layer_.cancel();
next_layer_.close();
}
connected_ = false;
connected_ = kDisconnected;
}
}

View File

@ -61,7 +61,6 @@ void RpcEngine::Shutdown() {
LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
io_service_->post([this]() {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
conn_->Disconnect();
conn_.reset();
});
}
@ -122,47 +121,33 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
return result;
}
Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp) {
LOG_TRACE(kRPC, << "RpcEngine::RawRpc called");
std::shared_ptr<RpcConnection> conn;
{
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
if (!conn_) {
conn_ = InitializeConnection();
conn_->ConnectAndFlush(last_endpoints_);
}
conn = conn_;
}
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
conn->AsyncRawRpc(method_name, req, resp,
[stat](const Status &status) { stat->set_value(status); });
return future.get();
}
void RpcEngine::AsyncRpcCommsError(
const Status &status,
std::shared_ptr<RpcConnection> failedConnection,
std::vector<std::shared_ptr<Request>> pendingRequests) {
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called");
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
io_service().post([this, status, pendingRequests]() {
RpcCommsError(status, pendingRequests);
io_service().post([this, status, failedConnection, pendingRequests]() {
RpcCommsError(status, failedConnection, pendingRequests);
});
}
void RpcEngine::RpcCommsError(
const Status &status,
std::shared_ptr<RpcConnection> failedConnection,
std::vector<std::shared_ptr<Request>> pendingRequests) {
(void)status;
LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called");
LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called; conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
// If the failed connection is the current one, shut it down
// It will be reconnected when there is work to do
if (failedConnection == conn_) {
conn_.reset();
}
auto head_action = optional<RetryAction>();
// Filter out anything with too many retries already
@ -192,25 +177,35 @@ void RpcEngine::RpcCommsError(
}
}
// Close the connection and retry and requests that might have been sent to
// the NN
if (!pendingRequests.empty() &&
head_action && head_action->action != RetryAction::FAIL) {
conn_ = InitializeConnection();
// If we have reqests that need to be re-sent, ensure that we have a connection
// and send the requests to it
bool haveRequests = !pendingRequests.empty() &&
head_action && head_action->action != RetryAction::FAIL;
conn_->PreEnqueueRequests(pendingRequests);
if (head_action->delayMillis > 0) {
retry_timer.expires_from_now(
std::chrono::milliseconds(options_.rpc_retry_delay_ms));
retry_timer.async_wait([this](asio::error_code ec) {
if (!ec) conn_->ConnectAndFlush(last_endpoints_);
});
if (haveRequests) {
bool needNewConnection = !conn_;
if (needNewConnection) {
conn_ = InitializeConnection();
conn_->PreEnqueueRequests(pendingRequests);
if (head_action->delayMillis > 0) {
auto weak_conn = std::weak_ptr<RpcConnection>(conn_);
retry_timer.expires_from_now(
std::chrono::milliseconds(options_.rpc_retry_delay_ms));
retry_timer.async_wait([this, weak_conn](asio::error_code ec) {
auto strong_conn = weak_conn.lock();
if ( (!ec) && (strong_conn) ) {
strong_conn->ConnectAndFlush(last_endpoints_);
}
});
} else {
conn_->ConnectAndFlush(last_endpoints_);
}
} else {
conn_->ConnectAndFlush(last_endpoints_);
// We have an existing connection (which might be closed; we don't know
// until we hold the connection local) and should just add the new requests
conn_->AsyncRpc(pendingRequests);
}
} else {
// Connection will try again if someone calls AsyncRpc
conn_.reset();
}
}

View File

@ -125,8 +125,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
std::shared_ptr<::google::protobuf::MessageLite> resp,
const RpcCallback &handler);
void AsyncRawRpc(const std::string &method_name, const std::string &request,
std::shared_ptr<std::string> resp, RpcCallback &&handler);
void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);
// Enqueue requests before the connection is connected. Will be flushed
// on connect
@ -182,7 +181,15 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
// Connection can have deferred connection, especially when we're pausing
// during retry
bool connected_;
enum ConnectedState {
kNotYetConnected,
kConnecting,
kConnected,
kDisconnected
};
static std::string ToString(ConnectedState connected);
ConnectedState connected_;
// The request being sent over the wire; will also be in requests_on_fly_
std::shared_ptr<Request> request_over_the_wire_;
// Requests to be sent over the wire
@ -207,6 +214,7 @@ class LockFreeRpcEngine {
public:
/* Enqueues a CommsError without acquiring a lock*/
virtual void AsyncRpcCommsError(const Status &status,
std::shared_ptr<RpcConnection> failedConnection,
std::vector<std::shared_ptr<Request>> pendingRequests) = 0;
@ -254,19 +262,16 @@ class RpcEngine : public LockFreeRpcEngine {
Status Rpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp);
/**
* Send raw bytes as RPC payload. This is intended to be used in JNI
* bindings only.
**/
Status RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp);
void Start();
void Shutdown();
/* Enqueues a CommsError without acquiring a lock*/
void AsyncRpcCommsError(const Status &status,
std::shared_ptr<RpcConnection> failedConnection,
std::vector<std::shared_ptr<Request>> pendingRequests) override;
void RpcCommsError(const Status &status,
std::shared_ptr<RpcConnection> failedConnection,
std::vector<std::shared_ptr<Request>> pendingRequests);

View File

@ -106,8 +106,8 @@ TEST(RpcEngineTest, TestRoundTrip) {
::asio::io_service io_service;
Options options;
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
RpcConnectionImpl<MockRPCConnection> *conn =
new RpcConnectionImpl<MockRPCConnection>(&engine);
auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
conn->TEST_set_connected(true);
conn->StartReading();
@ -142,8 +142,8 @@ TEST(RpcEngineTest, TestConnectionResetAndFail) {
::asio::io_service io_service;
Options options;
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
RpcConnectionImpl<MockRPCConnection> *conn =
new RpcConnectionImpl<MockRPCConnection>(&engine);
auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
conn->TEST_set_connected(true);
conn->StartReading();
@ -436,8 +436,8 @@ TEST(RpcEngineTest, TestTimeout) {
Options options;
options.rpc_timeout = 1;
RpcEngine engine(&io_service, options, "foo", "", "protocol", 1);
RpcConnectionImpl<MockRPCConnection> *conn =
new RpcConnectionImpl<MockRPCConnection>(&engine);
auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(&engine);
conn->TEST_set_connected(true);
conn->StartReading();