HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-07-09 14:02:55 -07:00 committed by James Clampffer
parent 63eee296c7
commit a6b2fb64c4
4 changed files with 366 additions and 0 deletions

View File

@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT = YES
INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \
@PROJECT_SOURCE_DIR@/include/libhdfspp \
@PROJECT_SOURCE_DIR@/lib/common/continuation \
INPUT_ENCODING = UTF-8
RECURSIVE = NO

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_CONTINUATION_ASIO_H_
#define LIB_COMMON_CONTINUATION_ASIO_H_
#include "continuation.h"
#include "common/util.h"
#include "libhdfspp/status.h"
#include <asio/connect.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp>
#include <asio/ip/tcp.hpp>
namespace hdfs {
namespace continuation {
template <class Stream, class MutableBufferSequence>
class ReadContinuation : public Continuation {
public:
ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
: stream_(stream), buffer_(buffer) {}
virtual void Run(const Next &next) override {
auto handler =
[next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
asio::async_read(*stream_, buffer_, handler);
}
private:
Stream *stream_;
MutableBufferSequence buffer_;
};
template <class Stream, class ConstBufferSequence>
class WriteContinuation : public Continuation {
public:
WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
: stream_(stream), buffer_(buffer) {}
virtual void Run(const Next &next) override {
auto handler =
[next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
asio::async_write(*stream_, buffer_, handler);
}
private:
Stream *stream_;
ConstBufferSequence buffer_;
};
template <class Socket, class Iterator>
class ConnectContinuation : public Continuation {
public:
ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
Iterator *connected_endpoint)
: socket_(socket), begin_(begin), end_(end),
connected_endpoint_(connected_endpoint) {}
virtual void Run(const Next &next) override {
auto handler = [this, next](const asio::error_code &ec, Iterator it) {
if (connected_endpoint_) {
*connected_endpoint_ = it;
}
next(ToStatus(ec));
};
asio::async_connect(*socket_, begin_, end_, handler);
}
private:
Socket *socket_;
Iterator begin_;
Iterator end_;
Iterator *connected_endpoint_;
};
template <class Stream, class ConstBufferSequence>
static inline Continuation *Write(Stream *stream,
const ConstBufferSequence &buffer) {
return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
}
template <class Stream, class MutableBufferSequence>
static inline Continuation *Read(Stream *stream,
const MutableBufferSequence &buffer) {
return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
}
template <class Socket, class Iterator>
static inline Continuation *Connect(Socket *socket, Iterator begin,
Iterator end) {
return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
}
}
}
#endif

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_
#define LIB_COMMON_CONTINUATION_CONTINUATION_H_
#include "libhdfspp/status.h"
#include <functional>
#include <memory>
#include <vector>
namespace hdfs {
namespace continuation {
class PipelineBase;
/**
* A continuation is a fragment of runnable code whose execution will
* be scheduled by a \link Pipeline \endlink.
*
* The Continuation class is a build block to implement the
* Continuation Passing Style (CPS) in libhdfs++. In CPS, the
* upper-level user specifies the control flow by chaining a sequence
* of continuations explicitly through the \link Run() \endlink method,
* while in traditional imperative programming the sequences of
* sentences implicitly specify the control flow.
*
* See http://en.wikipedia.org/wiki/Continuation for more details.
**/
class Continuation {
public:
typedef std::function<void(const Status &)> Next;
virtual ~Continuation() = default;
virtual void Run(const Next &next) = 0;
Continuation(const Continuation &) = delete;
Continuation &operator=(const Continuation &) = delete;
protected:
Continuation() = default;
};
/**
* A pipeline schedules the execution of a chain of \link Continuation
* \endlink. The pipeline schedules the execution of continuations
* based on their order in the pipeline, where the next parameter for
* each continuation points to the \link Schedule() \endlink
* method. That way the pipeline executes all scheduled continuations
* in sequence.
*
* The typical use case of a pipeline is executing continuations
* asynchronously. Note that a continuation calls the next
* continuation when it is finished. If the continuation is posted
* into an asynchronous event loop, invoking the next continuation
* can be done in the callback handler in the asynchronous event loop.
*
* The pipeline allocates the memory as follows. A pipeline is always
* allocated on the heap. It owns all the continuations as well as the
* the state specified by the user. Both the continuations and the
* state have the same life cycle of the pipeline. The design
* simplifies the problem of ensuring that the executions in the
* asynchronous event loop always hold valid pointers w.r.t. the
* pipeline. The pipeline will automatically deallocate itself right
* after it invokes the callback specified the user.
**/
template <class State> class Pipeline {
public:
typedef std::function<void(const Status &, const State &)> UserHandler;
static Pipeline *Create() { return new Pipeline(); }
Pipeline &Push(Continuation *stage);
void Run(UserHandler &&handler);
State &state() { return state_; }
private:
State state_;
std::vector<std::unique_ptr<Continuation>> routines_;
size_t stage_;
std::function<void(const Status &, const State &)> handler_;
Pipeline() : stage_(0) {}
~Pipeline() = default;
void Schedule(const Status &status);
};
template <class State>
inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
routines_.emplace_back(std::unique_ptr<Continuation>(stage));
return *this;
}
template <class State>
inline void Pipeline<State>::Schedule(const Status &status) {
if (stage_ >= routines_.size()) {
handler_(status, state_);
routines_.clear();
delete this;
} else {
auto next = routines_[stage_].get();
++stage_;
next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1));
}
}
template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
handler_ = std::move(handler);
Schedule(Status::OK());
}
}
}
#endif

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
#include "common/util.h"
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <cassert>
namespace hdfs {
namespace continuation {
template <class Stream, size_t MaxMessageSize = 512>
struct ReadDelimitedPBMessageContinuation : public Continuation {
ReadDelimitedPBMessageContinuation(Stream *stream,
::google::protobuf::MessageLite *msg)
: stream_(stream), msg_(msg) {}
virtual void Run(const Next &next) override {
namespace pbio = google::protobuf::io;
auto handler = [this, next](const asio::error_code &ec, size_t) {
Status status;
if (ec) {
status = ToStatus(ec);
} else {
pbio::ArrayInputStream as(&buf_[0], buf_.size());
pbio::CodedInputStream is(&as);
uint32_t size = 0;
bool v = is.ReadVarint32(&size);
assert(v);
is.PushLimit(size);
msg_->Clear();
v = msg_->MergeFromCodedStream(&is);
assert(v);
}
next(status);
};
asio::async_read(
*stream_, asio::buffer(buf_),
std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
std::placeholders::_1, std::placeholders::_2),
handler);
}
private:
size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
if (ec) {
return 0;
}
size_t offset = 0, len = 0;
for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) {
len = (len << 7) | (buf_[i] & 0x7f);
if ((uint8_t)buf_.at(i) < 0x80) {
offset = i + 1;
break;
}
}
assert(offset + len < buf_.size() && "Message is too big");
return offset ? len + offset - transferred : 1;
}
Stream *stream_;
::google::protobuf::MessageLite *msg_;
std::array<char, MaxMessageSize> buf_;
};
template <class Stream>
struct WriteDelimitedPBMessageContinuation : Continuation {
WriteDelimitedPBMessageContinuation(Stream *stream,
const google::protobuf::MessageLite *msg)
: stream_(stream), msg_(msg) {}
virtual void Run(const Next &next) override {
namespace pbio = google::protobuf::io;
int size = msg_->ByteSize();
buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
pbio::StringOutputStream ss(&buf_);
pbio::CodedOutputStream os(&ss);
os.WriteVarint32(size);
msg_->SerializeToCodedStream(&os);
write_coroutine_ =
std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
write_coroutine_->Run([next](const Status &stat) { next(stat); });
}
private:
Stream *stream_;
const google::protobuf::MessageLite *msg_;
std::string buf_;
std::shared_ptr<Continuation> write_coroutine_;
};
template <class Stream, size_t MaxMessageSize = 512>
static inline Continuation *
ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
msg);
}
template <class Stream>
static inline Continuation *
WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
}
}
}
#endif