HDFS-11106: libhdfs++: Some refactoring to better organize files (part 2). Contributed by James Clampffer.
parent 0f3f8db113
commit 3e53da2d62
@@ -16,7 +16,7 @@
 # limitations under the License.
 #

-list(APPEND rpc_object_items rpc_connection.cc rpc_engine.cc sasl_protocol.cc sasl_engine.cc)
+list(APPEND rpc_object_items rpc_connection_impl.cc rpc_engine.cc namenode_tracker.cc request.cc sasl_protocol.cc sasl_engine.cc)

 if (CMAKE_USING_CYRUS_SASL)
   list(APPEND rpc_object_items cyrus_sasl_engine.cc)
 endif (CMAKE_USING_CYRUS_SASL)
@@ -0,0 +1,134 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "namenode_tracker.h"

#include "common/logging.h"
#include "common/libhdfs_events_impl.h"
#include "common/util.h"

namespace hdfs {

static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
  std::stringstream ss;
  for(unsigned int i=0; i<pts.size(); i++)
    if(i == pts.size() - 1)
      ss << pts[i];
    else
      ss << pts[i] << ", ";
  return ss.str();
}

HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                                     ::asio::io_service *ioservice,
                                     std::shared_ptr<LibhdfsEvents> event_handlers)
    : enabled_(false), resolved_(false),
      ioservice_(ioservice), event_handlers_(event_handlers)
{
  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
  for(unsigned int i=0;i<servers.size();i++)
    LOG_TRACE(kRPC, << servers[i].str());

  if(servers.size() >= 2) {
    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
    if(servers.size() > 2) {
      LOG_WARN(kRPC, << "Nameservice declares more than two nodes.  Some won't be used.");
    }

    active_info_ = servers[0];
    standby_info_ = servers[1];
    LOG_INFO(kRPC, << "Active namenode url  = " << active_info_.uri.str());
    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());

    enabled_ = true;
    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
      resolved_ = true;
    }
  }
}

HANamenodeTracker::~HANamenodeTracker() {}

// Pass in the endpoint of the connection that just failed; this does a reverse
// lookup and returns the info for the node to fail over to.  It also swaps the
// tracker's internal active/standby state when appropriate.
ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
  mutex_guard swap_lock(swap_lock_);

  ResolvedNamenodeInfo failover_node;

  // Was connected to the active: swap roles so the standby becomes active, then return it
  if(IsCurrentActive_locked(current_endpoint)) {
    std::swap(active_info_, standby_info_);
    if(event_handlers_)
      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
    failover_node = active_info_;
  } else if(IsCurrentStandby_locked(current_endpoint)) {
    // Was connected to the standby: return the current active without swapping
    if(event_handlers_)
      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
    failover_node = active_info_;
  } else {
    // Invalid state, throw for testing
    std::string ep1 = format_endpoints(active_info_.endpoints);
    std::string ep2 = format_endpoints(standby_info_.endpoints);

    std::stringstream msg;
    msg << "Looked for " << current_endpoint << " in\n";
    msg << ep1 << " and\n";
    msg << ep2 << std::endl;

    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
    throw std::runtime_error(msg.str());
  }

  if(failover_node.endpoints.empty()) {
    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << ", attempting to resolve again");
    if(!ResolveInPlace(ioservice_, failover_node)) {
      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
                      << " failed.  Please make sure your configuration is up to date.");
    }
  }
  return failover_node;
}

bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
    if(ep.address() == active_info_.endpoints[i].address()) {
      if(ep.port() != active_info_.endpoints[i].port())
        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << ", trying anyway...");
      return true;
    }
  }
  return false;
}

bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
    if(ep.address() == standby_info_.endpoints[i].address()) {
      if(ep.port() != standby_info_.endpoints[i].port())
        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << ", trying anyway...");
      return true;
    }
  }
  return false;
}

} // end namespace hdfs
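
For orientation, the failover logic above boils down to a swap-under-lock: return the node opposite the one that failed, swapping roles only when the failed endpoint was the active.  Below is a minimal self-contained sketch of that pattern; Node, FailoverPair, and the endpoint strings are illustrative stand-ins, not part of this commit.

#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Illustrative stand-in for ResolvedNamenodeInfo: just an endpoint string.
struct Node {
  std::string endpoint;
};

class FailoverPair {
 public:
  FailoverPair(Node active, Node standby)
      : active_(std::move(active)), standby_(std::move(standby)) {}

  // Mirrors GetFailoverAndUpdate: given the endpoint that just failed,
  // return the node to try next, swapping roles if the active failed.
  Node GetFailoverAndUpdate(const std::string &failed_endpoint) {
    std::lock_guard<std::mutex> lock(swap_lock_);
    if (failed_endpoint == active_.endpoint) {
      std::swap(active_, standby_);   // promote the standby
      return active_;
    } else if (failed_endpoint == standby_.endpoint) {
      return active_;                 // standby failed; stay on the active
    }
    throw std::runtime_error("endpoint not tracked: " + failed_endpoint);
  }

 private:
  Node active_;
  Node standby_;
  std::mutex swap_lock_;              // held only for the role swap
};

int main() {
  FailoverPair pair({"nn1:8020"}, {"nn2:8020"});
  // nn1 (the active) fails, so nn2 is promoted and returned.
  std::cout << pair.GetFailoverAndUpdate("nn1:8020").endpoint << "\n";  // nn2:8020
  // nn2 (now active) fails too; roles swap back and nn1 is returned.
  std::cout << pair.GetFailoverAndUpdate("nn2:8020").endpoint << "\n";  // nn1:8020
  return 0;
}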
@@ -0,0 +1,81 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIB_RPC_NAMENODE_TRACKER_H
#define LIB_RPC_NAMENODE_TRACKER_H

#include "common/libhdfs_events_impl.h"
#include "common/namenode_info.h"

#include <asio/ip/tcp.hpp>

#include <memory>
#include <mutex>
#include <vector>

namespace hdfs {

/*
 * The tracker gives the RpcEngine a quick way to use the endpoint that just
 * failed in order to look up the set of endpoints for the failover node.
 *
 * Note: For now this only deals with 2 NameNodes, but that's the default
 * anyway.
 */
class HANamenodeTracker {
 public:
  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                    ::asio::io_service *ioservice,
                    std::shared_ptr<LibhdfsEvents> event_handlers_);

  virtual ~HANamenodeTracker();

  bool is_enabled() const { return enabled_; }
  bool is_resolved() const { return resolved_; }

  // Get the node opposite of the current one if possible (swaps active/standby).
  // Note: This will always mutate internal state.  Use IsCurrentActive/Standby to
  // get info without changing state.
  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);

  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;

 private:
  // True if HA should be enabled, based on our options and runtime info like
  // the number of nodes provided
  bool enabled_;
  // True if we were able to resolve at least 1 HA namenode
  bool resolved_;

  // Keep the io_service in case a second round of DNS lookup is required
  ::asio::io_service *ioservice_;

  // Event handlers.  For now this is the simplest place to catch all failover
  // events and push info out to the client application; possibly move into RpcEngine.
  std::shared_ptr<LibhdfsEvents> event_handlers_;

  // Only support 1 active and 1 standby for now.
  ResolvedNamenodeInfo active_info_;
  ResolvedNamenodeInfo standby_info_;

  // Acquired when swapping active and standby
  std::mutex swap_lock_;
};

} // end namespace hdfs
#endif // end include guard
@@ -0,0 +1,190 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


#include "request.h"
#include "rpc_engine.h"
#include "sasl_protocol.h"

#include "RpcHeader.pb.h"
#include "ProtobufRpcEngine.pb.h"
#include "IpcConnectionContext.pb.h"

#include <sstream>

namespace hdfs {

namespace pb = ::google::protobuf;
namespace pbio = ::google::protobuf::io;

using namespace ::hadoop::common;
using namespace ::std::placeholders;

static const int kNoRetry = -1;

// Protobuf helper functions.
static void AddHeadersToPacket(std::string *res,
                               std::initializer_list<const pb::MessageLite *> headers,
                               const std::string *payload) {
  int len = 0;
  std::for_each(
      headers.begin(), headers.end(),
      [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });

  if (payload) {
    len += payload->size();
  }

  int net_len = htonl(len);
  res->reserve(res->size() + sizeof(net_len) + len);

  pbio::StringOutputStream ss(res);
  pbio::CodedOutputStream os(&ss);
  os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));

  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  assert(buf);

  std::for_each(
      headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
        buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
        buf = v->SerializeWithCachedSizesToArray(buf);
      });

  if (payload) {
    buf = os.WriteStringToArray(*payload, buf);
  }
}
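
AddHeadersToPacket defines the wire framing: a 4-byte big-endian total length, then each header as a varint-delimited protobuf blob, then the payload (which ConstructPayload has already delimited).  Here is a minimal self-contained sketch of that framing with plain strings standing in for serialized protobufs; EncodeVarint32 and FramePacket are hypothetical helpers, not part of this commit.

#include <arpa/inet.h>  // htonl
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Protobuf-style base-128 varint encoding of a length.
static std::string EncodeVarint32(uint32_t v) {
  std::string out;
  while (v >= 0x80) {
    out.push_back(static_cast<char>((v & 0x7F) | 0x80));
    v >>= 7;
  }
  out.push_back(static_cast<char>(v));
  return out;
}

// Mirrors AddHeadersToPacket: 4-byte big-endian total length, then each
// header as a varint-delimited blob, then the payload appended as-is
// (the real code pre-delimits the payload in ConstructPayload).
static std::string FramePacket(const std::vector<std::string> &headers,
                               const std::string &payload) {
  std::string body;
  for (const std::string &h : headers) {
    body += EncodeVarint32(static_cast<uint32_t>(h.size()));
    body += h;
  }
  body += payload;

  uint32_t net_len = htonl(static_cast<uint32_t>(body.size()));
  std::string packet(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
  packet += body;
  return packet;
}

int main() {
  std::string packet = FramePacket({"rpc-header", "request-header"}, "payload");
  // 4 length bytes + (1 + 10) + (1 + 14) + 7 payload bytes
  assert(packet.size() == 4 + 11 + 15 + 7);
  std::cout << "framed " << packet.size() << " bytes\n";
  return 0;
}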

static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
  int len = DelimitedPBMessageSize(header);
  res->reserve(len);
  pbio::StringOutputStream ss(res);
  pbio::CodedOutputStream os(&ss);
  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  assert(buf);
  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
  buf = header->SerializeWithCachedSizesToArray(buf);
}

static void ConstructPayload(std::string *res, const std::string *request) {
  int len =
      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
  res->reserve(len);
  pbio::StringOutputStream ss(res);
  pbio::CodedOutputStream os(&ss);
  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  assert(buf);
  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
  buf = os.WriteStringToArray(*request, buf);
}

static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
                             const std::string &method_name, int retry_count,
                             RpcRequestHeaderProto *rpc_header,
                             RequestHeaderProto *req_header) {
  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
  rpc_header->set_callid(call_id);
  if (retry_count != kNoRetry)
    rpc_header->set_retrycount(retry_count);
  rpc_header->set_clientid(engine->client_id());

  req_header->set_methodname(method_name);
  req_header->set_declaringclassprotocolname(engine->protocol_name());
  req_header->set_clientprotocolversion(engine->protocol_version());
}

// Request implementation

Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
                 const std::string &request, Handler &&handler)
    : engine_(engine),
      method_name_(method_name),
      call_id_(call_id),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
  ConstructPayload(&payload_, &request);
}


Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
                 const pb::MessageLite *request, Handler &&handler)
    : engine_(engine),
      method_name_(method_name),
      call_id_(call_id),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
  ConstructPayload(&payload_, request);
}

Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
    : engine_(engine),
      call_id_(-1),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
}

void Request::GetPacket(std::string *res) const {
  LOG_TRACE(kRPC, << "Request::GetPacket called");

  if (payload_.empty())
    return;

  RpcRequestHeaderProto rpc_header;
  RequestHeaderProto req_header;
  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
                   &req_header);

  // SASL messages don't have a request header
  if (method_name_ != SASL_METHOD_NAME)
    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
  else
    AddHeadersToPacket(res, {&rpc_header}, &payload_);
}

void Request::OnResponseArrived(pbio::CodedInputStream *is,
                                const Status &status) {
  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
  handler_(is, status);
}

std::string Request::GetDebugString() const {
  // Basic description of this object, aimed at debugging
  std::stringstream ss;
  ss << "\nRequest Object:\n";
  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
  ss << "\tCall id        = " << call_id_ << "\n";
  ss << "\tRetry Count    = " << retry_count_ << "\n";
  ss << "\tFailover count = " << failover_count_ << "\n";
  return ss.str();
}

int Request::IncrementFailoverCount() {
  // reset retry count when failing over
  retry_count_ = 0;
  return failover_count_++;
}

} // end namespace hdfs
@@ -0,0 +1,87 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef LIB_RPC_RPC_REQUEST_H
#define LIB_RPC_RPC_REQUEST_H

#include "hdfspp/status.h"
#include "common/util.h"
#include "common/new_delete.h"

#include <string>

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

#include <asio/deadline_timer.hpp>


namespace hdfs {

class LockFreeRpcEngine;
class SaslProtocol;

/*
 * Internal bookkeeping for an outstanding request from the consumer.
 *
 * Threading model: not thread-safe; should only be accessed from a single
 * thread at a time.
 */
class Request {
 public:
  MEMCHECKED_CLASS(Request)
  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
                             const Status &status)> Handler;

  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const std::string &request, Handler &&callback);
  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const ::google::protobuf::MessageLite *request, Handler &&callback);

  // Null request (with no actual message) used to track the state of an
  // initial Connect call
  Request(LockFreeRpcEngine *engine, Handler &&handler);

  int call_id() const { return call_id_; }
  std::string method_name() const { return method_name_; }
  ::asio::deadline_timer &timer() { return timer_; }
  int IncrementRetryCount() { return retry_count_++; }
  int IncrementFailoverCount();
  void GetPacket(std::string *res) const;
  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
                         const Status &status);

  int get_failover_count() { return failover_count_; }

  std::string GetDebugString() const;

 private:
  LockFreeRpcEngine *const engine_;
  const std::string method_name_;
  const int call_id_;

  ::asio::deadline_timer timer_;
  std::string payload_;
  const Handler handler_;

  int retry_count_;
  int failover_count_;
};

} // end namespace hdfs
#endif // end include guard
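
The Handler typedef above is the single completion path for a request: the connection invokes it exactly once, either with a CodedInputStream positioned at the response protobuf or with a non-OK status (and a null stream).  A compilable sketch of that contract follows; MiniStatus and MiniRequest are illustrative stand-ins for hdfs::Status and hdfs::Request, not part of this commit.

#include <functional>
#include <iostream>
#include <string>
#include <utility>

namespace google { namespace protobuf { namespace io {
class CodedInputStream;  // forward declaration only; never dereferenced here
}}}

// Illustrative stand-in for hdfs::Status.
struct MiniStatus {
  bool ok;
  std::string msg;
};

class MiniRequest {
 public:
  using Handler = std::function<void(google::protobuf::io::CodedInputStream *is,
                                     const MiniStatus &status)>;

  explicit MiniRequest(Handler &&handler) : handler_(std::move(handler)) {}

  // Mirrors Request::OnResponseArrived: forward to the stored handler.
  void OnResponseArrived(google::protobuf::io::CodedInputStream *is,
                         const MiniStatus &status) {
    handler_(is, status);
  }

 private:
  const Handler handler_;
};

int main() {
  MiniRequest req([](google::protobuf::io::CodedInputStream *is,
                     const MiniStatus &status) {
    if (!status.ok) {
      std::cout << "rpc failed: " << status.msg << "\n";
      return;
    }
    // A real handler would parse the response protobuf from `is` here.
    std::cout << "rpc ok, stream " << (is ? "present" : "absent") << "\n";
  });

  // Simulate a connection error surfacing through the same callback path.
  req.OnResponseArrived(nullptr, MiniStatus{false, "connection reset"});
  return 0;
}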
@@ -15,430 +15,166 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef LIB_RPC_RPC_CONNECTION_H_
#define LIB_RPC_RPC_CONNECTION_H_
#ifndef LIB_RPC_RPC_CONNECTION_H
#define LIB_RPC_RPC_CONNECTION_H

#include "rpc_engine.h"
/*
 * Encapsulates a persistent connection to the NameNode, and the sending of
 * RPC requests and evaluating their responses.
 *
 * Can have multiple RPC requests in-flight simultaneously, but they are
 * evaluated in-order on the server side in a blocking manner.
 *
 * Threading model: public interface is thread-safe.
 * All handlers passed in to method calls will be called from an asio thread,
 * and will not be holding any internal RpcConnection locks.
 */

#include "request.h"
#include "common/auth_info.h"
#include "common/logging.h"
#include "common/util.h"
#include "common/libhdfs_events_impl.h"
#include "sasl_protocol.h"
#include "common/new_delete.h"
#include "hdfspp/status.h"

#include <asio/connect.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp>

#include <system_error>
#include <functional>
#include <memory>
#include <vector>
#include <deque>
#include <unordered_map>

namespace hdfs {

template <class Socket>
class RpcConnectionImpl : public RpcConnection {
 public:
  MEMCHECKED_CLASS(RpcConnectionImpl);
  typedef const std::function<void(const Status &)> RpcCallback;

  RpcConnectionImpl(RpcEngine *engine);
  virtual ~RpcConnectionImpl() override;
class LockFreeRpcEngine;
class SaslProtocol;

class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
 public:
  MEMCHECKED_CLASS(RpcConnection)
  RpcConnection(LockFreeRpcEngine *engine);
  virtual ~RpcConnection();

  // Note that a single server can have multiple endpoints - especially both
  // an ipv4 and ipv6 endpoint
  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                       const AuthInfo & auth_info,
                       RpcCallback &handler);
  virtual void ConnectAndFlush(
      const std::vector<::asio::ip::tcp::endpoint> &server) override;
  virtual void SendHandshake(RpcCallback &handler) override;
  virtual void SendContext(RpcCallback &handler) override;
  virtual void Disconnect() override;
                       RpcCallback &handler) = 0;
  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
  virtual void Disconnect() = 0;

  void StartReading();
  void AsyncRpc(const std::string &method_name,
                const ::google::protobuf::MessageLite *req,
                std::shared_ptr<::google::protobuf::MessageLite> resp,
                const RpcCallback &handler);

  void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests);

  // Enqueue requests before the connection is connected.  Will be flushed
  // on connect
  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);

  // Put requests at the front of the current request queue
  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);

  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
  void SetClusterName(std::string cluster_name);

  LockFreeRpcEngine *engine() { return engine_; }
  ::asio::io_service &io_service();

 protected:
  struct Response {
    enum ResponseState {
      kReadLength,
      kReadContent,
      kParseResponse,
    } state_;
    unsigned length_;
    std::vector<char> data_;

    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;

    Response() : state_(kReadLength), length_(0) {}
  };


  // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected
  virtual void SendHandshake(RpcCallback &handler) = 0;
  void HandshakeComplete(const Status &s);
  void AuthComplete(const Status &s, const AuthInfo & new_auth_info);
  void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info);
  virtual void SendContext(RpcCallback &handler) = 0;
  void ContextComplete(const Status &s);

  virtual void OnSendCompleted(const ::asio::error_code &ec,
                               size_t transferred) override;
                               size_t transferred) = 0;
  virtual void OnRecvCompleted(const ::asio::error_code &ec,
                               size_t transferred) override;
  virtual void FlushPendingRequests() override;
                               size_t transferred) = 0;
  virtual void FlushPendingRequests()=0;      // Synchronously write the next request

  void AsyncRpc_locked(
      const std::string &method_name,
      const ::google::protobuf::MessageLite *req,
      std::shared_ptr<::google::protobuf::MessageLite> resp,
      const RpcCallback &handler);
  void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests);
  void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time


  Socket &TEST_get_mutable_socket() { return socket_; }

  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }
  std::shared_ptr<std::string> PrepareHandshakePacket();
  std::shared_ptr<std::string> PrepareContextPacket();
  static std::string SerializeRpcRequest(const std::string &method_name,
                                         const ::google::protobuf::MessageLite *req);

 private:
  const Options options_;
  ::asio::ip::tcp::endpoint current_endpoint_;
  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
  Socket socket_;
  ::asio::deadline_timer connect_timer_;
  Status HandleRpcResponse(std::shared_ptr<Response> response);
  void HandleRpcTimeout(std::shared_ptr<Request> req,
                        const ::asio::error_code &ec);
  void CommsError(const Status &status);

  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
  void ClearAndDisconnect(const ::asio::error_code &ec);
  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);

  LockFreeRpcEngine *const engine_;
  std::shared_ptr<Response> current_response_state_;
  AuthInfo auth_info_;

  // Connection can have deferred connection, especially when we're pausing
  // during retry
  enum ConnectedState {
    kNotYetConnected,
    kConnecting,
    kHandshaking,
    kAuthenticating,
    kConnected,
    kDisconnected
  };
  static std::string ToString(ConnectedState connected);
  ConnectedState connected_;

  // State machine for performing a SASL handshake
  std::shared_ptr<SaslProtocol> sasl_protocol_;
  // The request being sent over the wire; will also be in requests_on_fly_
  std::shared_ptr<Request> request_over_the_wire_;
  // Requests to be sent over the wire
  std::deque<std::shared_ptr<Request>> pending_requests_;
  // Requests to be sent over the wire during authentication; not retried if
  // there is a connection error
  std::deque<std::shared_ptr<Request>> auth_requests_;
  // Requests that are waiting for responses
  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
  RequestOnFlyMap requests_on_fly_;
  std::shared_ptr<LibhdfsEvents> event_handlers_;
  std::string cluster_name_;

  // Lock for mutable parts of this class that need to be thread safe
  std::mutex connection_state_lock_;

  friend class SaslProtocol;
};

template <class Socket>
RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
    : RpcConnection(engine),
      options_(engine->options()),
      socket_(engine->io_service()),
      connect_timer_(engine->io_service())
{
  LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
}

template <class Socket>
RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);

  if (pending_requests_.size() > 0)
    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
  if (requests_on_fly_.size() > 0)
    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
}

template <class Socket>
void RpcConnectionImpl<Socket>::Connect(
    const std::vector<::asio::ip::tcp::endpoint> &server,
    const AuthInfo & auth_info,
    RpcCallback &handler) {
  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");

  this->auth_info_ = auth_info;

  auto connectionSuccessfulReq = std::make_shared<Request>(
      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
                         const Status &status) {
        (void)is;
        handler(status);
      });
  pending_requests_.push_back(connectionSuccessfulReq);
  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
}

template <class Socket>
void RpcConnectionImpl<Socket>::ConnectAndFlush(
    const std::vector<::asio::ip::tcp::endpoint> &server) {

  LOG_INFO(kRPC, << "ConnectAndFlush called");
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  if (server.empty()) {
    Status s = Status::InvalidArgument("No endpoints provided");
    CommsError(s);
    return;
  }

  if (connected_ == kConnected) {
    FlushPendingRequests();
    return;
  }
  if (connected_ != kNotYetConnected) {
    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
    return;
  }
  connected_ = kConnecting;

  // Take the first endpoint, but remember the alternatives for later
  additional_endpoints_ = server;
  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
  additional_endpoints_.erase(additional_endpoints_.begin());
  current_endpoint_ = first_endpoint;

  auto shared_this = shared_from_this();
  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
    ConnectComplete(ec, first_endpoint);
  });

  // Prompt the timer to timeout
  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
  connect_timer_.expires_from_now(
      std::chrono::milliseconds(options_.rpc_connect_timeout));
  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
    if (ec)
      ConnectComplete(ec, first_endpoint);
    else
      ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
  });
}

template <class Socket>
void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
  auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);
  connect_timer_.cancel();

  LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");

  // Could be an old async connect returning a result after we've moved on
  if (remote != current_endpoint_) {
    LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
    return;
  }
  if (connected_ != kConnecting) {
    LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);
    return;
  }

  Status status = ToStatus(ec);
  if(event_handlers_) {
    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
    if (event_resp.response() == event_response::kTest_Error) {
      status = event_resp.status();
    }
#endif
  }

  if (status.ok()) {
    StartReading();
    SendHandshake([shared_this, this](const Status & s) {
      HandshakeComplete(s);
    });
  } else {
    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());
    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
    if(!err.empty()) {
      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
    }

    if (!additional_endpoints_.empty()) {
      // If we have additional endpoints, keep trying until we either run out or
      // hit one
      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
      additional_endpoints_.erase(additional_endpoints_.begin());
      current_endpoint_ = next_endpoint;

      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
        ConnectComplete(ec, next_endpoint);
      });
      connect_timer_.expires_from_now(
          std::chrono::milliseconds(options_.rpc_connect_timeout));
      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
        if (ec)
          ConnectComplete(ec, next_endpoint);
        else
          ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
      });
    } else {
      CommsError(status);
    }
  }
}

template <class Socket>
void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
  connected_ = kHandshaking;

  auto shared_this = shared_from_this();
  auto handshake_packet = PrepareHandshakePacket();
  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
                      [handshake_packet, handler, shared_this, this](
                          const ::asio::error_code &ec, size_t) {
                        Status status = ToStatus(ec);
                        handler(status);
                      });
}

template <class Socket>
void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");

  auto shared_this = shared_from_this();
  auto context_packet = PrepareContextPacket();
  ::asio::async_write(socket_, asio::buffer(*context_packet),
                      [context_packet, handler, shared_this, this](
                          const ::asio::error_code &ec, size_t) {
                        Status status = ToStatus(ec);
                        handler(status);
                      });
}

template <class Socket>
void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
                                                size_t) {
  using std::placeholders::_1;
  using std::placeholders::_2;
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");

  request_over_the_wire_.reset();
  if (ec) {
    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
    CommsError(ToStatus(ec));
    return;
  }

  FlushPendingRequests();
}

template <class Socket>
void RpcConnectionImpl<Socket>::FlushPendingRequests() {
  using namespace ::std::placeholders;

  // Lock should be held
  assert(lock_held(connection_state_lock_));

  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");

  // Don't send if we don't need to
  if (request_over_the_wire_) {
    return;
  }

  std::shared_ptr<Request> req;
  switch (connected_) {
  case kNotYetConnected:
    return;
  case kConnecting:
    return;
  case kHandshaking:
    return;
  case kAuthenticating:
    if (auth_requests_.empty()) {
      return;
    }
    req = auth_requests_.front();
    auth_requests_.erase(auth_requests_.begin());
    break;
  case kConnected:
    if (pending_requests_.empty()) {
      return;
    }
    req = pending_requests_.front();
    pending_requests_.erase(pending_requests_.begin());
    break;
  case kDisconnected:
    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
    return;
  default:
    LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_));
    return;
  }

  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
  auto weak_req = std::weak_ptr<Request>(req);

  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
  req->GetPacket(payload.get());
  if (!payload->empty()) {
    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
    requests_on_fly_[req->call_id()] = req;
    request_over_the_wire_ = req;

    req->timer().expires_from_now(
        std::chrono::milliseconds(options_.rpc_timeout));
    req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
      auto timeout_this = weak_this.lock();
      auto timeout_req = weak_req.lock();
      if (timeout_this && timeout_req)
        this->HandleRpcTimeout(timeout_req, ec);
    });

    asio::async_write(socket_, asio::buffer(*payload),
                      [shared_this, this, payload](const ::asio::error_code &ec,
                                                   size_t size) {
                        OnSendCompleted(ec, size);
                      });
  } else {  // Nothing to send for this request, inform the handler immediately
    io_service().post(
        // Never hold locks when calling a callback
        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
    );

    // Reschedule to flush the next one
    AsyncFlushPendingRequests();
  }
}


template <class Socket>
void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
                                                size_t) {
  using std::placeholders::_1;
  using std::placeholders::_2;
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  ::asio::error_code my_ec(original_ec);

  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");

  std::shared_ptr<RpcConnection> shared_this = shared_from_this();

  if(event_handlers_) {
    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
    if (event_resp.response() == event_response::kTest_Error) {
      my_ec = std::make_error_code(std::errc::network_down);
    }
#endif
  }

  switch (my_ec.value()) {
  case 0:
    // No errors
    break;
  case asio::error::operation_aborted:
    // The event loop has been shut down. Ignore the error.
    return;
  default:
    LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
    CommsError(ToStatus(my_ec));
    return;
  }

  if (!current_response_state_) { /* start a new one */
    current_response_state_ = std::make_shared<Response>();
  }

  if (current_response_state_->state_ == Response::kReadLength) {
    current_response_state_->state_ = Response::kReadContent;
    auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
                              sizeof(current_response_state_->length_));
    asio::async_read(
        socket_, buf,
        [shared_this, this](const ::asio::error_code &ec, size_t size) {
          OnRecvCompleted(ec, size);
        });
  } else if (current_response_state_->state_ == Response::kReadContent) {
    current_response_state_->state_ = Response::kParseResponse;
    current_response_state_->length_ = ntohl(current_response_state_->length_);
    current_response_state_->data_.resize(current_response_state_->length_);
    asio::async_read(
        socket_, ::asio::buffer(current_response_state_->data_),
        [shared_this, this](const ::asio::error_code &ec, size_t size) {
          OnRecvCompleted(ec, size);
        });
  } else if (current_response_state_->state_ == Response::kParseResponse) {
    // Check return status from the RPC response.  We may have received a msg
    // indicating a server side error.

    Status stat = HandleRpcResponse(current_response_state_);

    if(stat.get_server_exception_type() == Status::kStandbyException) {
      // May need to bail out, connect to new NN, and restart loop
      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect");
    }

    current_response_state_ = nullptr;
    StartReading();
  }
}

template <class Socket>
void RpcConnectionImpl<Socket>::Disconnect() {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");

  request_over_the_wire_.reset();
  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
    // Don't print out errors, we were expecting a disconnect here
    SafeDisconnect(get_asio_socket_ptr(&socket_));
  }
  connected_ = kDisconnected;
}
}

#endif
} // end namespace hdfs
#endif // end include guard
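
The Response struct above drives a three-phase read cycle: read the 4-byte length prefix, read that many content bytes (after ntohl), then parse.  Here is a synchronous, self-contained sketch of the same cycle over an in-memory buffer instead of an asio socket; all names are illustrative, not from this commit.

#include <arpa/inet.h>  // htonl, ntohl
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

enum ResponseState { kReadLength, kReadContent, kParseResponse };

int main() {
  // Fake wire bytes: 4-byte big-endian length followed by that many bytes.
  std::string body = "hello response";
  uint32_t net_len = htonl(static_cast<uint32_t>(body.size()));
  std::string wire(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
  wire += body;

  size_t pos = 0;
  ResponseState state = kReadLength;
  uint32_t length = 0;
  std::vector<char> data;

  while (pos < wire.size()) {
    if (state == kReadLength) {
      // Read the raw length prefix (still in network byte order).
      std::memcpy(&length, wire.data() + pos, sizeof(length));
      pos += sizeof(length);
      state = kReadContent;
    } else if (state == kReadContent) {
      length = ntohl(length);           // mirrors the ntohl in OnRecvCompleted
      data.assign(wire.data() + pos, wire.data() + pos + length);
      pos += length;
      state = kParseResponse;
    }
    if (state == kParseResponse) {
      // A real connection would hand the bytes to HandleRpcResponse here.
      std::cout << "parsed: " << std::string(data.begin(), data.end()) << "\n";
      state = kReadLength;              // start reading the next response
    }
  }
  return 0;
}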
@@ -16,17 +16,13 @@
 * limitations under the License.
 */
#include "rpc_engine.h"
#include "rpc_connection_impl.h"
#include "sasl_protocol.h"

#include "RpcHeader.pb.h"
#include "ProtobufRpcEngine.pb.h"
#include "IpcConnectionContext.pb.h"

#include "common/logging.h"
#include "common/util.h"

#include <asio/read.hpp>

namespace hdfs {

namespace pb = ::google::protobuf;

@@ -70,121 +66,8 @@ static void AddHeadersToPacket(
  }
}

static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
  int len = DelimitedPBMessageSize(header);
  res->reserve(len);
  pbio::StringOutputStream ss(res);
  pbio::CodedOutputStream os(&ss);
  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  assert(buf);
  buf = pbio::CodedOutputStream::WriteVarint32ToArray(header->ByteSize(), buf);
  buf = header->SerializeWithCachedSizesToArray(buf);
}

static void ConstructPayload(std::string *res, const std::string *request) {
  int len =
      pbio::CodedOutputStream::VarintSize32(request->size()) + request->size();
  res->reserve(len);
  pbio::StringOutputStream ss(res);
  pbio::CodedOutputStream os(&ss);
  uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
  assert(buf);
  buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
  buf = os.WriteStringToArray(*request, buf);
}

static void SetRequestHeader(LockFreeRpcEngine *engine, int call_id,
                             const std::string &method_name, int retry_count,
                             RpcRequestHeaderProto *rpc_header,
                             RequestHeaderProto *req_header) {
  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
  rpc_header->set_callid(call_id);
  if (retry_count != kNoRetry)
    rpc_header->set_retrycount(retry_count);
  rpc_header->set_clientid(engine->client_id());

  req_header->set_methodname(method_name);
  req_header->set_declaringclassprotocolname(engine->protocol_name());
  req_header->set_clientprotocolversion(engine->protocol_version());
}

RpcConnection::~RpcConnection() {}

Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
                 const std::string &request, Handler &&handler)
    : engine_(engine),
      method_name_(method_name),
      call_id_(call_id),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
  ConstructPayload(&payload_, &request);
}

Request::Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
                 const pb::MessageLite *request, Handler &&handler)
    : engine_(engine),
      method_name_(method_name),
      call_id_(call_id),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
  ConstructPayload(&payload_, request);
}

Request::Request(LockFreeRpcEngine *engine, Handler &&handler)
    : engine_(engine),
      call_id_(-1),
      timer_(engine->io_service()),
      handler_(std::move(handler)),
      retry_count_(engine->retry_policy() ? 0 : kNoRetry),
      failover_count_(0) {
}

void Request::GetPacket(std::string *res) const {
  LOG_TRACE(kRPC, << "Request::GetPacket called");

  if (payload_.empty())
    return;

  RpcRequestHeaderProto rpc_header;
  RequestHeaderProto req_header;
  SetRequestHeader(engine_, call_id_, method_name_, retry_count_, &rpc_header,
                   &req_header);

  // SASL messages don't have a request header
  if (method_name_ != SASL_METHOD_NAME)
    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
  else
    AddHeadersToPacket(res, {&rpc_header}, &payload_);
}

void Request::OnResponseArrived(pbio::CodedInputStream *is,
                                const Status &status) {
  LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
  handler_(is, status);
}

std::string Request::GetDebugString() const {
  // Basic description of this object, aimed at debugging
  std::stringstream ss;
  ss << "\nRequest Object:\n";
  ss << "\tMethod name    = \"" << method_name_ << "\"\n";
  ss << "\tCall id        = " << call_id_ << "\n";
  ss << "\tRetry Count    = " << retry_count_ << "\n";
  ss << "\tFailover count = " << failover_count_ << "\n";
  return ss.str();
}

int Request::IncrementFailoverCount() {
  // reset retry count when failing over
  retry_count_ = 0;
  return failover_count_++;
}

RpcConnection::RpcConnection(LockFreeRpcEngine *engine)
    : engine_(engine),
      connected_(kNotYetConnected) {}

@@ -551,13 +434,13 @@ std::shared_ptr<Request> RpcConnection::RemoveFromRunningQueue(int call_id) {
std::string RpcConnection::ToString(ConnectedState connected) {
  switch(connected) {
  case kNotYetConnected: return "NotYetConnected";
    case kConnecting:      return "Connecting";
    case kHandshaking:     return "Handshaking";
    case kAuthenticating:  return "Authenticating";
    case kConnected:       return "Connected";
    case kDisconnected:    return "Disconnected";
    default:               return "Invalid ConnectedState";
  case kConnecting:      return "Connecting";
  case kHandshaking:     return "Handshaking";
  case kAuthenticating:  return "Authenticating";
  case kConnected:       return "Connected";
  case kDisconnected:    return "Disconnected";
  default:               return "Invalid ConnectedState";
  }
}

}
} // end namespace hdfs
@@ -0,0 +1,445 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_
#define LIB_RPC_RPC_CONNECTION_IMPL_H_

#include "rpc_connection.h"
#include "rpc_engine.h"
#include "request.h"

#include "common/auth_info.h"
#include "common/logging.h"
#include "common/util.h"
#include "common/libhdfs_events_impl.h"

#include <asio/connect.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp>

#include <system_error>

namespace hdfs {

template <class Socket>
class RpcConnectionImpl : public RpcConnection {
 public:
  MEMCHECKED_CLASS(RpcConnectionImpl);

  RpcConnectionImpl(RpcEngine *engine);
  virtual ~RpcConnectionImpl() override;

  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                       const AuthInfo & auth_info,
                       RpcCallback &handler);
  virtual void ConnectAndFlush(
      const std::vector<::asio::ip::tcp::endpoint> &server) override;
  virtual void SendHandshake(RpcCallback &handler) override;
  virtual void SendContext(RpcCallback &handler) override;
  virtual void Disconnect() override;
  virtual void OnSendCompleted(const ::asio::error_code &ec,
                               size_t transferred) override;
  virtual void OnRecvCompleted(const ::asio::error_code &ec,
                               size_t transferred) override;
  virtual void FlushPendingRequests() override;


  Socket &TEST_get_mutable_socket() { return socket_; }

  void TEST_set_connected(bool connected) { connected_ = connected ? kConnected : kNotYetConnected; }

 private:
  const Options options_;
  ::asio::ip::tcp::endpoint current_endpoint_;
  std::vector<::asio::ip::tcp::endpoint> additional_endpoints_;
  Socket socket_;
  ::asio::deadline_timer connect_timer_;

  void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote);
};

template <class Socket>
RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine)
    : RpcConnection(engine),
      options_(engine->options()),
      socket_(engine->io_service()),
      connect_timer_(engine->io_service())
{
  LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
}

template <class Socket>
RpcConnectionImpl<Socket>::~RpcConnectionImpl() {
  LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this);

  if (pending_requests_.size() > 0)
    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue");
  if (requests_on_fly_.size() > 0)
    LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
}

template <class Socket>
void RpcConnectionImpl<Socket>::Connect(
    const std::vector<::asio::ip::tcp::endpoint> &server,
    const AuthInfo & auth_info,
    RpcCallback &handler) {
  LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");

  this->auth_info_ = auth_info;

  auto connectionSuccessfulReq = std::make_shared<Request>(
      engine_, [handler](::google::protobuf::io::CodedInputStream *is,
                         const Status &status) {
        (void)is;
        handler(status);
      });
  pending_requests_.push_back(connectionSuccessfulReq);
  this->ConnectAndFlush(server);  // need "this" so compiler can infer type of CAF
}

template <class Socket>
void RpcConnectionImpl<Socket>::ConnectAndFlush(
    const std::vector<::asio::ip::tcp::endpoint> &server) {

  LOG_INFO(kRPC, << "ConnectAndFlush called");
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  if (server.empty()) {
    Status s = Status::InvalidArgument("No endpoints provided");
    CommsError(s);
    return;
  }

  if (connected_ == kConnected) {
    FlushPendingRequests();
    return;
  }
  if (connected_ != kNotYetConnected) {
    LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_));
    return;
  }
  connected_ = kConnecting;

  // Take the first endpoint, but remember the alternatives for later
  additional_endpoints_ = server;
  ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front();
  additional_endpoints_.erase(additional_endpoints_.begin());
  current_endpoint_ = first_endpoint;

  auto shared_this = shared_from_this();
  socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) {
    ConnectComplete(ec, first_endpoint);
  });

  // Prompt the timer to timeout
  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
  connect_timer_.expires_from_now(
      std::chrono::milliseconds(options_.rpc_connect_timeout));
  connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) {
    if (ec)
      ConnectComplete(ec, first_endpoint);
    else
      ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint);
  });
}
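
ConnectAndFlush pairs each async_connect with a deadline timer: whichever completes first wins, and a timer expiry is reported as host_unreachable so ConnectComplete can advance to the next endpoint.  Below is a trimmed standalone sketch of that race, assuming standalone asio (steady_timer/expires_after); everything outside the asio API is illustrative.

#include <asio.hpp>
#include <chrono>
#include <iostream>

int main() {
  asio::io_service io;
  asio::ip::tcp::socket socket(io);
  asio::steady_timer timer(io);

  // Non-routable address so the connect attempt hangs and the timer wins.
  asio::ip::tcp::endpoint target(asio::ip::address::from_string("10.255.255.1"), 8020);

  auto finish = [&](const asio::error_code &ec) {
    // First completion wins; cancel whichever operation is still pending.
    // (Sketch only: real code would keep a successfully connected socket.)
    timer.cancel();
    socket.close();
    std::cout << (ec ? "connect failed: " + ec.message()
                     : std::string("connected")) << "\n";
  };

  socket.async_connect(target, [&](const asio::error_code &ec) {
    if (ec == asio::error::operation_aborted) return;  // timer already won
    finish(ec);
  });

  timer.expires_after(std::chrono::milliseconds(500));
  timer.async_wait([&](const asio::error_code &ec) {
    if (ec) return;                      // timer was cancelled: connect won
    finish(asio::error::host_unreachable);
  });

  io.run();
  return 0;
}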
|
||||
|
||||
template <class Socket>
|
||||
void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) {
|
||||
auto shared_this = RpcConnectionImpl<Socket>::shared_from_this();
|
||||
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
||||
connect_timer_.cancel();
|
||||
|
||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
|
||||
|
||||
// Could be an old async connect returning a result after we've moved on
|
||||
if (remote != current_endpoint_) {
|
||||
LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_);
|
||||
return;
|
||||
}
|
||||
if (connected_ != kConnecting) {
|
||||
LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_);;
|
||||
return;
|
||||
}
|
||||
  Status status = ToStatus(ec);
  if(event_handlers_) {
    event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
    if (event_resp.response() == event_response::kTest_Error) {
      status = event_resp.status();
    }
#endif
  }

  if (status.ok()) {
    StartReading();
    SendHandshake([shared_this, this](const Status &s) {
      HandshakeComplete(s);
    });
  } else {
    LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString());
    std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_));
    if(!err.empty()) {
      LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err);
    }

    if (!additional_endpoints_.empty()) {
      // If we have additional endpoints, keep trying until we either run out
      // or one connects
      ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front();
      additional_endpoints_.erase(additional_endpoints_.begin());
      current_endpoint_ = next_endpoint;

      socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) {
        ConnectComplete(ec, next_endpoint);
      });
      connect_timer_.expires_from_now(
          std::chrono::milliseconds(options_.rpc_connect_timeout));
      connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) {
        if (ec)
          ConnectComplete(ec, next_endpoint);
        else
          ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint);
      });
    } else {
      CommsError(status);
    }
  }
}

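// First step of the post-connect setup; the overall sequence is
// connect -> handshake -> (SASL auth)? -> context -> connected.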
template <class Socket>
void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called");
  connected_ = kHandshaking;

  auto shared_this = shared_from_this();
  auto handshake_packet = PrepareHandshakePacket();
  ::asio::async_write(socket_, asio::buffer(*handshake_packet),
                      [handshake_packet, handler, shared_this, this](
                          const ::asio::error_code &ec, size_t) {
                        Status status = ToStatus(ec);
                        handler(status);
                      });
}

template <class Socket>
void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called");

  auto shared_this = shared_from_this();
  auto context_packet = PrepareContextPacket();
  ::asio::async_write(socket_, asio::buffer(*context_packet),
                      [context_packet, handler, shared_this, this](
                          const ::asio::error_code &ec, size_t) {
                        Status status = ToStatus(ec);
                        handler(status);
                      });
}

template <class Socket>
void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec,
                                                size_t) {
  using std::placeholders::_1;
  using std::placeholders::_2;
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");

  request_over_the_wire_.reset();
  if (ec) {
    LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
    CommsError(ToStatus(ec));
    return;
  }

  FlushPendingRequests();
}

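// Only one request is written to the socket at a time; request_over_the_wire_
// acts as the busy flag, and each completed write re-enters this method via
// OnSendCompleted to drain the next queued request.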
template <class Socket>
void RpcConnectionImpl<Socket>::FlushPendingRequests() {
  using namespace ::std::placeholders;

  // Lock should be held
  assert(lock_held(connection_state_lock_));

  LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");

  // Don't send if we don't need to
  if (request_over_the_wire_) {
    return;
  }

  std::shared_ptr<Request> req;
  switch (connected_) {
    case kNotYetConnected:
      return;
    case kConnecting:
      return;
    case kHandshaking:
      return;
    case kAuthenticating:
      if (auth_requests_.empty()) {
        return;
      }
      req = auth_requests_.front();
      auth_requests_.erase(auth_requests_.begin());
      break;
    case kConnected:
      if (pending_requests_.empty()) {
        return;
      }
      req = pending_requests_.front();
      pending_requests_.erase(pending_requests_.begin());
      break;
    case kDisconnected:
      LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection");
      return;
    default:
      LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_));
      return;
  }

  std::shared_ptr<RpcConnection> shared_this = shared_from_this();
  auto weak_this = std::weak_ptr<RpcConnection>(shared_this);
  auto weak_req = std::weak_ptr<Request>(req);

  std::shared_ptr<std::string> payload = std::make_shared<std::string>();
  req->GetPacket(payload.get());
  if (!payload->empty()) {
    assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end());
    requests_on_fly_[req->call_id()] = req;
    request_over_the_wire_ = req;

    req->timer().expires_from_now(
        std::chrono::milliseconds(options_.rpc_timeout));
    req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) {
      auto timeout_this = weak_this.lock();
      auto timeout_req = weak_req.lock();
      if (timeout_this && timeout_req)
        this->HandleRpcTimeout(timeout_req, ec);
    });

    asio::async_write(socket_, asio::buffer(*payload),
                      [shared_this, this, payload](const ::asio::error_code &ec,
                                                   size_t size) {
                        OnSendCompleted(ec, size);
                      });
  } else {  // Nothing to send for this request, inform the handler immediately
    io_service().post(
        // Never hold locks when calling a callback
        [req]() { req->OnResponseArrived(nullptr, Status::OK()); }
    );

    // Reschedule to flush the next one
    AsyncFlushPendingRequests();
  }
}

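// The receive path is a small state machine re-entered once per completed
// read: read the 4 byte length prefix (kReadLength), read that many payload
// bytes (kReadContent), then parse and dispatch (kParseResponse) and restart.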
template <class Socket>
void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec,
                                                size_t) {
  using std::placeholders::_1;
  using std::placeholders::_2;
  std::lock_guard<std::mutex> state_lock(connection_state_lock_);

  ::asio::error_code my_ec(original_ec);

  LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");

  std::shared_ptr<RpcConnection> shared_this = shared_from_this();

  if(event_handlers_) {
    event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
    if (event_resp.response() == event_response::kTest_Error) {
      my_ec = std::make_error_code(std::errc::network_down);
    }
#endif
  }

  switch (my_ec.value()) {
    case 0:
      // No errors
      break;
    case asio::error::operation_aborted:
      // The event loop has been shut down. Ignore the error.
      return;
    default:
      LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
      CommsError(ToStatus(my_ec));
      return;
  }

  if (!current_response_state_) { /* start a new one */
    current_response_state_ = std::make_shared<Response>();
  }

  if (current_response_state_->state_ == Response::kReadLength) {
    current_response_state_->state_ = Response::kReadContent;
    auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_),
                              sizeof(current_response_state_->length_));
    asio::async_read(
        socket_, buf,
        [shared_this, this](const ::asio::error_code &ec, size_t size) {
          OnRecvCompleted(ec, size);
        });
  } else if (current_response_state_->state_ == Response::kReadContent) {
    current_response_state_->state_ = Response::kParseResponse;
    current_response_state_->length_ = ntohl(current_response_state_->length_);
    current_response_state_->data_.resize(current_response_state_->length_);
    asio::async_read(
        socket_, ::asio::buffer(current_response_state_->data_),
        [shared_this, this](const ::asio::error_code &ec, size_t size) {
          OnRecvCompleted(ec, size);
        });
  } else if (current_response_state_->state_ == Response::kParseResponse) {
    // Check return status from the RPC response. We may have received a msg
    // indicating a server side error.

    Status stat = HandleRpcResponse(current_response_state_);

    if(stat.get_server_exception_type() == Status::kStandbyException) {
      // May need to bail out, connect to new NN, and restart loop
      LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect");
    }

    current_response_state_ = nullptr;
    StartReading();
  }
}

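// Quietly closes the socket (errors are expected here) and forces the state
// to kDisconnected; must be called with connection_state_lock_ held.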
template <class Socket>
void RpcConnectionImpl<Socket>::Disconnect() {
  assert(lock_held(connection_state_lock_));  // Must be holding lock before calling

  LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");

  request_over_the_wire_.reset();
  if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) {
    // Don't print out errors, we were expecting a disconnect here
    SafeDisconnect(get_asio_socket_ptr(&socket_));
  }
  connected_ = kDisconnected;
}
}

#endif
@ -16,13 +16,12 @@
 * limitations under the License.
 */
#include "rpc_engine.h"
#include "rpc_connection.h"
#include "rpc_connection_impl.h"
#include "common/util.h"
#include "common/logging.h"
#include "common/namenode_info.h"
#include "optional.hpp"

#include <future>
#include <algorithm>

namespace hdfs {

@ -30,114 +29,6 @@ namespace hdfs {
template <class T>
using optional = std::experimental::optional<T>;

HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                                     ::asio::io_service *ioservice,
                                     std::shared_ptr<LibhdfsEvents> event_handlers)
  : enabled_(false), resolved_(false),
    ioservice_(ioservice), event_handlers_(event_handlers)
{
  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
  for(unsigned int i=0;i<servers.size();i++)
    LOG_TRACE(kRPC, << servers[i].str());

  if(servers.size() >= 2) {
    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
    if(servers.size() > 2) {
      LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used.");
    }

    active_info_ = servers[0];
    standby_info_ = servers[1];
    LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str());
    LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());

    enabled_ = true;
    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
      resolved_ = true;
    }
  }
}

HANamenodeTracker::~HANamenodeTracker() { }

static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) {
  std::stringstream ss;
  for(unsigned int i=0; i<pts.size(); i++)
    if(i == pts.size() - 1)
      ss << pts[i];
    else
      ss << pts[i] << ", ";
  return ss.str();
}

// Pass in the endpoint from the current (failed) connection; this does a
// reverse lookup to find the node to fail over to, and swaps the tracker's
// active/standby state internally.
ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
  LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
  mutex_guard swap_lock(swap_lock_);

  ResolvedNamenodeInfo failover_node;

  // Connected to the active node: swap roles so the standby becomes the new active
  if(IsCurrentActive_locked(current_endpoint)) {
    std::swap(active_info_, standby_info_);
    if(event_handlers_)
      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
    failover_node = active_info_;
  } else if(IsCurrentStandby_locked(current_endpoint)) {
    // Connected to the standby: the active is already the failover target
    if(event_handlers_)
      event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                            reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
    failover_node = active_info_;
  } else {
    // Invalid state, throw for testing
    std::string ep1 = format_endpoints(active_info_.endpoints);
    std::string ep2 = format_endpoints(standby_info_.endpoints);

    std::stringstream msg;
    msg << "Looked for " << current_endpoint << " in\n";
    msg << ep1 << " and\n";
    msg << ep2 << std::endl;

    LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
    throw std::runtime_error(msg.str());
  }

  if(failover_node.endpoints.empty()) {
    LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << ", attempting to resolve again");
    if(!ResolveInPlace(ioservice_, failover_node)) {
      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
                      << " failed. Please make sure your configuration is up to date.");
    }
  }
  return failover_node;
}

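// Caller-side sketch (hypothetical names, for illustration only): on a
// standby exception, the engine can do something like
//
//   if (ha_tracker.is_enabled()) {
//     ResolvedNamenodeInfo next = ha_tracker.GetFailoverAndUpdate(failed_ep);
//     conn->ConnectAndFlush(next.endpoints);  // replay pending requests
//   }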
bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
  for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
    if(ep.address() == active_info_.endpoints[i].address()) {
      if(ep.port() != active_info_.endpoints[i].port())
        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << ", trying anyway...");
      return true;
    }
  }
  return false;
}

bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const {
  for(unsigned int i=0;i<standby_info_.endpoints.size();i++) {
    if(ep.address() == standby_info_.endpoints[i].address()) {
      if(ep.port() != standby_info_.endpoints[i].port())
        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << ", trying anyway...");
      return true;
    }
  }
  return false;
}

RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
                     const std::string &client_name, const std::string &user_name,

@ -276,19 +167,6 @@ void RpcEngine::AsyncRpc(
  conn_->AsyncRpc(method_name, req, resp, handler);
}

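// Blocking wrapper around AsyncRpc: the Status is delivered through a
// promise/future pair, so this must not be called from the asio worker
// thread that would fulfill the promise, or it will deadlock.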
Status RpcEngine::Rpc(
    const std::string &method_name, const ::google::protobuf::MessageLite *req,
    const std::shared_ptr<::google::protobuf::MessageLite> &resp) {

  LOG_TRACE(kRPC, << "RpcEngine::Rpc called");

  auto stat = std::make_shared<std::promise<Status>>();
  std::future<Status> future(stat->get_future());
  AsyncRpc(method_name, req, resp,
           [stat](const Status &status) { stat->set_value(status); });
  return future.get();
}

std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
{
  LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called");

@ -304,7 +182,6 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection()
  return result;
}

void RpcEngine::AsyncRpcCommsError(
    const Status &status,
    std::shared_ptr<RpcConnection> failedConnection,

@ -25,25 +25,19 @@
#include "common/retry_policy.h"
#include "common/libhdfs_events_impl.h"
#include "common/util.h"
#include "common/continuation/asio.h"
#include "common/logging.h"
#include "common/new_delete.h"
#include "common/namenode_info.h"
#include "namenode_tracker.h"

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

#include <asio/ip/tcp.hpp>
#include <asio/deadline_timer.hpp>

#include <atomic>
#include <memory>
#include <unordered_map>
#include <vector>
#include <deque>
#include <mutex>
#include <future>

namespace hdfs {

@ -64,193 +58,8 @@ typedef const std::function<void(const Status &)> RpcCallback;
class LockFreeRpcEngine;
class RpcConnection;
class SaslProtocol;

/*
 * Internal bookkeeping for an outstanding request from the consumer.
 *
 * Threading model: not thread-safe; should only be accessed from a single
 * thread at a time
 */
class Request {
 public:
  MEMCHECKED_CLASS(Request)
  typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
                             const Status &status)> Handler;

  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const std::string &request, Handler &&callback);
  Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id,
          const ::google::protobuf::MessageLite *request, Handler &&callback);

  // Null request (with no actual message) used to track the state of an
  // initial Connect call
  Request(LockFreeRpcEngine *engine, Handler &&handler);

  int call_id() const { return call_id_; }
  std::string method_name() const { return method_name_; }
  ::asio::deadline_timer &timer() { return timer_; }
  int IncrementRetryCount() { return retry_count_++; }
  int IncrementFailoverCount();
  void GetPacket(std::string *res) const;
  void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
                         const Status &status);

  int get_failover_count() { return failover_count_; }

  std::string GetDebugString() const;

 private:
  LockFreeRpcEngine *const engine_;
  const std::string method_name_;
  const int call_id_;

  ::asio::deadline_timer timer_;
  std::string payload_;
  const Handler handler_;

  int retry_count_;
  int failover_count_;
};

/*
 * Encapsulates a persistent connection to the NameNode, and the sending of
 * RPC requests and evaluating their responses.
 *
 * Can have multiple RPC requests in-flight simultaneously, but they are
 * evaluated in-order on the server side in a blocking manner.
 *
 * Threading model: public interface is thread-safe
 * All handlers passed in to method calls will be called from an asio thread,
 * and will not be holding any internal RpcConnection locks.
 */
class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
 public:
  MEMCHECKED_CLASS(RpcConnection)
  RpcConnection(LockFreeRpcEngine *engine);
  virtual ~RpcConnection();

  // Note that a single server can have multiple endpoints - especially both
  // an ipv4 and ipv6 endpoint
  virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
                       const AuthInfo &auth_info,
                       RpcCallback &handler) = 0;
  virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0;
  virtual void Disconnect() = 0;

  void StartReading();
  void AsyncRpc(const std::string &method_name,
                const ::google::protobuf::MessageLite *req,
                std::shared_ptr<::google::protobuf::MessageLite> resp,
                const RpcCallback &handler);

  void AsyncRpc(const std::vector<std::shared_ptr<Request>> &requests);

  // Enqueue requests before the connection is connected. Will be flushed
  // on connect
  void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests);

  // Put requests at the front of the current request queue
  void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests);

  void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers);
  void SetClusterName(std::string cluster_name);

  LockFreeRpcEngine *engine() { return engine_; }
  ::asio::io_service &io_service();

 protected:
  struct Response {
    enum ResponseState {
      kReadLength,
      kReadContent,
      kParseResponse,
    } state_;
    unsigned length_;
    std::vector<char> data_;

    std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar;
    std::unique_ptr<::google::protobuf::io::CodedInputStream> in;

    Response() : state_(kReadLength), length_(0) {}
  };

  // Initial handshaking protocol: connect -> handshake -> (auth)? -> context -> connected
  virtual void SendHandshake(RpcCallback &handler) = 0;
  void HandshakeComplete(const Status &s);
  void AuthComplete(const Status &s, const AuthInfo &new_auth_info);
  void AuthComplete_locked(const Status &s, const AuthInfo &new_auth_info);
  virtual void SendContext(RpcCallback &handler) = 0;
  void ContextComplete(const Status &s);

  virtual void OnSendCompleted(const ::asio::error_code &ec,
                               size_t transferred) = 0;
  virtual void OnRecvCompleted(const ::asio::error_code &ec,
                               size_t transferred) = 0;
  virtual void FlushPendingRequests() = 0;  // Synchronously write the next request

  void AsyncRpc_locked(
      const std::string &method_name,
      const ::google::protobuf::MessageLite *req,
      std::shared_ptr<::google::protobuf::MessageLite> resp,
      const RpcCallback &handler);
  void SendRpcRequests(const std::vector<std::shared_ptr<Request>> &requests);
  void AsyncFlushPendingRequests();  // Queue requests to be flushed at a later time

  std::shared_ptr<std::string> PrepareHandshakePacket();
  std::shared_ptr<std::string> PrepareContextPacket();
  static std::string SerializeRpcRequest(
      const std::string &method_name,
      const ::google::protobuf::MessageLite *req);
  Status HandleRpcResponse(std::shared_ptr<Response> response);
  void HandleRpcTimeout(std::shared_ptr<Request> req,
                        const ::asio::error_code &ec);
  void CommsError(const Status &status);

  void ClearAndDisconnect(const ::asio::error_code &ec);
  std::shared_ptr<Request> RemoveFromRunningQueue(int call_id);

  LockFreeRpcEngine *const engine_;
  std::shared_ptr<Response> current_response_state_;
  AuthInfo auth_info_;

  // The connection attempt can be deferred, especially when we're pausing
  // during retry
  enum ConnectedState {
    kNotYetConnected,
    kConnecting,
    kHandshaking,
    kAuthenticating,
    kConnected,
    kDisconnected
  };
  static std::string ToString(ConnectedState connected);
  ConnectedState connected_;

  // State machine for performing a SASL handshake
  std::shared_ptr<SaslProtocol> sasl_protocol_;
  // The request being sent over the wire; will also be in requests_on_fly_
  std::shared_ptr<Request> request_over_the_wire_;
  // Requests to be sent over the wire
  std::deque<std::shared_ptr<Request>> pending_requests_;
  // Requests to be sent over the wire during authentication; not retried if
  // there is a connection error
  std::deque<std::shared_ptr<Request>> auth_requests_;
  // Requests that are waiting for responses
  typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap;
  RequestOnFlyMap requests_on_fly_;
  std::shared_ptr<LibhdfsEvents> event_handlers_;
  std::string cluster_name_;

  // Lock for mutable parts of this class that need to be thread safe
  std::mutex connection_state_lock_;

  friend class SaslProtocol;
};

class RpcConnection;
class Request;

/*
 * These methods of the RpcEngine will never acquire locks, and are safe for

@ -259,6 +68,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
class LockFreeRpcEngine {
public:
  MEMCHECKED_CLASS(LockFreeRpcEngine)

  /* Enqueues a CommsError without acquiring a lock */
  virtual void AsyncRpcCommsError(const Status &status,
                                  std::shared_ptr<RpcConnection> failedConnection,

@ -278,54 +88,6 @@
};

/*
 * Tracker gives the RpcEngine a quick way to use an endpoint that just
 * failed in order to look up a set of endpoints for a failover node.
 *
 * Note: For now this only deals with 2 NameNodes, but that's the default
 * anyway.
 */
class HANamenodeTracker {
 public:
  HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                    ::asio::io_service *ioservice,
                    std::shared_ptr<LibhdfsEvents> event_handlers);

  virtual ~HANamenodeTracker();

  bool is_enabled() const { return enabled_; }
  bool is_resolved() const { return resolved_; }

  // Get the node opposite of the current one if possible (swaps active/standby)
  // Note: This will always mutate internal state. Use IsCurrentActive/Standby to
  // get info without changing state
  ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);

  bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
  bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;

 private:
  // If HA should be enabled, according to our options and runtime info like
  // the number of nodes provided
  bool enabled_;
  // If we were able to resolve at least 1 HA namenode
  bool resolved_;

  // Keep the io_service in case a second round of DNS lookup is required
  ::asio::io_service *ioservice_;

  // Event handlers; for now this is the simplest place to catch all failover
  // events and push info out to the client application. Possibly move into
  // RpcEngine.
  std::shared_ptr<LibhdfsEvents> event_handlers_;

  // Only support 1 active and 1 standby for now.
  ResolvedNamenodeInfo active_info_;
  ResolvedNamenodeInfo standby_info_;

  // Acquired when swapping active and standby
  std::mutex swap_lock_;
};

/*
 * An engine for reliable communication with a NameNode. Handles connection,
 * retry, and (someday) failover of the requested messages.

@ -360,10 +122,6 @@ class RpcEngine : public LockFreeRpcEngine {
                const std::shared_ptr<::google::protobuf::MessageLite> &resp,
                const std::function<void(const Status &)> &handler);

  Status Rpc(const std::string &method_name,
             const ::google::protobuf::MessageLite *req,
             const std::shared_ptr<::google::protobuf::MessageLite> &resp);

  void Shutdown();

  /* Enqueues a CommsError without acquiring a lock */

@ -17,6 +17,7 @@
 */

#include "rpc_engine.h"
#include "rpc_connection.h"
#include "common/logging.h"

#include "sasl_engine.h"

@ -19,7 +19,7 @@
#include "mock_connection.h"
#include "test.pb.h"
#include "RpcHeader.pb.h"
#include "rpc/rpc_connection.h"
#include "rpc/rpc_connection_impl.h"
#include "common/namenode_info.h"

#include <google/protobuf/io/coded_stream.h>