From 2616fe2025af642b25e61ff9f9a9a2624a8bbd24 Mon Sep 17 00:00:00 2001 From: Bob Hansen Date: Mon, 20 Jun 2016 15:37:14 -0400 Subject: [PATCH] HDFS-10526: libhdfs++: Add connect timeouts to async_connect calls. Contributed by Bob Hansen. --- .../native/libhdfspp/include/hdfspp/options.h | 7 +++ .../lib/common/hdfs_configuration.cc | 3 +- .../libhdfspp/lib/common/hdfs_configuration.h | 3 +- .../native/libhdfspp/lib/common/logging.cc | 6 +- .../native/libhdfspp/lib/common/logging.h | 3 + .../native/libhdfspp/lib/common/options.cc | 3 +- .../libhdfspp/lib/rpc/rpc_connection.cc | 5 +- .../native/libhdfspp/lib/rpc/rpc_connection.h | 55 ++++++++++++++++--- .../native/libhdfspp/lib/rpc/rpc_engine.cc | 2 +- .../native/libhdfspp/lib/rpc/rpc_engine.h | 1 + .../tests/hdfs_configuration_test.cc | 14 ++++- 11 files changed, 84 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h index 1828b2afd88..8562f6d7acf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/options.h @@ -33,6 +33,13 @@ struct Options { int rpc_timeout; static const int kDefaultRpcTimeout = 30000; + /** + * Time to wait for an RPC connection before failing + * Default: 30000 + **/ + int rpc_connect_timeout; + static const int kDefaultRpcConnectTimeout = 30000; + /** * Maximum number of retries for RPC operations **/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc index 13ba279b95c..ef67af98b71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.cc @@ -45,11 +45,12 @@ Options HdfsConfiguration::GetOptions() { Options result; OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey)); + OptionalSet(result.rpc_connect_timeout, GetInt(kIpcClientConnectTimeoutKey)); OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey)); OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey)); OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey)); - optional authentication_value = Get(kHadoopSecurityAuthentication); + optional authentication_value = Get(kHadoopSecurityAuthenticationKey); if (authentication_value ) { std::string fixed_case_value = fixCase(authentication_value.value()); if (fixed_case_value == fixCase(kHadoopSecurityAuthentication_kerberos)) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h index 7db9d37637c..c6ead666fdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_configuration.h @@ -39,9 +39,10 @@ class HdfsConfiguration : public Configuration { // Keys to look for in the configuration file static constexpr const char * kFsDefaultFsKey = "fs.defaultFS"; static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout"; + static constexpr const char * kIpcClientConnectTimeoutKey = "ipc.client.connect.timeout"; static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries"; static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval"; - static constexpr const char * kHadoopSecurityAuthentication = "hadoop.security.authentication"; + static constexpr const char * kHadoopSecurityAuthenticationKey = "hadoop.security.authentication"; static constexpr const char * kHadoopSecurityAuthentication_simple = "simple"; static constexpr const char * kHadoopSecurityAuthentication_kerberos = "kerberos"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc index c299761c962..39ed94448bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.cc @@ -136,6 +136,11 @@ LogMessage& LogMessage::operator<<(const std::string& str) { return *this; } +LogMessage& LogMessage::operator<<(const ::asio::ip::tcp::endpoint& endpoint) { + msg_buffer_ << endpoint; + return *this; +} + LogMessage& LogMessage::operator<<(const char *str) { if(str) msg_buffer_ << str; @@ -213,4 +218,3 @@ const char * LogMessage::component_string() const { } } - diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h index 3403646e186..9dc0c5f6388 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/logging.h @@ -26,6 +26,8 @@ #include #include +#include + namespace hdfs { /** @@ -177,6 +179,7 @@ class LogMessage { LogMessage& operator<<(const std::string*); LogMessage& operator<<(const std::string&); + LogMessage& operator<<(const ::asio::ip::tcp::endpoint& endpoint); //convert to a string "true"/"false" LogMessage& operator<<(bool); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc index c7dd2ed98ca..305ea1ae398 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -27,7 +27,8 @@ const int Options::kDefaultMaxRpcRetries; const int Options::kDefaultRpcRetryDelayMs; const unsigned int Options::kDefaultHostExclusionDuration; -Options::Options() : rpc_timeout(kDefaultRpcTimeout), max_rpc_retries(kDefaultMaxRpcRetries), +Options::Options() : rpc_timeout(kDefaultRpcTimeout), rpc_connect_timeout(kDefaultRpcConnectTimeout), + max_rpc_retries(kDefaultMaxRpcRetries), rpc_retry_delay_ms(kDefaultRpcRetryDelayMs), host_exclusion_duration(kDefaultHostExclusionDuration), defaultFS(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 749195a278d..8567932a435 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -183,7 +183,7 @@ void RpcConnection::HandshakeComplete(const Status &s) { LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called"); if (s.ok()) { - if (connected_ == kConnecting) { + if (connected_ == kHandshaking) { auto shared_this = shared_from_this(); connected_ = kAuthenticating; @@ -407,7 +407,7 @@ void RpcConnection::SendRpcRequests(const std::vector > else auth_requests_.push_back(r); } - if (connected_ == kConnected || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking + if (connected_ == kConnected || connected_ == kHandshaking || connected_ == kAuthenticating) { // Dont flush if we're waiting or handshaking FlushPendingRequests(); } } @@ -494,6 +494,7 @@ std::string RpcConnection::ToString(ConnectedState connected) { switch(connected) { case kNotYetConnected: return "NotYetConnected"; case kConnecting: return "Connecting"; + case kHandshaking: return "Handshaking"; case kAuthenticating: return "Authenticating"; case kConnected: return "Connected"; case kDisconnected: return "Disconnected"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 255b98be801..2b47ce12de9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -59,17 +59,20 @@ public: private: const Options options_; + ::asio::ip::tcp::endpoint current_endpoint_; std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; NextLayer next_layer_; + ::asio::deadline_timer connect_timer_; - void ConnectComplete(const ::asio::error_code &ec); + void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); }; template RpcConnectionImpl::RpcConnectionImpl(RpcEngine *engine) : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()) { + next_layer_(engine->io_service()), + connect_timer_(engine->io_service()) { LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); } @@ -129,20 +132,43 @@ void RpcConnectionImpl::ConnectAndFlush( additional_endpoints_ = server; ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); additional_endpoints_.erase(additional_endpoints_.begin()); + current_endpoint_ = first_endpoint; auto shared_this = shared_from_this(); - next_layer_.async_connect(first_endpoint, [shared_this, this](const ::asio::error_code &ec) { - ConnectComplete(ec); + next_layer_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { + ConnectComplete(ec, first_endpoint); + }); + + // Prompt the timer to timeout + auto weak_this = std::weak_ptr(shared_this); + connect_timer_.expires_from_now( + std::chrono::milliseconds(options_.rpc_connect_timeout)); + connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) { + if (ec) + ConnectComplete(ec, first_endpoint); + else + ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint); }); } template -void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) { +void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { auto shared_this = RpcConnectionImpl::shared_from_this(); std::lock_guard state_lock(connection_state_lock_); + connect_timer_.cancel(); LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); + // Could be an old async connect returning a result after we've moved on + if (remote != current_endpoint_) { + LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_); + return; + } + if (connected_ != kConnecting) { + LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);; + return; + } + Status status = ToStatus(ec); if(event_handlers_) { auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); @@ -159,6 +185,7 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) HandshakeComplete(s); }); } else { + LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());; std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_)); if(!err.empty()) { LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); @@ -169,10 +196,19 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) // hit one ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front(); additional_endpoints_.erase(additional_endpoints_.begin()); + current_endpoint_ = next_endpoint; - next_layer_.async_connect(next_endpoint, [shared_this, this](const ::asio::error_code &ec) { - ConnectComplete(ec); + next_layer_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { + ConnectComplete(ec, next_endpoint); }); + connect_timer_.expires_from_now( + std::chrono::milliseconds(options_.rpc_connect_timeout)); + connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) { + if (ec) + ConnectComplete(ec, next_endpoint); + else + ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint); + }); } else { CommsError(status); } @@ -184,6 +220,7 @@ void RpcConnectionImpl::SendHandshake(RpcCallback &handler) { assert(lock_held(connection_state_lock_)); // Must be holding lock before calling LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); + connected_ = kHandshaking; auto shared_this = shared_from_this(); auto handshake_packet = PrepareHandshakePacket(); @@ -250,6 +287,8 @@ void RpcConnectionImpl::FlushPendingRequests() { return; case kConnecting: return; + case kHandshaking: + return; case kAuthenticating: if (auth_requests_.empty()) { return; @@ -379,7 +418,7 @@ void RpcConnectionImpl::Disconnect() { LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); request_over_the_wire_.reset(); - if (connected_ == kConnecting || connected_ == kAuthenticating || connected_ == kConnected) { + if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { // Don't print out errors, we were expecting a disconnect here SafeDisconnect(get_asio_socket_ptr(&next_layer_)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index f8df97f154b..a8438b1a07b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -197,7 +197,7 @@ void RpcEngine::RpcCommsError( if (head_action->delayMillis > 0) { auto weak_conn = std::weak_ptr(conn_); retry_timer.expires_from_now( - std::chrono::milliseconds(options_.rpc_retry_delay_ms)); + std::chrono::milliseconds(head_action->delayMillis)); retry_timer.async_wait([this, weak_conn](asio::error_code ec) { auto strong_conn = weak_conn.lock(); if ( (!ec) && (strong_conn) ) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 066c01f56e4..d0365c3b43f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -207,6 +207,7 @@ class RpcConnection : public std::enable_shared_from_this { enum ConnectedState { kNotYetConnected, kConnecting, + kHandshaking, kAuthenticating, kConnected, kDisconnected diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc index 035c044a2b4..7e9ca666cc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc @@ -41,17 +41,25 @@ TEST(HdfsConfigurationTest, TestSetOptions) // Completely empty stream { std::stringstream stream; - simpleConfigStream(stream, HdfsConfiguration::kDfsClientSocketTimeoutKey, 100, - HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101, - HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102); + simpleConfigStream(stream, + HdfsConfiguration::kFsDefaultFsKey, "/FDFK", + HdfsConfiguration::kDfsClientSocketTimeoutKey, 100, + HdfsConfiguration::kIpcClientConnectMaxRetriesKey, 101, + HdfsConfiguration::kIpcClientConnectRetryIntervalKey, 102, + HdfsConfiguration::kIpcClientConnectTimeoutKey, 103, + HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos + ); optional config = ConfigurationLoader().Load(stream.str()); EXPECT_TRUE(config && "Read stream"); Options options = config->GetOptions(); + EXPECT_EQ("/FDFK", options.defaultFS.str()); EXPECT_EQ(100, options.rpc_timeout); EXPECT_EQ(101, options.max_rpc_retries); EXPECT_EQ(102, options.rpc_retry_delay_ms); + EXPECT_EQ(103, options.rpc_connect_timeout); + EXPECT_EQ(Options::kKerberos, options.authentication); } }