diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in index 773990f1c45..ac1d0fbd1a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in @@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT = YES INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \ @PROJECT_SOURCE_DIR@/include/libhdfspp \ + @PROJECT_SOURCE_DIR@/lib/common/continuation \ INPUT_ENCODING = UTF-8 RECURSIVE = NO diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h new file mode 100644 index 00000000000..f7d76e86c38 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_ASIO_H_ +#define LIB_COMMON_CONTINUATION_ASIO_H_ + +#include "continuation.h" +#include "common/util.h" + +#include "libhdfspp/status.h" + +#include +#include +#include +#include + +namespace hdfs { +namespace continuation { + +template +class ReadContinuation : public Continuation { +public: + ReadContinuation(Stream *stream, const MutableBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_read(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + MutableBufferSequence buffer_; +}; + +template +class WriteContinuation : public Continuation { +public: + WriteContinuation(Stream *stream, const ConstBufferSequence &buffer) + : stream_(stream), buffer_(buffer) {} + + virtual void Run(const Next &next) override { + auto handler = + [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); }; + asio::async_write(*stream_, buffer_, handler); + } + +private: + Stream *stream_; + ConstBufferSequence buffer_; +}; + +template +class ConnectContinuation : public Continuation { +public: + ConnectContinuation(Socket *socket, Iterator begin, Iterator end, + Iterator *connected_endpoint) + : socket_(socket), begin_(begin), end_(end), + connected_endpoint_(connected_endpoint) {} + + virtual void Run(const Next &next) override { + auto handler = [this, next](const asio::error_code &ec, Iterator it) { + if (connected_endpoint_) { + *connected_endpoint_ = it; + } + next(ToStatus(ec)); + }; + asio::async_connect(*socket_, begin_, end_, handler); + } + +private: + Socket *socket_; + Iterator begin_; + Iterator end_; + Iterator *connected_endpoint_; +}; + +template +static inline Continuation *Write(Stream *stream, + const ConstBufferSequence &buffer) { + return new WriteContinuation(stream, buffer); +} + +template +static inline Continuation *Read(Stream *stream, + const MutableBufferSequence &buffer) { + return new ReadContinuation(stream, buffer); +} + +template +static inline Continuation *Connect(Socket *socket, Iterator begin, + Iterator end) { + return new ConnectContinuation(socket, begin, end, nullptr); +} +} +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h new file mode 100644 index 00000000000..9576c2f722d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_ +#define LIB_COMMON_CONTINUATION_CONTINUATION_H_ + +#include "libhdfspp/status.h" + +#include +#include +#include + +namespace hdfs { +namespace continuation { + +class PipelineBase; + +/** + * A continuation is a fragment of runnable code whose execution will + * be scheduled by a \link Pipeline \endlink. + * + * The Continuation class is a build block to implement the + * Continuation Passing Style (CPS) in libhdfs++. In CPS, the + * upper-level user specifies the control flow by chaining a sequence + * of continuations explicitly through the \link Run() \endlink method, + * while in traditional imperative programming the sequences of + * sentences implicitly specify the control flow. + * + * See http://en.wikipedia.org/wiki/Continuation for more details. + **/ +class Continuation { +public: + typedef std::function Next; + virtual ~Continuation() = default; + virtual void Run(const Next &next) = 0; + Continuation(const Continuation &) = delete; + Continuation &operator=(const Continuation &) = delete; + +protected: + Continuation() = default; +}; + +/** + * A pipeline schedules the execution of a chain of \link Continuation + * \endlink. The pipeline schedules the execution of continuations + * based on their order in the pipeline, where the next parameter for + * each continuation points to the \link Schedule() \endlink + * method. That way the pipeline executes all scheduled continuations + * in sequence. + * + * The typical use case of a pipeline is executing continuations + * asynchronously. Note that a continuation calls the next + * continuation when it is finished. If the continuation is posted + * into an asynchronous event loop, invoking the next continuation + * can be done in the callback handler in the asynchronous event loop. + * + * The pipeline allocates the memory as follows. A pipeline is always + * allocated on the heap. It owns all the continuations as well as the + * the state specified by the user. Both the continuations and the + * state have the same life cycle of the pipeline. The design + * simplifies the problem of ensuring that the executions in the + * asynchronous event loop always hold valid pointers w.r.t. the + * pipeline. The pipeline will automatically deallocate itself right + * after it invokes the callback specified the user. + **/ +template class Pipeline { +public: + typedef std::function UserHandler; + static Pipeline *Create() { return new Pipeline(); } + Pipeline &Push(Continuation *stage); + void Run(UserHandler &&handler); + State &state() { return state_; } + +private: + State state_; + std::vector> routines_; + size_t stage_; + std::function handler_; + + Pipeline() : stage_(0) {} + ~Pipeline() = default; + void Schedule(const Status &status); +}; + +template +inline Pipeline &Pipeline::Push(Continuation *stage) { + routines_.emplace_back(std::unique_ptr(stage)); + return *this; +} + +template +inline void Pipeline::Schedule(const Status &status) { + if (stage_ >= routines_.size()) { + handler_(status, state_); + routines_.clear(); + delete this; + } else { + auto next = routines_[stage_].get(); + ++stage_; + next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1)); + } +} + +template inline void Pipeline::Run(UserHandler &&handler) { + handler_ = std::move(handler); + Schedule(Status::OK()); +} +} +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h new file mode 100644 index 00000000000..3e4b5356160 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ +#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_ + +#include "common/util.h" + +#include +#include +#include + +#include + +namespace hdfs { +namespace continuation { + +template +struct ReadDelimitedPBMessageContinuation : public Continuation { + ReadDelimitedPBMessageContinuation(Stream *stream, + ::google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + auto handler = [this, next](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = ToStatus(ec); + } else { + pbio::ArrayInputStream as(&buf_[0], buf_.size()); + pbio::CodedInputStream is(&as); + uint32_t size = 0; + bool v = is.ReadVarint32(&size); + assert(v); + is.PushLimit(size); + msg_->Clear(); + v = msg_->MergeFromCodedStream(&is); + assert(v); + } + next(status); + }; + asio::async_read( + *stream_, asio::buffer(buf_), + std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } + + size_t offset = 0, len = 0; + for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) { + len = (len << 7) | (buf_[i] & 0x7f); + if ((uint8_t)buf_.at(i) < 0x80) { + offset = i + 1; + break; + } + } + + assert(offset + len < buf_.size() && "Message is too big"); + return offset ? len + offset - transferred : 1; + } + + Stream *stream_; + ::google::protobuf::MessageLite *msg_; + std::array buf_; +}; + +template +struct WriteDelimitedPBMessageContinuation : Continuation { + WriteDelimitedPBMessageContinuation(Stream *stream, + const google::protobuf::MessageLite *msg) + : stream_(stream), msg_(msg) {} + + virtual void Run(const Next &next) override { + namespace pbio = google::protobuf::io; + int size = msg_->ByteSize(); + buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size); + pbio::StringOutputStream ss(&buf_); + pbio::CodedOutputStream os(&ss); + os.WriteVarint32(size); + msg_->SerializeToCodedStream(&os); + write_coroutine_ = + std::shared_ptr(Write(stream_, asio::buffer(buf_))); + write_coroutine_->Run([next](const Status &stat) { next(stat); }); + } + +private: + Stream *stream_; + const google::protobuf::MessageLite *msg_; + std::string buf_; + std::shared_ptr write_coroutine_; +}; + +template +static inline Continuation * +ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new ReadDelimitedPBMessageContinuation(stream, + msg); +} + +template +static inline Continuation * +WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { + return new WriteDelimitedPBMessageContinuation(stream, msg); +} +} +} +#endif