HDFS-9523. libhdfs++: failure to connect to ipv6 host causes CI unit tests to fail. Contributed by Bob Hansen.

commit e18db92398 (parent 31d28e3105)
Author:    James
Committer: James Clampffer
Date:      2015-12-16 12:27:06 -05:00

5 changed files with 93 additions and 41 deletions
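Background for the fix: a CI host frequently resolves its own hostname to more than one endpoint, typically an IPv6 address alongside an IPv4 one. The old code handed only the first resolved endpoint to the RPC engine, so when that endpoint was an unconnectable IPv6 address the unit tests failed outright. The patch makes the engine carry the full endpoint list and fall back through it. As a point of reference, here is a minimal standalone sketch (not part of the patch; host and port are stand-ins) of the classic synchronous version of that fallback loop:

#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

// Try every address the resolver returns until one accepts the connection.
static int connect_any(const char *host, const char *port) {
  struct addrinfo hints;
  struct addrinfo *results = nullptr;
  std::memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;        // accept both IPv4 and IPv6
  hints.ai_socktype = SOCK_STREAM;
  if (getaddrinfo(host, port, &hints, &results) != 0)
    return -1;

  int fd = -1;
  for (struct addrinfo *ai = results; ai != nullptr; ai = ai->ai_next) {
    fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (fd < 0)
      continue;
    if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0)
      break;                          // connected: keep this descriptor
    close(fd);                        // unreachable endpoint (e.g. IPv6): try the next
    fd = -1;
  }
  freeaddrinfo(results);
  return fd;
}

int main() {
  int fd = connect_any("localhost", "8020");  // stand-in host and NameNode port
  std::printf(fd >= 0 ? "connected\n" : "all endpoints failed\n");
  if (fd >= 0)
    close(fd);
  return 0;
}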

View File

@@ -49,7 +49,7 @@ void NameNodeOperations::Connect(const std::string &server,
   m->Push(Resolve(io_service_, server, service,
                   std::back_inserter(m->state())))
       .Push(Bind([this, m](const Continuation::Next &next) {
-        engine_.Connect(m->state().front(), next);
+        engine_.Connect(m->state(), next);
       }));
   m->Run([this, handler](const Status &status, const State &) {
     handler(status);

View File

@@ -33,10 +33,10 @@ template <class NextLayer>
 class RpcConnectionImpl : public RpcConnection {
 public:
   RpcConnectionImpl(RpcEngine *engine);
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler);
   virtual void ConnectAndFlush(
-      const ::asio::ip::tcp::endpoint &server) override;
+      const std::vector<::asio::ip::tcp::endpoint> &server) override;
   virtual void Handshake(RpcCallback &handler) override;
   virtual void Disconnect() override;
   virtual void OnSendCompleted(const ::asio::error_code &ec,
@@ -52,7 +52,11 @@ public:
 private:
   const Options options_;
+  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
   NextLayer next_layer_;
 
+  void ConnectComplete(const ::asio::error_code &ec);
+  void HandshakeComplete(const Status &s);
+
 };
 
 template <class NextLayer>
@@ -63,7 +67,7 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Connect(
-    const ::asio::ip::tcp::endpoint &server, RpcCallback &handler) {
+    const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) {
   auto connectionSuccessfulReq = std::make_shared<Request>(
       engine_, [handler](::google::protobuf::io::CodedInputStream *is,
                          const Status &status) {
@@ -76,28 +80,65 @@ void RpcConnectionImpl<NextLayer>::Connect(
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::ConnectAndFlush(
-    const ::asio::ip::tcp::endpoint &server) {
-  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
-  next_layer_.async_connect(server,
-                            [shared_this, this](const ::asio::error_code &ec) {
-    std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-    Status status = ToStatus(ec);
-    if (status.ok()) {
-      StartReading();
-      Handshake([shared_this, this](const Status &s) {
-        std::lock_guard<std::mutex> state_lock(connection_state_lock_);
-        if (s.ok()) {
-          FlushPendingRequests();
-        } else {
-          CommsError(s);
-        };
-      });
-    } else {
-      CommsError(status);
-    }
-  });
+    const std::vector<::asio::ip::tcp::endpoint> &server) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  if (server.empty()) {
+    Status s = Status::InvalidArgument("No endpoints provided");
+    CommsError(s);
+    return;
+  }
+
+  // Take the first endpoint, but remember the alternatives for later
+  additional_endpoints_ = server;
+  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
+  additional_endpoints_.erase(additional_endpoints_.begin());
+
+  auto shared_this = shared_from_this();
+  next_layer_.async_connect(first_endpoint,
+                            [shared_this, this](const ::asio::error_code &ec) {
+    ConnectComplete(ec);
+  });
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec) {
+  auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+
+  Status status = ToStatus(ec);
+  if (status.ok()) {
+    StartReading();
+    Handshake([shared_this, this](const Status &s) {
+      HandshakeComplete(s);
+    });
+  } else {
+    next_layer_.close();
+    if (!additional_endpoints_.empty()) {
+      // If we have additional endpoints, keep trying until we either run out or
+      // hit one
+      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
+      additional_endpoints_.erase(additional_endpoints_.begin());
+      next_layer_.async_connect(next_endpoint,
+                                [shared_this, this](const ::asio::error_code &ec) {
+        ConnectComplete(ec);
+      });
+    } else {
+      CommsError(status);
+    }
+  }
+}
+
+template <class NextLayer>
+void RpcConnectionImpl<NextLayer>::HandshakeComplete(const Status &s) {
+  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
+  if (s.ok()) {
+    FlushPendingRequests();
+  } else {
+    CommsError(s);
+  };
 }
 
 template <class NextLayer>
 void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) {
   assert(lock_held(connection_state_lock_));  // Must be holding lock before calling
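The rewrite above turns the single async_connect call into an asynchronous fallback chain: ConnectAndFlush pops the first endpoint and starts the connect, and ConnectComplete, on failure, closes the socket and re-issues async_connect with the next queued endpoint until the list is exhausted. A condensed standalone sketch of the same pattern (plain asio; all names hypothetical, not the patch's classes):

#include <asio.hpp>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

class FallbackConnector : public std::enable_shared_from_this<FallbackConnector> {
 public:
  FallbackConnector(asio::io_context &io,
                    std::vector<asio::ip::tcp::endpoint> endpoints)
      : socket_(io), endpoints_(std::move(endpoints)) {}

  void Start() { TryNext(); }

 private:
  void TryNext() {
    if (endpoints_.empty()) {           // ran out of alternatives: report failure
      std::cerr << "all endpoints failed\n";
      return;
    }
    // Pop the head of the queue, as ConnectAndFlush does with additional_endpoints_.
    asio::ip::tcp::endpoint next = endpoints_.front();
    endpoints_.erase(endpoints_.begin());

    auto self = shared_from_this();     // keep *this alive across the async call
    socket_.async_connect(next, [self](const asio::error_code &ec) {
      if (!ec) {
        std::cout << "connected\n";     // the real code starts the handshake here
      } else {
        asio::error_code ignored;
        self->socket_.close(ignored);   // mirrors next_layer_.close() in the patch
        self->TryNext();                // fall back to the next endpoint
      }
    });
  }

  asio::ip::tcp::socket socket_;
  std::vector<asio::ip::tcp::endpoint> endpoints_;
};

int main() {
  asio::io_context io;
  asio::ip::tcp::resolver resolver(io);
  std::vector<asio::ip::tcp::endpoint> endpoints;
  for (const auto &entry : resolver.resolve("localhost", "8020"))
    endpoints.push_back(entry.endpoint());
  std::make_shared<FallbackConnector>(io, std::move(endpoints))->Start();
  io.run();
  return 0;
}

Splitting the handler out into a named ConnectComplete member, rather than nesting lambdas as the old code did, is what lets the failure path re-enter the same completion logic for each successive endpoint.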

View File

@@ -39,13 +39,13 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
       call_id_(0),
       retry_timer(*io_service) {}
 
-void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server,
+void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                         RpcCallback &handler) {
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
-  last_endpoint_ = server;
+  last_endpoints_ = server;
   conn_ = NewConnection();
-  conn_->Connect(server, handler);
+  conn_->Connect(last_endpoints_, handler);
 }
 
 void RpcEngine::Shutdown() {
@@ -75,7 +75,7 @@ void RpcEngine::AsyncRpc(
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   if (!conn_) {
     conn_ = NewConnection();
-    conn_->ConnectAndFlush(last_endpoint_);
+    conn_->ConnectAndFlush(last_endpoints_);
   }
   conn_->AsyncRpc(method_name, req, resp, handler);
 }
@@ -103,7 +103,7 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
   std::lock_guard<std::mutex> state_lock(engine_state_lock_);
   if (!conn_) {
     conn_ = NewConnection();
-    conn_->ConnectAndFlush(last_endpoint_);
+    conn_->ConnectAndFlush(last_endpoints_);
   }
   conn = conn_;
 }
@@ -170,10 +170,10 @@ void RpcEngine::RpcCommsError(
       retry_timer.expires_from_now(
           std::chrono::milliseconds(options_.rpc_retry_delay_ms));
       retry_timer.async_wait([this](asio::error_code ec) {
-        if (!ec) conn_->ConnectAndFlush(last_endpoint_);
+        if (!ec) conn_->ConnectAndFlush(last_endpoints_);
       });
     } else {
-      conn_->ConnectAndFlush(last_endpoint_);
+      conn_->ConnectAndFlush(last_endpoints_);
     }
   } else {
     // Connection will try again if someone calls AsyncRpc

View File

@@ -110,9 +110,11 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
   RpcConnection(LockFreeRpcEngine *engine);
   virtual ~RpcConnection();
 
-  virtual void Connect(const ::asio::ip::tcp::endpoint &server,
+  // Note that a single server can have multiple endpoints - especially both
+  // an ipv4 and ipv6 endpoint
+  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                        RpcCallback &handler) = 0;
-  virtual void ConnectAndFlush(const ::asio::ip::tcp::endpoint &server) = 0;
+  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
   virtual void Handshake(RpcCallback &handler) = 0;
   virtual void Disconnect() = 0;
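To illustrate the comment introduced above: one resolve of a dual-stack host commonly returns both address families, which is why the interface now takes a vector rather than a single endpoint. A tiny standalone demonstration (modern asio; the hostname and the default NameNode RPC port 8020 are stand-ins, not part of the patch):

#include <asio.hpp>
#include <iostream>

int main() {
  asio::io_context io;
  asio::ip::tcp::resolver resolver(io);
  // One name, several endpoints: e.g. [::1]:8020 and 127.0.0.1:8020.
  for (const auto &entry : resolver.resolve("localhost", "8020"))
    std::cout << entry.endpoint() << "\n";
  return 0;
}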
@@ -231,7 +233,7 @@ class RpcEngine : public LockFreeRpcEngine {
             const std::string &client_name, const char *protocol_name,
             int protocol_version);
 
-  void Connect(const ::asio::ip::tcp::endpoint &server, RpcCallback &handler);
+  void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler);
 
   void AsyncRpc(const std::string &method_name,
                 const ::google::protobuf::MessageLite *req,
@@ -272,6 +274,9 @@ class RpcEngine : public LockFreeRpcEngine {
   std::shared_ptr<RpcConnection> conn_;
   virtual std::shared_ptr<RpcConnection> NewConnection();
   virtual std::unique_ptr<const RetryPolicy> MakeRetryPolicy(const Options &options);
+
+  // Remember all of the last endpoints in case we need to reconnect and retry
+  std::vector<::asio::ip::tcp::endpoint> last_endpoints_;
 private:
   ::asio::io_service * const io_service_;
   const Options options_;
@@ -282,9 +287,6 @@ private:
   std::atomic_int call_id_;
   ::asio::deadline_timer retry_timer;
 
-  // Remember the last endpoint in case we need to reconnect to retry
-  ::asio::ip::tcp::endpoint last_endpoint_;
-
   std::mutex engine_state_lock_;
 };

View File

@@ -43,6 +43,12 @@ namespace pbio = ::google::protobuf::io;
 
 namespace hdfs {
 
+std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> make_endpoint() {
+  std::vector<asio::ip::basic_endpoint<asio::ip::tcp>> result;
+  result.push_back(asio::ip::basic_endpoint<asio::ip::tcp>());
+  return result;
+}
+
 class MockRPCConnection : public MockConnectionBase {
 public:
   MockRPCConnection(::asio::io_service &io_service)
@@ -61,6 +67,9 @@ class SharedConnectionEngine : public RpcEngine {
 
 protected:
   std::shared_ptr<RpcConnection> NewConnection() override {
+    // Stuff in some dummy endpoints so we don't error out
+    last_endpoints_ = make_endpoint();
+
     return std::make_shared<RpcConnectionImpl<SharedMockRPCConnection>>(this);
   }
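Worth noting: the dummy endpoint matters because the new ConnectAndFlush fails fast with InvalidArgument when handed an empty endpoint list, so a mock engine that never populated last_endpoints_ would error out before the mocked read/write expectations in the tests below ever ran.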
@@ -257,7 +266,7 @@ TEST(RpcEngineTest, TestConnectionFailure)
   EXPECT_CALL(*producer, Produce())
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -285,7 +294,7 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")))
       .WillOnce(Return(std::make_pair(make_error_code(::asio::error::connection_reset), "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_FALSE(stat.ok());
@@ -313,7 +322,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
      .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());
@@ -342,7 +351,7 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
       .WillOnce(Return(std::make_pair(::asio::error_code(), "")))
      .WillOnce(Return(std::make_pair(::asio::error::would_block, "")));
 
-  engine.Connect(asio::ip::basic_endpoint<asio::ip::tcp>(), [&complete, &io_service](const Status &stat) {
+  engine.Connect(make_endpoint(), [&complete, &io_service](const Status &stat) {
     complete = true;
     io_service.stop();
     ASSERT_TRUE(stat.ok());