HDFS-15929. Replace RAND_pseudo_bytes in util.cc (#2826)

This commit is contained in:
Gautham B A 2021-03-31 02:06:02 +05:30 committed by GitHub
parent 8668abf87e
commit 7dfff496fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 119 additions and 47 deletions

View File

@ -26,7 +26,7 @@
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
#include <thread> #include <thread>
#include <memory>
namespace hdfs { namespace hdfs {
@ -73,18 +73,20 @@ int DelimitedPBMessageSize(const ::google::protobuf::MessageLite *msg) {
return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size; return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
} }
std::string GetRandomClientName() { std::shared_ptr<std::string> GetRandomClientName() {
std::vector<unsigned char>buf(8); std::vector<unsigned char>buf(8);
RAND_pseudo_bytes(&buf[0], 8); if (RAND_bytes(&buf[0], static_cast<int>(buf.size())) != 1) {
return nullptr;
}
std::ostringstream oss; std::ostringstream oss;
oss << "DFSClient_" << getpid() << "_" << oss << "DFSClient_" << getpid() << "_" <<
std::this_thread::get_id() << "_" << std::this_thread::get_id() << "_" <<
std::setw(2) << std::hex << std::uppercase << std::setfill('0'); std::setw(2) << std::hex << std::uppercase << std::setfill('0');
for (unsigned char b: buf) for (auto b : buf) {
oss << static_cast<unsigned>(b); oss << static_cast<unsigned>(b);
}
return oss.str(); return std::make_shared<std::string>(oss.str());
} }
std::string Base64Encode(const std::string &src) { std::string Base64Encode(const std::string &src) {

View File

@ -22,6 +22,7 @@
#include "common/logging.h" #include "common/logging.h"
#include <mutex> #include <mutex>
#include <memory>
#include <string> #include <string>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
@ -61,7 +62,7 @@ std::string SerializeDelimitedProtobufMessage(const ::google::protobuf::MessageL
std::string Base64Encode(const std::string &src); std::string Base64Encode(const std::string &src);
// Return a new high-entropy client name // Return a new high-entropy client name
std::string GetRandomClientName(); std::shared_ptr<std::string> GetRandomClientName();
// Returns true if _someone_ is holding the lock (not necessarily this thread, // Returns true if _someone_ is holding the lock (not necessarily this thread,
// but a std::mutex doesn't track which thread is holding the lock) // but a std::mutex doesn't track which thread is holding the lock)

View File

@ -24,6 +24,8 @@
#include "hdfspp/events.h" #include "hdfspp/events.h"
#include <future> #include <future>
#include <memory>
#include <string>
#include <tuple> #include <tuple>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
@ -38,7 +40,7 @@ FileHandle::~FileHandle() {}
FileHandleImpl::FileHandleImpl(const std::string & cluster_name, FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
const std::string & path, const std::string & path,
std::shared_ptr<IoService> io_service, const std::string &client_name, std::shared_ptr<IoService> io_service, const std::shared_ptr<std::string> &client_name,
const std::shared_ptr<const struct FileInfo> file_info, const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes, std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers) std::shared_ptr<LibhdfsEvents> event_handlers)
@ -191,6 +193,11 @@ void FileHandleImpl::AsyncPreadSome(
return; return;
} }
if (client_name_ == nullptr) {
handler(Status::Error("AsyncPreadSome: Unable to generate random client name"), "", 0);
return;
}
/** /**
* Note: block and chosen_dn will end up pointing to things inside * Note: block and chosen_dn will end up pointing to things inside
* the blocks_ vector. They shouldn't be directly deleted. * the blocks_ vector. They shouldn't be directly deleted.
@ -245,7 +252,7 @@ void FileHandleImpl::AsyncPreadSome(
// steal the FileHandle's dn and put it back when we're done // steal the FileHandle's dn and put it back when we're done
std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken()); std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, &block->blocktoken());
std::string dn_id = dn->uuid_; std::string dn_id = dn->uuid_;
std::string client_name = client_name_; std::string client_name = *client_name_;
// Wrap the DN in a block reader to handle the state and logic of the // Wrap the DN in a block reader to handle the state and logic of the
// block request protocol // block request protocol

View File

@ -29,7 +29,9 @@
#include "bad_datanode_tracker.h" #include "bad_datanode_tracker.h"
#include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.pb.h"
#include <memory>
#include <mutex> #include <mutex>
#include <string>
namespace hdfs { namespace hdfs {
@ -51,10 +53,11 @@ public:
MEMCHECKED_CLASS(FileHandleImpl) MEMCHECKED_CLASS(FileHandleImpl)
FileHandleImpl(const std::string & cluster_name, FileHandleImpl(const std::string & cluster_name,
const std::string & path, const std::string & path,
std::shared_ptr<IoService> io_service, const std::string &client_name, std::shared_ptr<IoService> io_service,
const std::shared_ptr<const struct FileInfo> file_info, const std::shared_ptr<std::string> &client_name,
std::shared_ptr<BadDataNodeTracker> bad_data_nodes, const std::shared_ptr<const struct FileInfo> file_info,
std::shared_ptr<LibhdfsEvents> event_handlers); std::shared_ptr<BadDataNodeTracker> bad_data_nodes,
std::shared_ptr<LibhdfsEvents> event_handlers);
/* /*
* Reads the file at the specified offset into the buffer. * Reads the file at the specified offset into the buffer.
@ -129,7 +132,7 @@ private:
const std::string cluster_name_; const std::string cluster_name_;
const std::string path_; const std::string path_;
std::shared_ptr<IoService> io_service_; std::shared_ptr<IoService> io_service_;
const std::string client_name_; const std::shared_ptr<std::string> client_name_;
const std::shared_ptr<const struct FileInfo> file_info_; const std::shared_ptr<const struct FileInfo> file_info_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
bool CheckSeekBounds(ssize_t desired_position); bool CheckSeekBounds(ssize_t desired_position);

View File

@ -23,6 +23,7 @@
#include "hdfspp/hdfspp.h" #include "hdfspp/hdfspp.h"
#include "reader/fileinfo.h" #include "reader/fileinfo.h"
#include <memory>
#include <thread> #include <thread>
namespace hdfs { namespace hdfs {
@ -217,7 +218,7 @@ private:
**/ **/
std::shared_ptr<IoService> io_service_; std::shared_ptr<IoService> io_service_;
const Options options_; const Options options_;
const std::string client_name_; const std::shared_ptr<std::string> client_name_;
std::string cluster_name_; std::string cluster_name_;
NameNodeOperations nn_; NameNodeOperations nn_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;

View File

@ -26,6 +26,8 @@
#include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl" #include "ClientNamenodeProtocol.hrpc.inl"
#include <memory>
#include <string>
namespace hdfs { namespace hdfs {
@ -43,7 +45,7 @@ class NameNodeOperations {
public: public:
MEMCHECKED_CLASS(NameNodeOperations) MEMCHECKED_CLASS(NameNodeOperations)
NameNodeOperations(std::shared_ptr<IoService> io_service, const Options &options, NameNodeOperations(std::shared_ptr<IoService> io_service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::shared_ptr<std::string> &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version) : const char *protocol_name, int protocol_version) :
io_service_(io_service), io_service_(io_service),
engine_(std::make_shared<RpcEngine>(io_service, options, client_name, user_name, protocol_name, protocol_version)), engine_(std::make_shared<RpcEngine>(io_service, options, client_name, user_name, protocol_name, protocol_version)),

View File

@ -306,13 +306,19 @@ std::shared_ptr<std::string> RpcConnection::PrepareContextPacket() {
return std::make_shared<std::string>(); return std::make_shared<std::string>();
} }
const auto& client_name = pinnedEngine->client_name();
if (client_name == nullptr) {
LOG_ERROR(kRPC, << "RpcConnection@" << this << " unable to generate random client name");
return std::make_shared<std::string>();
}
std::shared_ptr<std::string> serializedPacketBuffer = std::make_shared<std::string>(); std::shared_ptr<std::string> serializedPacketBuffer = std::make_shared<std::string>();
RpcRequestHeaderProto headerProto; RpcRequestHeaderProto headerProto;
headerProto.set_rpckind(RPC_PROTOCOL_BUFFER); headerProto.set_rpckind(RPC_PROTOCOL_BUFFER);
headerProto.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); headerProto.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
headerProto.set_callid(RpcEngine::kCallIdConnectionContext); headerProto.set_callid(RpcEngine::kCallIdConnectionContext);
headerProto.set_clientid(pinnedEngine->client_name()); headerProto.set_clientid(*client_name);
IpcConnectionContextProto handshakeContextProto; IpcConnectionContextProto handshakeContextProto;
handshakeContextProto.set_protocol(pinnedEngine->protocol_name()); handshakeContextProto.set_protocol(pinnedEngine->protocol_name());

View File

@ -24,6 +24,7 @@
#include <algorithm> #include <algorithm>
#include <memory> #include <memory>
#include <string>
#include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <openssl/rand.h> #include <openssl/rand.h>
@ -36,7 +37,7 @@ using optional = std::experimental::optional<T>;
RpcEngine::RpcEngine(std::shared_ptr<IoService> io_service, const Options &options, RpcEngine::RpcEngine(std::shared_ptr<IoService> io_service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::shared_ptr<std::string> &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version) const char *protocol_name, int protocol_version)
: io_service_(io_service), : io_service_(io_service),
options_(options), options_(options),

View File

@ -38,6 +38,7 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <mutex> #include <mutex>
#include <string>
namespace hdfs { namespace hdfs {
@ -79,7 +80,7 @@ public:
virtual const RetryPolicy *retry_policy() = 0; virtual const RetryPolicy *retry_policy() = 0;
virtual int NextCallId() = 0; virtual int NextCallId() = 0;
virtual const std::string &client_name() = 0; virtual const std::shared_ptr<std::string> &client_name() = 0;
virtual const std::unique_ptr<std::string> &client_id() = 0; virtual const std::unique_ptr<std::string> &client_id() = 0;
virtual const std::string &user_name() = 0; virtual const std::string &user_name() = 0;
virtual const std::string &protocol_name() = 0; virtual const std::string &protocol_name() = 0;
@ -109,7 +110,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this<
}; };
RpcEngine(std::shared_ptr<IoService> service, const Options &options, RpcEngine(std::shared_ptr<IoService> service, const Options &options,
const std::string &client_name, const std::string &user_name, const std::shared_ptr<std::string> &client_name, const std::string &user_name,
const char *protocol_name, int protocol_version); const char *protocol_name, int protocol_version);
void Connect(const std::string & cluster_name, void Connect(const std::string & cluster_name,
@ -141,7 +142,7 @@ class RpcEngine : public LockFreeRpcEngine, public std::enable_shared_from_this<
void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy); void TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy);
std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions(); std::unique_ptr<const RetryPolicy> TEST_GenerateRetryPolicyUsingOptions();
const std::string &client_name() override { return client_name_; } const std::shared_ptr<std::string> &client_name() override { return client_name_; }
const std::unique_ptr<std::string> &client_id() override { return client_id_; } const std::unique_ptr<std::string> &client_id() override { return client_id_; }
const std::string &user_name() override { return auth_info_.getUser(); } const std::string &user_name() override { return auth_info_.getUser(); }
const std::string &protocol_name() override { return protocol_name_; } const std::string &protocol_name() override { return protocol_name_; }
@ -165,7 +166,7 @@ protected:
private: private:
mutable std::shared_ptr<IoService> io_service_; mutable std::shared_ptr<IoService> io_service_;
const Options options_; const Options options_;
const std::string client_name_; const std::shared_ptr<std::string> client_name_;
const std::unique_ptr<std::string> client_id_; const std::unique_ptr<std::string> client_id_;
const std::string protocol_name_; const std::string protocol_name_;
const int protocol_version_; const int protocol_version_;

View File

@ -23,11 +23,16 @@
#include "fs/bad_datanode_tracker.h" #include "fs/bad_datanode_tracker.h"
#include "reader/block_reader.h" #include "reader/block_reader.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <gmock/gmock-spec-builders.h>
#include <gmock/gmock-generated-actions.h>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/error.hpp> #include <boost/asio/error.hpp>
using hadoop::common::TokenProto; using hadoop::common::TokenProto;
using hadoop::hdfs::DatanodeInfoProto; using hadoop::hdfs::DatanodeInfoProto;
using hadoop::hdfs::DatanodeIDProto; using hadoop::hdfs::DatanodeIDProto;
@ -139,7 +144,10 @@ TEST(BadDataNodeTest, TestNoNodes) {
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
bad_node_tracker->AddBadNode("foo"); bad_node_tracker->AddBadNode("foo");
PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, bad_node_tracker, monitors); const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, bad_node_tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
@ -195,7 +203,11 @@ TEST(BadDataNodeTest, NNEventCallback) {
return event_response::make_ok(); return event_response::make_ok();
}); });
PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
@ -241,7 +253,11 @@ TEST(BadDataNodeTest, RecoverableError) {
std::shared_ptr<IoService> io_service = IoService::MakeShared(); std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto tracker = std::make_shared<BadDataNodeTracker>(); auto tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
@ -292,7 +308,11 @@ TEST(BadDataNodeTest, InternalError) {
std::shared_ptr<IoService> io_service = IoService::MakeShared(); std::shared_ptr<IoService> io_service = IoService::MakeShared();
auto tracker = std::make_shared<BadDataNodeTracker>(); auto tracker = std::make_shared<BadDataNodeTracker>();
auto monitors = std::make_shared<LibhdfsEvents>(); auto monitors = std::make_shared<LibhdfsEvents>();
PartialMockFileHandle is("cluster", "file", io_service, GetRandomClientName(), file_info, tracker, monitors);
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
PartialMockFileHandle is("cluster", "file", io_service, client_name, file_info, tracker, monitors);
Status stat; Status stat;
size_t read = 0; size_t read = 0;
EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))

View File

@ -27,8 +27,10 @@
#include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/zero_copy_stream_impl.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <gmock/gmock-spec-builders.h>
#include <gmock/gmock-generated-actions.h>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_service.hpp>
@ -165,8 +167,10 @@ TEST(RemoteBlockReaderTest, TestReadSingleTrunk) {
EXPECT_CALL(reader, AsyncReadPacket(_, _)) EXPECT_CALL(reader, AsyncReadPacket(_, _))
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
reader.AsyncReadBlock( reader.AsyncReadBlock(
GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) { [&stat, &read](const Status &status, size_t transferred) {
stat = status; stat = status;
read = transferred; read = transferred;
@ -192,8 +196,10 @@ TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) {
.Times(4) .Times(4)
.WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
reader.AsyncReadBlock( reader.AsyncReadBlock(
GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) { [&stat, &read](const Status &status, size_t transferred) {
stat = status; stat = status;
read = transferred; read = transferred;
@ -220,8 +226,10 @@ TEST(RemoteBlockReaderTest, TestReadError) {
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
.WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
const auto client_name = GetRandomClientName();
ASSERT_NE(client_name, nullptr);
reader.AsyncReadBlock( reader.AsyncReadBlock(
GetRandomClientName(), block, 0, boost::asio::buffer(buf, sizeof(buf)), *client_name, block, 0, boost::asio::buffer(buf, sizeof(buf)),
[&stat, &read](const Status &status, size_t transferred) { [&stat, &read](const Status &status, size_t transferred) {
stat = status; stat = status;
read = transferred; read = transferred;

View File

@ -24,8 +24,14 @@
#include "rpc/rpc_connection_impl.h" #include "rpc/rpc_connection_impl.h"
#include "common/namenode_info.h" #include "common/namenode_info.h"
#include <memory>
#include <string>
#include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/coded_stream.h>
#include <gtest/gtest.h>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <gmock/gmock-spec-builders.h>
#include <gmock/gmock-generated-actions.h>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp> #include <boost/date_time/posix_time/posix_time_duration.hpp>
@ -108,7 +114,9 @@ TEST(RpcEngineTest, TestRoundTrip) {
std::shared_ptr<IoService> io_service = IoService::MakeShared(); std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto engine = std::make_shared<RpcEngine>(
io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
@ -144,7 +152,9 @@ TEST(RpcEngineTest, TestRoundTrip) {
TEST(RpcEngineTest, TestConnectionResetAndFail) { TEST(RpcEngineTest, TestConnectionResetAndFail) {
std::shared_ptr<IoService> io_service = IoService::MakeShared(); std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto engine = std::make_shared<RpcEngine>(
io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);
@ -181,8 +191,9 @@ TEST(RpcEngineTest, TestConnectionResetAndRecover) {
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine auto engine = std::make_shared<SharedConnectionEngine>(
= std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -222,8 +233,9 @@ TEST(RpcEngineTest, TestConnectionResetAndRecoverWithDelay) {
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 1; options.rpc_retry_delay_ms = 1;
std::shared_ptr<SharedConnectionEngine> engine = auto engine = std::make_shared<SharedConnectionEngine>(
std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -276,8 +288,10 @@ TEST(RpcEngineTest, TestConnectionFailure)
Options options; Options options;
options.max_rpc_retries = 0; options.max_rpc_retries = 0;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine auto engine = std::make_shared<SharedConnectionEngine>(
= std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))); .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), "")));
@ -303,8 +317,9 @@ TEST(RpcEngineTest, TestConnectionFailureRetryAndFailure)
Options options; Options options;
options.max_rpc_retries = 2; options.max_rpc_retries = 2;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = auto engine = std::make_shared<SharedConnectionEngine>(
std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), "")))
@ -332,8 +347,9 @@ TEST(RpcEngineTest, TestConnectionFailureAndRecover)
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = auto engine = std::make_shared<SharedConnectionEngine>(
std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(boost::system::error_code(), ""))) .WillOnce(Return(std::make_pair(boost::system::error_code(), "")))
@ -355,8 +371,9 @@ TEST(RpcEngineTest, TestEventCallbacks)
Options options; Options options;
options.max_rpc_retries = 99; options.max_rpc_retries = 99;
options.rpc_retry_delay_ms = 0; options.rpc_retry_delay_ms = 0;
std::shared_ptr<SharedConnectionEngine> engine = auto engine = std::make_shared<SharedConnectionEngine>(
std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
// Normally determined during RpcEngine::Connect, but in this case options // Normally determined during RpcEngine::Connect, but in this case options
// provides enough info to determine policy here. // provides enough info to determine policy here.
@ -441,8 +458,9 @@ TEST(RpcEngineTest, TestConnectionFailureAndAsyncRecover)
Options options; Options options;
options.max_rpc_retries = 1; options.max_rpc_retries = 1;
options.rpc_retry_delay_ms = 1; options.rpc_retry_delay_ms = 1;
std::shared_ptr<SharedConnectionEngine> engine = auto engine = std::make_shared<SharedConnectionEngine>(
std::make_shared<SharedConnectionEngine>(io_service, options, "foo", "", "protocol", 1); io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
EXPECT_CALL(*producer, Produce()) EXPECT_CALL(*producer, Produce())
.WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), ""))) .WillOnce(Return(std::make_pair(make_error_code(boost::asio::error::connection_reset), "")))
.WillOnce(Return(std::make_pair(boost::system::error_code(), ""))) .WillOnce(Return(std::make_pair(boost::system::error_code(), "")))
@ -466,7 +484,9 @@ TEST(RpcEngineTest, TestTimeout) {
std::shared_ptr<IoService> io_service = IoService::MakeShared(); std::shared_ptr<IoService> io_service = IoService::MakeShared();
Options options; Options options;
options.rpc_timeout = 1; options.rpc_timeout = 1;
std::shared_ptr<RpcEngine> engine = std::make_shared<RpcEngine>(io_service, options, "foo", "", "protocol", 1); auto engine = std::make_shared<RpcEngine>(
io_service, options, std::make_shared<std::string>("foo"), "", "protocol",
1);
auto conn = auto conn =
std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine); std::make_shared<RpcConnectionImpl<MockRPCConnection> >(engine);
conn->TEST_set_connected(true); conn->TEST_set_connected(true);