diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc index 7a4b4cf33ef..8f6e77a5395 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -26,7 +26,7 @@ #include #include #include - +#include namespace hdfs { @@ -73,18 +73,20 @@ int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) { return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; } -std::string GetRandomClientName() { +std::shared_ptr GetRandomClientName() { std::vectorbuf(8); - RAND_pseudo_bytes(&buf[0], 8); + if (RAND_bytes(&buf[0], static_cast(buf.size())) != 1) { + return nullptr; + } std::ostringstream oss; oss << "DFSClient_" << getpid() << "_" << std::this_thread::get_id() << "_" << std::setw(2) << std::hex << std::uppercase << std::setfill('0'); - for (unsigned char b: buf) + for (auto b : buf) { oss << static_cast(b); - - return oss.str(); + } + return std::make_shared(oss.str()); } std::string Base64Encode(const std::string &src) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index a7f4f958e79..140f66e8482 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -22,6 +22,7 @@ #include "common/logging.h" #include +#include #include #include @@ -61,7 +62,7 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL std::string Base64Encode(const std::string &src); // Return a new high-entropy client name -std::string GetRandomClientName(); +std::shared_ptr GetRandomClientName(); // Returns true if _someone_ is holding the lock (not necessarily this thread, // but a std::mutex doesn't track which thread is holding the lock) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index 169def364b7..7c9e24c0d88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -24,6 +24,8 @@ #include "hdfspp/events.h" #include +#include +#include #include #include @@ -38,7 +40,7 @@ FileHandle::~FileHandle() {} FileHandleImpl::FileHandleImpl(const std::string & cluster_name, const std::string & path, - std::shared_ptr io_service, const std::string &client_name, + std::shared_ptr io_service, const std::shared_ptr &client_name, const std::shared_ptr file_info, std::shared_ptr bad_data_nodes, std::shared_ptr event_handlers) @@ -191,6 +193,11 @@ void FileHandleImpl::AsyncPreadSome( return; } + if (client_name_ == nullptr) { + handler(Status::Error("AsyncPreadSome: Unable to generate random client name"), "", 0); + return; + } + /** * Note: block and chosen_dn will end up pointing to things inside * the blocks_ vector. They shouldn't be directly deleted. @@ -245,7 +252,7 @@ void FileHandleImpl::AsyncPreadSome( // steal the FileHandle's dn and put it back when we're done std::shared_ptr dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken()); std::string dn_id = dn->uuid_; - std::string client_name = client_name_; + std::string client_name = *client_name_; // Wrap the DN in a block reader to handle the state and logic of the // block request protocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 57da237f977..724b1a14bc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -29,7 +29,9 @@ #include "bad_datanode_tracker.h" #include "ClientNamenodeProtocol.pb.h" +#include #include +#include namespace hdfs { @@ -51,10 +53,11 @@ public: MEMCHECKED_CLASS(FileHandleImpl) FileHandleImpl(const std::string & cluster_name, const std::string & path, - std::shared_ptr io_service, const std::string &client_name, - const std::shared_ptr file_info, - std::shared_ptr bad_data_nodes, - std::shared_ptr event_handlers); + std::shared_ptr io_service, + const std::shared_ptr &client_name, + const std::shared_ptr file_info, + std::shared_ptr bad_data_nodes, + std::shared_ptr event_handlers); /* * Reads the file at the specified offset into the buffer. @@ -129,7 +132,7 @@ private: const std::string cluster_name_; const std::string path_; std::shared_ptr io_service_; - const std::string client_name_; + const std::shared_ptr client_name_; const std::shared_ptr file_info_; std::shared_ptr bad_node_tracker_; bool CheckSeekBounds(ssize_t desired_position); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 935e7c96c7b..7fdb6a1e3f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -23,6 +23,7 @@ #include "hdfspp/hdfspp.h" #include "reader/fileinfo.h" +#include #include namespace hdfs { @@ -217,7 +218,7 @@ private: **/ std::shared_ptr io_service_; const Options options_; - const std::string client_name_; + const std::shared_ptr client_name_; std::string cluster_name_; NameNodeOperations nn_; std::shared_ptr bad_node_tracker_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 3470a48b3c7..445aa08653d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -26,6 +26,8 @@ #include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.hrpc.inl" +#include +#include namespace hdfs { @@ -43,7 +45,7 @@ class NameNodeOperations { public: MEMCHECKED_CLASS(NameNodeOperations) NameNodeOperations(std::shared_ptr io_service, const Options &options, - const std::string &client_name, const std::string &user_name, + const std::shared_ptr &client_name, const std::string &user_name, const char *protocol_name, int protocol_version) : io_service_(io_service), engine_(std::make_shared(io_service, options, client_name, user_name, protocol_name, protocol_version)), diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc index 82fdfeb033d..a5de92e61bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.cc @@ -306,13 +306,19 @@ std::shared_ptr RpcConnection::PrepareContextPacket() { return std::make_shared(); } + const auto& client_name = pinnedEngine->client_name(); + if (client_name == nullptr) { + LOG_ERROR(kRPC, << "RpcConnection@" << this << " unable to generate random client name"); + return std::make_shared(); + } + std::shared_ptr serializedPacketBuffer = std::make_shared(); RpcRequestHeaderProto headerProto; headerProto.set_rpckind(RPC_PROTOCOL_BUFFER); headerProto.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); headerProto.set_callid(RpcEngine::kCallIdConnectionContext); - headerProto.set_clientid(pinnedEngine->client_name()); + headerProto.set_clientid(*client_name); IpcConnectionContextProto handshakeContextProto; handshakeContextProto.set_protocol(pinnedEngine->protocol_name()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 065dffad96b..e3274cb88aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -36,7 +37,7 @@ using optional = std::experimental::optional; RpcEngine::RpcEngine(std::shared_ptr io_service, const Options &options, - const std::string &client_name, const std::string &user_name, + const std::shared_ptr &client_name, const std::string &user_name, const char *protocol_name, int protocol_version) : io_service_(io_service), options_(options), diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index 3bf7dcabda0..1445a1860de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -38,6 +38,7 @@ #include #include #include +#include namespace hdfs { @@ -79,7 +80,7 @@ public: virtual const RetryPolicy *retry_policy() = 0; virtual int NextCallId() = 0; - virtual const std::string &client_name() = 0; + virtual const std::shared_ptr &client_name() = 0; virtual const std::unique_ptr &client_id() = 0; virtual const std::string &user_name() = 0; virtual const std::string &protocol_name() = 0; @@ -109,7 +110,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this< }; RpcEngine(std::shared_ptr service, const Options &options, - const std::string &client_name, const std::string &user_name, + const std::shared_ptr &client_name, const std::string &user_name, const char *protocol_name, int protocol_version); void Connect(const std::string & cluster_name, @@ -141,7 +142,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this< void TEST_SetRetryPolicy(std::unique_ptr policy); std::unique_ptr TEST_GenerateRetryPolicyUsingOptions(); - const std::string &client_name() override { return client_name_; } + const std::shared_ptr &client_name() override { return client_name_; } const std::unique_ptr &client_id() override { return client_id_; } const std::string &user_name() override { return auth_info_.getUser(); } const std::string &protocol_name() override { return protocol_name_; } @@ -165,7 +166,7 @@ protected: private: mutable std::shared_ptr io_service_; const Options options_; - const std::string client_name_; + const std::shared_ptr client_name_; const std::unique_ptr client_id_; const std::string protocol_name_; const int protocol_version_; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 5417af8f4cf..911f7cae28a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -23,11 +23,16 @@ #include "fs/bad_datanode_tracker.h" #include "reader/block_reader.h" +#include #include +#include +#include #include #include + + using hadoop::common::TokenProto; using hadoop::hdfs::DatanodeInfoProto; using hadoop::hdfs::DatanodeIDProto; @@ -139,7 +144,10 @@ TEST(BadDataNodeTest, TestNoNodes) { auto monitors = std::make_shared(); bad_node_tracker->AddBadNode("foo"); - PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, bad_node_tracker, monitors); + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); + + PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, bad_node_tracker, monitors); Status stat; size_t read = 0; @@ -195,7 +203,11 @@ TEST(BadDataNodeTest, NNEventCallback) { return event_response::make_ok(); }); - PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); + + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); + + PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); Status stat; size_t read = 0; @@ -241,7 +253,11 @@ TEST(BadDataNodeTest, RecoverableError) { std::shared_ptr io_service = IoService::MakeShared(); auto tracker = std::make_shared(); auto monitors = std::make_shared(); - PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); + + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); + + PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) @@ -292,7 +308,11 @@ TEST(BadDataNodeTest, InternalError) { std::shared_ptr io_service = IoService::MakeShared(); auto tracker = std::make_shared(); auto monitors = std::make_shared(); - PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors); + + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); + + PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors); Status stat; size_t read = 0; EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index dfee686b602..4843da91786 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -27,8 +27,10 @@ #include #include -#include #include +#include +#include +#include #include #include #include @@ -165,8 +167,10 @@ TEST(RemoteBlockReaderTest, TestReadSingleTrunk) { EXPECT_CALL(reader, AsyncReadPacket(_, _)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); reader.AsyncReadBlock( - GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), + *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), [&stat, &read](const Status &status, size_t transferred) { stat = status; read = transferred; @@ -192,8 +196,10 @@ TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) { .Times(4) .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); reader.AsyncReadBlock( - GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), + *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), [&stat, &read](const Status &status, size_t transferred) { stat = status; read = transferred; @@ -220,8 +226,10 @@ TEST(RemoteBlockReaderTest, TestReadError) { .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); + const auto client_name = GetRandomClientName(); + ASSERT_NE(client_name, nullptr); reader.AsyncReadBlock( - GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), + *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)), [&stat, &read](const Status &status, size_t transferred) { stat = status; read = transferred; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index 744e7eba16d..931f873d6de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -24,8 +24,14 @@ #include "rpc/rpc_connection_impl.h" #include "common/namenode_info.h" +#include +#include + #include +#include #include +#include +#include #include #include @@ -108,7 +114,9 @@ TEST(RpcEngineTest, TestRoundTrip) { std::shared_ptr io_service = IoService::MakeShared(); Options options; - std::shared_ptr engine = std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); auto conn = std::make_shared >(engine); conn->TEST_set_connected(true); @@ -144,7 +152,9 @@ TEST(RpcEngineTest, TestRoundTrip) { TEST(RpcEngineTest, TestConnectionResetAndFail) { std::shared_ptr io_service = IoService::MakeShared(); Options options; - std::shared_ptr engine = std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); auto conn = std::make_shared >(engine); conn->TEST_set_connected(true); @@ -181,8 +191,9 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) { Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 0; - std::shared_ptr engine - = std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -222,8 +233,9 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) { Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 1; - std::shared_ptr engine = - std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -276,8 +288,10 @@ TEST(RpcEngineTest, TestConnectionFailure) Options options; options.max_rpc_retries = 0; options.rpc_retry_delay_ms = 0; - std::shared_ptr engine - = std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); + EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))); @@ -303,8 +317,9 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure) Options options; options.max_rpc_retries = 2; options.rpc_retry_delay_ms = 0; - std::shared_ptr engine = - std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) @@ -332,8 +347,9 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover) Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 0; - std::shared_ptr engine = - std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(boost::system::error_code(), ""))) @@ -355,8 +371,9 @@ TEST(RpcEngineTest, TestEventCallbacks) Options options; options.max_rpc_retries = 99; options.rpc_retry_delay_ms = 0; - std::shared_ptr engine = - std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); // Normally determined during RpcEngine::Connect, but in this case options // provides enough info to determine policy here. @@ -441,8 +458,9 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover) Options options; options.max_rpc_retries = 1; options.rpc_retry_delay_ms = 1; - std::shared_ptr engine = - std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); EXPECT_CALL(*producer, Produce()) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(boost::system::error_code(), ""))) @@ -466,7 +484,9 @@ TEST(RpcEngineTest, TestTimeout) { std::shared_ptr io_service = IoService::MakeShared(); Options options; options.rpc_timeout = 1; - std::shared_ptr engine = std::make_shared(io_service, options, "foo", "", "protocol", 1); + auto engine = std::make_shared( + io_service, options, std::make_shared("foo"), "", "protocol", + 1); auto conn = std::make_shared >(engine); conn->TEST_set_connected(true);