HDFS-8737. Initial implementation of a Hadoop RPC v9 client. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-07-08 13:13:03 -07:00 committed by James Clampffer
parent 95b479b8c1
commit 40a1f3631d
10 changed files with 824 additions and 1 deletions

View File

@ -18,6 +18,9 @@
project (libhdfspp) project (libhdfspp)
find_package(Protobuf REQUIRED)
find_package(Threads)
add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME) add_definitions(-DASIO_STANDALONE -DASIO_CPP11_DATE_TIME)
if(UNIX) if(UNIX)
@ -30,6 +33,13 @@ if(APPLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -Wno-deprecated-declarations") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++ -Wno-deprecated-declarations")
endif() endif()
include_directories(third_party/gmock-1.7.0) include_directories(
include
lib
${PROJECT_BINARY_DIR}/lib/proto
third_party/asio-1.10.2/include
third_party/gmock-1.7.0
)
add_subdirectory(third_party/gmock-1.7.0) add_subdirectory(third_party/gmock-1.7.0)
add_subdirectory(lib)

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_STATUS_H_
#define LIBHDFSPP_STATUS_H_
#include <string>
#include <system_error>
namespace hdfs {
class StatusHelper;
class Status {
public:
// Create a success status.
Status() : state_(NULL) { }
~Status() { delete[] state_; }
explicit Status(int code, const char *msg);
// Copy the specified status.
Status(const Status& s);
void operator=(const Status& s);
// Return a success status.
static Status OK() { return Status(); }
static Status InvalidArgument(const char *msg)
{ return Status(kInvalidArgument, msg); }
static Status ResourceUnavailable(const char *msg)
{ return Status(kResourceUnavailable, msg); }
static Status Unimplemented()
{ return Status(kUnimplemented, ""); }
static Status Exception(const char *expception_class_name, const char *error_message)
{ return Status(kException, expception_class_name, error_message); }
// Returns true iff the status indicates success.
bool ok() const { return (state_ == NULL); }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
int code() const {
return (state_ == NULL) ? kOk : static_cast<int>(state_[4]);
}
private:
// OK status has a NULL state_. Otherwise, state_ is a new[] array
// of the following form:
// state_[0..3] == length of message
// state_[4] == code
// state_[5..] == message
const char* state_;
friend class StatusHelper;
enum Code {
kOk = 0,
kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
kException = 256,
};
explicit Status(int code, const char *msg1, const char *msg2);
static const char *CopyState(const char* s);
static const char *ConstructState(int code, const char *msg1, const char *msg2);
};
inline Status::Status(const Status& s) {
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
inline void Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
if (state_ != s.state_) {
delete[] state_;
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
}
}
#endif

View File

@ -0,0 +1,2 @@
add_subdirectory(rpc)
add_subdirectory(proto)

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_UTIL_H_
#define LIB_COMMON_UTIL_H_
#include "libhdfspp/status.h"
#include <asio/error_code.hpp>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
namespace hdfs {
static inline Status ToStatus(const ::asio::error_code &ec) {
if (ec) {
return Status(ec.value(), ec.message().c_str());
} else {
return Status::OK();
}
}
static inline int DelimitedPBMessageSize(
const ::google::protobuf::MessageLite *msg) {
size_t size = msg->ByteSize();
return ::google::protobuf::io::CodedOutputStream::VarintSize32(size) + size;
}
static inline void ReadDelimitedPBMessage(
::google::protobuf::io::CodedInputStream *in,
::google::protobuf::MessageLite *msg) {
uint32_t size = 0;
in->ReadVarint32(&size);
auto limit = in->PushLimit(size);
msg->ParseFromCodedStream(in);
in->PopLimit(limit);
}
std::string Base64Encode(const std::string &src);
}
#endif

View File

@ -0,0 +1,21 @@
set(CLIENT_PROTO_DIR ${CMAKE_SOURCE_DIR}/../proto)
set(COMMON_PROTO_DIR ${CMAKE_SOURCE_DIR}/../../../../../hadoop-common-project/hadoop-common/src/main/proto)
set(PROTOBUF_IMPORT_DIRS ${CLIENT_PROTO_DIR} ${COMMON_PROTO_DIR})
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
${CLIENT_PROTO_DIR}/datatransfer.proto
${CLIENT_PROTO_DIR}/ClientDatanodeProtocol.proto
${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
${CLIENT_PROTO_DIR}/acl.proto
${CLIENT_PROTO_DIR}/datatransfer.proto
${CLIENT_PROTO_DIR}/encryption.proto
${CLIENT_PROTO_DIR}/hdfs.proto
${CLIENT_PROTO_DIR}/inotify.proto
${CLIENT_PROTO_DIR}/xattr.proto
${COMMON_PROTO_DIR}/IpcConnectionContext.proto
${COMMON_PROTO_DIR}/ProtobufRpcEngine.proto
${COMMON_PROTO_DIR}/RpcHeader.proto
${COMMON_PROTO_DIR}/Security.proto
)
add_library(proto ${PROTO_SRCS} ${PROTO_HDRS})

View File

@ -0,0 +1,3 @@
include_directories(${OPENSSL_INCLUDE_DIRS})
add_library(rpc rpc_connection.cc rpc_engine.cc)
add_dependencies(rpc proto)

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "rpc_engine.h"
#include "RpcHeader.pb.h"
#include "ProtobufRpcEngine.pb.h"
#include "IpcConnectionContext.pb.h"
#include "common/util.h"
#include <asio/read.hpp>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
namespace hdfs {
namespace pb = ::google::protobuf;
namespace pbio = ::google::protobuf::io;
using namespace ::hadoop::common;
using namespace ::std::placeholders;
static void
ConstructPacket(std::string *res,
std::initializer_list<const pb::MessageLite *> headers,
const std::string *request) {
int len = 0;
std::for_each(
headers.begin(), headers.end(),
[&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
if (request) {
len += pbio::CodedOutputStream::VarintSize32(request->size()) +
request->size();
}
int net_len = htonl(len);
res->reserve(res->size() + sizeof(net_len) + len);
pbio::StringOutputStream ss(res);
pbio::CodedOutputStream os(&ss);
os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
assert(buf && "Cannot allocate memory");
std::for_each(
headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf);
buf = v->SerializeWithCachedSizesToArray(buf);
});
if (request) {
buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf);
buf = os.WriteStringToArray(*request, buf);
}
}
static void SetRequestHeader(RpcEngine *engine, int call_id,
const std::string &method_name,
RpcRequestHeaderProto *rpc_header,
RequestHeaderProto *req_header) {
rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
rpc_header->set_callid(call_id);
rpc_header->set_clientid(engine->client_name());
req_header->set_methodname(method_name);
req_header->set_declaringclassprotocolname(engine->protocol_name());
req_header->set_clientprotocolversion(engine->protocol_version());
}
RpcConnection::~RpcConnection() {}
RpcConnection::Request::Request(RpcConnection *parent,
const std::string &method_name,
const std::string &request, Handler &&handler)
: call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
handler_(std::move(handler)) {
RpcRequestHeaderProto rpc_header;
RequestHeaderProto req_header;
SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
&req_header);
ConstructPacket(&payload_, {&rpc_header, &req_header}, &request);
}
RpcConnection::Request::Request(RpcConnection *parent,
const std::string &method_name,
const pb::MessageLite *request,
Handler &&handler)
: call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()),
handler_(std::move(handler)) {
RpcRequestHeaderProto rpc_header;
RequestHeaderProto req_header;
SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header,
&req_header);
ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr);
}
void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is,
const Status &status) {
handler_(is, status);
}
RpcConnection::RpcConnection(RpcEngine *engine)
: engine_(engine), resp_state_(kReadLength), resp_length_(0) {}
::asio::io_service &RpcConnection::io_service() {
return engine_->io_service();
}
void RpcConnection::Start() {
io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this,
::asio::error_code(), 0));
}
void RpcConnection::FlushPendingRequests() {
io_service().post([this]() {
if (!request_over_the_wire_) {
OnSendCompleted(::asio::error_code(), 0);
}
});
}
void RpcConnection::HandleRpcResponse(const std::vector<char> &data) {
/* assumed to be called from a context that has already acquired the
* engine_state_lock */
pbio::ArrayInputStream ar(&data[0], data.size());
pbio::CodedInputStream in(&ar);
in.PushLimit(data.size());
RpcResponseHeaderProto h;
ReadDelimitedPBMessage(&in, &h);
auto it = requests_on_fly_.find(h.callid());
if (it == requests_on_fly_.end()) {
// TODO: out of line RPC request
assert(false && "Out of line request with unknown call id");
}
auto req = it->second;
requests_on_fly_.erase(it);
Status stat;
if (h.has_exceptionclassname()) {
stat =
Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
}
req->OnResponseArrived(&in, stat);
}
std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() {
static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c',
RpcEngine::kRpcVersion, 0, 0};
auto res =
std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader));
RpcRequestHeaderProto h;
h.set_rpckind(RPC_PROTOCOL_BUFFER);
h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
h.set_callid(RpcEngine::kCallIdConnectionContext);
h.set_clientid(engine_->client_name());
IpcConnectionContextProto handshake;
handshake.set_protocol(engine_->protocol_name());
ConstructPacket(res.get(), {&h, &handshake}, nullptr);
return res;
}
void RpcConnection::AsyncRpc(
const std::string &method_name, const ::google::protobuf::MessageLite *req,
std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
auto wrapped_handler =
[resp, handler](pbio::CodedInputStream *is, const Status &status) {
if (status.ok()) {
ReadDelimitedPBMessage(is, resp.get());
}
handler(status);
};
auto r = std::make_shared<Request>(this, method_name, req,
std::move(wrapped_handler));
pending_requests_.push_back(r);
FlushPendingRequests();
}
void RpcConnection::AsyncRawRpc(const std::string &method_name,
const std::string &req,
std::shared_ptr<std::string> resp,
Callback &&handler) {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
auto wrapped_handler =
[this, resp, handler](pbio::CodedInputStream *is, const Status &status) {
if (status.ok()) {
uint32_t size = 0;
is->ReadVarint32(&size);
auto limit = is->PushLimit(size);
is->ReadString(resp.get(), limit);
is->PopLimit(limit);
}
handler(status);
};
auto r = std::make_shared<Request>(this, method_name, req,
std::move(wrapped_handler));
pending_requests_.push_back(r);
FlushPendingRequests();
}
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_RPC_RPC_CONNECTION_H_
#define LIB_RPC_RPC_CONNECTION_H_
#include "rpc_engine.h"
#include "common/util.h"
#include <asio/connect.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp>
namespace hdfs {
template <class NextLayer> class RpcConnectionImpl : public RpcConnection {
public:
RpcConnectionImpl(RpcEngine *engine);
virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
Callback &&handler) override;
virtual void Handshake(Callback &&handler) override;
virtual void Shutdown() override;
virtual void OnSendCompleted(const ::asio::error_code &ec,
size_t transferred) override;
virtual void OnRecvCompleted(const ::asio::error_code &ec,
size_t transferred) override;
private:
NextLayer next_layer_;
};
template <class NextLayer>
RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
: RpcConnection(engine)
, next_layer_(engine->io_service())
{}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Connect(
const std::vector<::asio::ip::tcp::endpoint> &server, Callback &&handler) {
::asio::async_connect(
next_layer_, server.begin(), server.end(),
[handler](const ::asio::error_code &ec,
std::vector<::asio::ip::tcp::endpoint>::const_iterator) {
handler(ToStatus(ec));
});
}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Handshake(Callback &&handler) {
auto handshake_packet = PrepareHandshakePacket();
::asio::async_write(
next_layer_, asio::buffer(*handshake_packet),
[handshake_packet, handler](const ::asio::error_code &ec, size_t) {
handler(ToStatus(ec));
});
}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
size_t) {
using std::placeholders::_1;
using std::placeholders::_2;
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
request_over_the_wire_.reset();
if (ec) {
// TODO: Current RPC has failed -- we should abandon the
// connection and do proper clean up
assert(false && "Unimplemented");
}
if (!pending_requests_.size()) {
return;
}
std::shared_ptr<Request> req = pending_requests_.front();
pending_requests_.erase(pending_requests_.begin());
requests_on_fly_[req->call_id()] = req;
request_over_the_wire_ = req;
// TODO: set the timeout for the RPC request
asio::async_write(
next_layer_, asio::buffer(req->payload()),
std::bind(&RpcConnectionImpl<NextLayer>::OnSendCompleted, this, _1, _2));
}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
size_t) {
using std::placeholders::_1;
using std::placeholders::_2;
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
switch (ec.value()) {
case 0:
// No errors
break;
case asio::error::operation_aborted:
// The event loop has been shut down. Ignore the error.
return;
default:
assert(false && "Unimplemented");
}
if (resp_state_ == kReadLength) {
resp_state_ = kReadContent;
auto buf = ::asio::buffer(reinterpret_cast<char *>(&resp_length_),
sizeof(resp_length_));
asio::async_read(next_layer_, buf,
std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted,
this, _1, _2));
} else if (resp_state_ == kReadContent) {
resp_state_ = kParseResponse;
resp_length_ = ntohl(resp_length_);
resp_data_.resize(resp_length_);
asio::async_read(next_layer_, ::asio::buffer(resp_data_),
std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, this, _1, _2));
} else if (resp_state_ == kParseResponse) {
resp_state_ = kReadLength;
HandleRpcResponse(resp_data_);
resp_data_.clear();
Start();
}
}
template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() {
next_layer_.close();
}
}
#endif

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "rpc_engine.h"
#include "rpc_connection.h"
#include "common/util.h"
#include <openssl/rand.h>
#include <sstream>
#include <future>
namespace hdfs {
RpcEngine::RpcEngine(::asio::io_service *io_service,
const std::string &client_name, const char *protocol_name,
int protocol_version)
: io_service_(io_service), client_name_(client_name),
protocol_name_(protocol_name), protocol_version_(protocol_version),
call_id_(0)
, conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this))
{}
Status
RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) {
using ::asio::ip::tcp;
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
conn_->Connect(servers, [this, stat](const Status &status) {
if (!status.ok()) {
stat->set_value(status);
return;
}
conn_->Handshake(
[this, stat](const Status &status) { stat->set_value(status); });
});
return future.get();
}
void RpcEngine::Start() { conn_->Start(); }
void RpcEngine::Shutdown() {
io_service_->post([this]() { conn_->Shutdown(); });
}
void RpcEngine::AsyncRpc(
const std::string &method_name, const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp,
std::function<void(const Status &)> &&handler) {
conn_->AsyncRpc(method_name, req, resp, std::move(handler));
}
Status
RpcEngine::Rpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
AsyncRpc(method_name, req, resp,
[stat](const Status &status) { stat->set_value(status); });
return future.get();
}
Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
conn_->AsyncRawRpc(method_name, req, resp,
[stat](const Status &status) { stat->set_value(status); });
return future.get();
}
std::string RpcEngine::GetRandomClientName() {
unsigned char buf[6] = {
0,
};
RAND_pseudo_bytes(buf, sizeof(buf));
std::stringstream ss;
ss << "libhdfs++_"
<< Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
return ss.str();
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_RPC_RPC_ENGINE_H_
#define LIB_RPC_RPC_ENGINE_H_
#include "libhdfspp/status.h"
#include <google/protobuf/message_lite.h>
#include <asio/ip/tcp.hpp>
#include <asio/deadline_timer.hpp>
#include <atomic>
#include <memory>
#include <unordered_map>
#include <vector>
#include <mutex>
namespace hdfs {
class RpcEngine;
class RpcConnection {
public:
typedef std::function<void(const Status &)> Callback;
virtual ~RpcConnection();
RpcConnection(RpcEngine *engine);
virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
Callback &&handler) = 0;
virtual void Handshake(Callback &&handler) = 0;
virtual void Shutdown() = 0;
void Start();
void AsyncRpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
std::shared_ptr<::google::protobuf::MessageLite> resp,
Callback &&handler);
void AsyncRawRpc(const std::string &method_name, const std::string &request,
std::shared_ptr<std::string> resp, Callback &&handler);
protected:
RpcEngine *const engine_;
virtual void OnSendCompleted(const ::asio::error_code &ec,
size_t transferred) = 0;
virtual void OnRecvCompleted(const ::asio::error_code &ec,
size_t transferred) = 0;
::asio::io_service &io_service();
std::shared_ptr<std::string> PrepareHandshakePacket();
static std::string
SerializeRpcRequest(const std::string &method_name,
const ::google::protobuf::MessageLite *req);
void HandleRpcResponse(const std::vector<char> &data);
void FlushPendingRequests();
enum ResponseState {
kReadLength,
kReadContent,
kParseResponse,
} resp_state_;
unsigned resp_length_;
std::vector<char> resp_data_;
class Request {
public:
typedef std::function<void(::google::protobuf::io::CodedInputStream *is,
const Status &status)> Handler;
Request(RpcConnection *parent, const std::string &method_name,
const std::string &request, Handler &&callback);
Request(RpcConnection *parent, const std::string &method_name,
const ::google::protobuf::MessageLite *request, Handler &&callback);
int call_id() const { return call_id_; }
::asio::deadline_timer &timer() { return timer_; }
const std::string &payload() const { return payload_; }
void OnResponseArrived(::google::protobuf::io::CodedInputStream *is,
const Status &status);
private:
const int call_id_;
::asio::deadline_timer timer_;
std::string payload_;
Handler handler_;
};
// The request being sent over the wire
std::shared_ptr<Request> request_over_the_wire_;
// Requests to be sent over the wire
std::vector<std::shared_ptr<Request>> pending_requests_;
// Requests that are waiting for responses
std::unordered_map<int, std::shared_ptr<Request>> requests_on_fly_;
// Lock for mutable parts of this class that need to be thread safe
std::mutex engine_state_lock_;
};
class RpcEngine {
public:
enum { kRpcVersion = 9 };
enum {
kCallIdAuthorizationFailed = -1,
kCallIdInvalid = -2,
kCallIdConnectionContext = -3,
kCallIdPing = -4
};
RpcEngine(::asio::io_service *io_service, const std::string &client_name,
const char *protocol_name, int protocol_version);
void AsyncRpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp,
std::function<void(const Status &)> &&handler);
Status Rpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp);
/**
* Send raw bytes as RPC payload. This is intended to be used in JNI
* bindings only.
**/
Status RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp);
Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server);
void Start();
void Shutdown();
int NextCallId() { return ++call_id_; }
const std::string &client_name() const { return client_name_; }
const std::string &protocol_name() const { return protocol_name_; }
int protocol_version() const { return protocol_version_; }
::asio::io_service &io_service() { return *io_service_; }
static std::string GetRandomClientName();
private:
::asio::io_service *io_service_;
const std::string client_name_;
const std::string protocol_name_;
const int protocol_version_;
std::atomic_int call_id_;
std::unique_ptr<RpcConnection> conn_;
};
}
#endif