diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index a6616c69b75..da63435c08d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -82,6 +82,34 @@ bool lock_held(T & mutex) { // Returns a string containing error message on failure, otherwise an empty string. std::string SafeDisconnect(asio::ip::tcp::socket *sock); + + +// The following helper function is used for classes that look like the following: +// +// template +// class ObjectThatHoldsSocket { +// socket_like_object sock_; +// void DoSomethingWithAsioTcpSocket(); +// } +// +// The trick here is that ObjectThatHoldsSocket may be templated on a mock socket +// in mock tests. If you have a method that explicitly needs to call some asio +// method unrelated to the mock test you need a way of making sure socket_like_object +// is, in fact, an asio::ip::tcp::socket. Otherwise the mocks need to implement +// lots of no-op boilerplate. This will return the value of the input param if +// it's a asio socket, and nullptr if it's anything else. + +template +inline asio::ip::tcp::socket *get_asio_socket_ptr(sock_t *s) { + (void)s; + return nullptr; +} +template<> +inline asio::ip::tcp::socket *get_asio_socket_ptr + (asio::ip::tcp::socket *s) { + return s; +} + } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc index acc80c9e533..27cd6663be0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -56,8 +56,13 @@ void DataNodeConnectionImpl::Connect( } void DataNodeConnectionImpl::Cancel() { - mutex_guard state_lock(state_lock_); - std::string err = SafeDisconnect(conn_.get()); + std::string err; + + { // scope the lock for disconnect only, log has it's own lock + mutex_guard state_lock(state_lock_); + err = SafeDisconnect(conn_.get()); + } + if(!err.empty()) { LOG_WARN(kBlockReader, << "Error disconnecting socket in DataNodeConnectionImpl::Cancel, " << err); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h index 4c33a4120a2..d7b1c2faec0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -151,7 +151,11 @@ void RpcConnectionImpl::ConnectComplete(const ::asio::error_code &ec) HandshakeComplete(s); }); } else { - next_layer_.close(); + std::string err = SafeDisconnect(get_asio_socket_ptr(&next_layer_)); + if(!err.empty()) { + LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); + } + if (!additional_endpoints_.empty()) { // If we have additional endpoints, keep trying until we either run out or // hit one @@ -355,8 +359,8 @@ void RpcConnectionImpl::Disconnect() { request_over_the_wire_.reset(); if (connected_ == kConnecting || connected_ == kConnected) { - next_layer_.cancel(); - next_layer_.close(); + // Don't print out errors, we were expecting a disconnect here + SafeDisconnect(get_asio_socket_ptr(&next_layer_)); } connected_ = kDisconnected; }