diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt index 51e31223f0b..bd5558d89ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt @@ -19,6 +19,7 @@ project (libhdfspp) find_package(Doxygen) +find_package(OpenSSL REQUIRED) find_package(Protobuf REQUIRED) find_package(Threads) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h new file mode 100644 index 00000000000..59ebd6087e2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_HDFS_H_ +#define LIBHDFSPP_HDFS_H_ + +#include "libhdfspp/status.h" + +#include + +namespace hdfs { + +/** + * An IoService manages a queue of asynchronous tasks. All libhdfs++ + * operations are filed against a particular IoService. + * + * When an operation is queued into an IoService, the IoService will + * run the callback handler associated with the operation. Note that + * the IoService must be stopped before destructing the objects that + * file the operations. + * + * From an implementation point of view the IoService object wraps the + * ::asio::io_service objects. Please see the related documentation + * for more details. + **/ +class IoService { +public: + static IoService *New(); + /** + * Run the asynchronous tasks associated with this IoService. + **/ + virtual void Run() = 0; + /** + * Stop running asynchronous tasks associated with this IoService. + **/ + virtual void Stop() = 0; + virtual ~IoService(); +}; + +/** + * Applications opens an InputStream to read files in HDFS. + **/ +class InputStream { +public: + /** + * Read data from a specific position. The handler returns the + * number of bytes has read. + **/ + virtual void + PositionRead(void *buf, size_t nbyte, size_t offset, + const std::function &handler) = 0; + virtual ~InputStream(); +}; + +/** + * FileSystem implements APIs to interact with HDFS. + **/ +class FileSystem { +public: + /** + * Create a new instance of the FileSystem object. The call + * initializes the RPC connections to the NameNode and returns an + * FileSystem object. + **/ + static void + New(IoService *io_service, const std::string &server, + const std::string &service, + const std::function &handler); + /** + * Open a file on HDFS. The call issues an RPC to the NameNode to + * gather the locations of all blocks in the file and to return a + * new instance of the @ref InputStream object. + **/ + virtual void + Open(const std::string &path, + const std::function &handler) = 0; + virtual ~FileSystem(); +}; +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt index e77942b68c9..a0e33797ec8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt @@ -17,6 +17,7 @@ # add_subdirectory(common) +add_subdirectory(fs) add_subdirectory(reader) add_subdirectory(rpc) add_subdirectory(proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index cea5e0d99fe..06b4ca38178 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -1 +1 @@ -add_library(common base64.cc status.cc sasl_digest_md5.cc) +add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h index f7d76e86c38..5630934ed01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -89,6 +89,31 @@ private: Iterator *connected_endpoint_; }; +template +class ResolveContinuation : public Continuation { +public: + ResolveContinuation(::asio::io_service *io_service, const std::string &server, + const std::string &service, OutputIterator result) + : resolver_(*io_service), query_(server, service), result_(result) {} + + virtual void Run(const Next &next) override { + using resolver = ::asio::ip::tcp::resolver; + auto handler = + [this, next](const asio::error_code &ec, resolver::iterator it) { + if (!ec) { + std::copy(it, resolver::iterator(), result_); + } + next(ToStatus(ec)); + }; + resolver_.async_resolve(query_, handler); + } + +private: + ::asio::ip::tcp::resolver resolver_; + ::asio::ip::tcp::resolver::query query_; + OutputIterator result_; +}; + template static inline Continuation *Write(Stream *stream, const ConstBufferSequence &buffer) { @@ -106,6 +131,13 @@ static inline Continuation *Connect(Socket *socket, Iterator begin, Iterator end) { return new ConnectContinuation(socket, begin, end, nullptr); } + +template +static inline Continuation * +Resolve(::asio::io_service *io_service, const std::string &server, + const std::string &service, OutputIterator result) { + return new ResolveContinuation(io_service, server, service, result); +} } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h index 9576c2f722d..0af04a8d59d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -104,7 +104,7 @@ inline Pipeline &Pipeline::Push(Continuation *stage) { template inline void Pipeline::Schedule(const Status &status) { - if (stage_ >= routines_.size()) { + if (!status.ok() || stage_ >= routines_.size()) { handler_(status, state_); routines_.clear(); delete this; @@ -119,6 +119,19 @@ template inline void Pipeline::Run(UserHandler &&handler) { handler_ = std::move(handler); Schedule(Status::OK()); } + +template class BindContinuation : public Continuation { +public: + BindContinuation(const Handler &handler) : handler_(handler) {} + virtual void Run(const Next &next) override { handler_(next); } + +private: + Handler handler_; +}; + +template static inline Continuation *Bind(const Handler &handler) { + return new BindContinuation(handler); +} } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc new file mode 100644 index 00000000000..31926144506 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hdfs_public_api.h" + +namespace hdfs { + +IoService::~IoService() {} + +IoService *IoService::New() { + return new IoServiceImpl(); +} + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h new file mode 100644 index 00000000000..95567c0d003 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef COMMON_HDFS_PUBLIC_API_H_ +#define COMMON_HDFS_PUBLIC_API_H_ + +#include "libhdfspp/hdfs.h" + +#include + +namespace hdfs { + +class IoServiceImpl : public IoService { + public: + virtual void Run() override { + asio::io_service::work work(io_service_); + io_service_.run(); + } + virtual void Stop() override { io_service_.stop(); } + ::asio::io_service &io_service() { return io_service_; } + private: + ::asio::io_service io_service_; +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt new file mode 100644 index 00000000000..f386688ab16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -0,0 +1,2 @@ +add_library(fs filesystem.cc inputstream.cc) +add_dependencies(fs proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc new file mode 100644 index 00000000000..0090fd57c50 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" +#include "common/continuation/asio.h" +#include "common/util.h" + +#include + +#include + +namespace hdfs { + +static const char kNamenodeProtocol[] = + "org.apache.hadoop.hdfs.protocol.ClientProtocol"; +static const int kNamenodeProtocolVersion = 1; + +using ::asio::ip::tcp; + +FileSystem::~FileSystem() {} + +void FileSystem::New( + IoService *io_service, const std::string &server, + const std::string &service, + const std::function &handler) { + FileSystemImpl *impl = new FileSystemImpl(io_service); + impl->Connect(server, service, [impl, handler](const Status &stat) { + if (stat.ok()) { + handler(stat, impl); + } else { + delete impl; + handler(stat, nullptr); + } + }); +} + +FileSystemImpl::FileSystemImpl(IoService *io_service) + : io_service_(static_cast(io_service)), + engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(), + kNamenodeProtocol, kNamenodeProtocolVersion), + namenode_(&engine_) {} + +void FileSystemImpl::Connect(const std::string &server, + const std::string &service, + std::function &&handler) { + using namespace continuation; + typedef std::vector State; + auto m = Pipeline::Create(); + m->Push(Resolve(&io_service_->io_service(), server, service, + std::back_inserter(m->state()))) + .Push(Bind([this, m](const Continuation::Next &next) { + engine_.Connect(m->state(), next); + })); + m->Run([this, handler](const Status &status, const State &) { + if (status.ok()) { + engine_.Start(); + } + handler(status); + }); +} + +void FileSystemImpl::Open( + const std::string &path, + const std::function &handler) { + using ::hadoop::hdfs::GetBlockLocationsRequestProto; + using ::hadoop::hdfs::GetBlockLocationsResponseProto; + + struct State { + GetBlockLocationsRequestProto req; + std::shared_ptr resp; + }; + + auto m = continuation::Pipeline::Create(); + auto &req = m->state().req; + req.set_src(path); + req.set_offset(0); + req.set_length(std::numeric_limits::max()); + m->state().resp.reset(new GetBlockLocationsResponseProto()); + + State *s = &m->state(); + m->Push(continuation::Bind( + [this, s](const continuation::Continuation::Next &next) { + namenode_.GetBlockLocations(&s->req, s->resp, next); + })); + m->Run([this, handler](const Status &stat, const State &s) { + handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations()) + : nullptr); + }); +} +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h new file mode 100644 index 00000000000..4e29a806816 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ +#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ + +#include "common/hdfs_public_api.h" +#include "libhdfspp/hdfs.h" +#include "rpc/rpc_engine.h" +#include "ClientNamenodeProtocol.pb.h" +#include "ClientNamenodeProtocol.hrpc.inl" + +namespace hdfs { + +class FileSystemImpl : public FileSystem { +public: + FileSystemImpl(IoService *io_service); + void Connect(const std::string &server, const std::string &service, + std::function &&handler); + virtual void Open(const std::string &path, + const std::function + &handler) override; + RpcEngine &rpc_engine() { return engine_; } + +private: + IoServiceImpl *io_service_; + RpcEngine engine_; + ClientNamenodeProtocol namenode_; +}; + +class InputStreamImpl : public InputStream { +public: + InputStreamImpl(FileSystemImpl *fs, + const ::hadoop::hdfs::LocatedBlocksProto *blocks); + virtual void PositionRead( + void *buf, size_t nbyte, size_t offset, + const std::function &handler) override; + template + void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, + const Handler &handler); + template + void AsyncReadBlock(const std::string &client_name, + const hadoop::hdfs::LocatedBlockProto &block, + size_t offset, const MutableBufferSequence &buffers, + const Handler &handler); + +private: + FileSystemImpl *fs_; + unsigned long long file_length_; + std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; + template struct HandshakeContinuation; + template + struct ReadBlockContinuation; + struct RemoteBlockReaderTrait; +}; +} + +#include "inputstream_impl.h" + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc new file mode 100644 index 00000000000..1b230c65941 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filesystem.h" + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +InputStream::~InputStream() {} + +InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, + const LocatedBlocksProto *blocks) + : fs_(fs), file_length_(blocks->filelength()) { + for (const auto &block : blocks->blocks()) { + blocks_.push_back(block); + } + + if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { + blocks_.push_back(blocks->lastblock()); + } +} + +void InputStreamImpl::PositionRead( + void *buf, size_t nbyte, size_t offset, + const std::function &handler) { + AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler); +} +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h new file mode 100644 index 00000000000..077a15f0307 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FS_INPUTSTREAM_IMPL_H_ +#define FS_INPUTSTREAM_IMPL_H_ + +#include "reader/block_reader.h" + +#include "common/continuation/asio.h" +#include "common/continuation/protobuf.h" + +#include +#include + +namespace hdfs { + +struct InputStreamImpl::RemoteBlockReaderTrait { + typedef RemoteBlockReader Reader; + struct State { + std::unique_ptr conn_; + std::unique_ptr reader_; + std::vector endpoints_; + size_t transferred_; + Reader *reader() { return reader_.get(); } + size_t *transferred() { return &transferred_; } + const size_t *transferred() const { return &transferred_; } + }; + static continuation::Pipeline * + CreatePipeline(::asio::io_service *io_service, + const ::hadoop::hdfs::LocatedBlockProto &b) { + using namespace ::asio::ip; + auto m = continuation::Pipeline::Create(); + auto &s = m->state(); + s.conn_.reset(new tcp::socket(*io_service)); + s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get())); + for (auto &loc : b.locs()) { + auto datanode = loc.id(); + s.endpoints_.push_back(tcp::endpoint( + address::from_string(datanode.ipaddr()), datanode.xferport())); + } + + m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(), + s.endpoints_.end())); + return m; + } +}; + +template +struct InputStreamImpl::HandshakeContinuation : continuation::Continuation { + HandshakeContinuation(Reader *reader, const std::string &client_name, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset) + : reader_(reader), client_name_(client_name), length_(length), + offset_(offset) { + if (token) { + token_.reset(new hadoop::common::TokenProto()); + token_->CheckTypeAndMergeFrom(*token); + } + block_.CheckTypeAndMergeFrom(*block); + } + + virtual void Run(const Next &next) override { + reader_->async_connect(client_name_, token_.get(), &block_, length_, + offset_, next); + } + +private: + Reader *reader_; + const std::string client_name_; + std::unique_ptr token_; + hadoop::hdfs::ExtendedBlockProto block_; + uint64_t length_; + uint64_t offset_; +}; + +template +struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation { + ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer, + size_t *transferred) + : reader_(reader), buffer_(buffer), + buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {} + + virtual void Run(const Next &next) override { + *transferred_ = 0; + next_ = next; + OnReadData(Status::OK(), 0); + } + +private: + Reader *reader_; + MutableBufferSequence buffer_; + const size_t buffer_size_; + size_t *transferred_; + std::function next_; + + void OnReadData(const Status &status, size_t transferred) { + using std::placeholders::_1; + using std::placeholders::_2; + *transferred_ += transferred; + if (!status.ok()) { + next_(status); + } else if (*transferred_ >= buffer_size_) { + next_(status); + } else { + reader_->async_read_some( + asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), + std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); + } + } +}; + +template +void InputStreamImpl::AsyncPreadSome(size_t offset, + const MutableBufferSequence &buffers, + const Handler &handler) { + using ::hadoop::hdfs::LocatedBlockProto; + namespace ip = ::asio::ip; + using ::asio::ip::tcp; + + auto it = std::find_if( + blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) { + return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); + }); + + if (it == blocks_.end()) { + handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0); + return; + } else if (!it->locs_size()) { + handler(Status::ResourceUnavailable("No datanodes available"), 0); + return; + } + + uint64_t offset_within_block = offset - it->offset(); + uint64_t size_within_block = std::min( + it->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + + AsyncReadBlock( + fs_->rpc_engine().client_name(), *it, offset_within_block, + asio::buffer(buffers, size_within_block), handler); +} + +template +void InputStreamImpl::AsyncReadBlock( + const std::string &client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBufferSequence &buffers, const Handler &handler) { + + typedef typename BlockReaderTrait::Reader Reader; + auto m = + BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block); + auto &s = m->state(); + size_t size = asio::buffer_size(buffers); + m->Push(new HandshakeContinuation(s.reader(), client_name, nullptr, + &block.b(), size, offset)) + .Push(new ReadBlockContinuation( + s.reader(), buffers, s.transferred())); + m->Run([handler](const Status &status, + const typename BlockReaderTrait::State &state) { + handler(status, *state.transferred()); + }); +} +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc index 80522e72aee..e4b5accf8c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc @@ -81,8 +81,8 @@ void StubGenerator::EmitMethod(const MethodDescriptor *method, out->Print( "\n inline void $camel_method$(const Message *req, " "const std::shared_ptr &resp, " - "Callback &&handler) {\n" - " engine_->AsyncRpc(\"$method$\", req, resp, std::move(handler));\n" + "const Callback &handler) {\n" + " engine_->AsyncRpc(\"$method$\", req, resp, handler);\n" " }\n", "camel_method", ToCamelCase(method->name()), "method", method->name()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc index 4d08bd15793..555c37f40f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc @@ -182,7 +182,8 @@ std::shared_ptr RpcConnection::PrepareHandshakePacket() { void RpcConnection::AsyncRpc( const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, Callback &&handler) { + std::shared_ptr<::google::protobuf::MessageLite> resp, + const Callback &handler) { std::lock_guard state_lock(engine_state_lock_); auto wrapped_handler = diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 50dce864577..a9d6cee6594 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -35,20 +35,15 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, , conn_(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)) {} -Status -RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers) { - using ::asio::ip::tcp; - auto stat = std::make_shared>(); - std::future future(stat->get_future()); - conn_->Connect(servers, [this, stat](const Status &status) { - if (!status.ok()) { - stat->set_value(status); - return; +void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &servers, + const std::function &handler) { + conn_->Connect(servers, [this, handler](const Status &stat) { + if (!stat.ok()) { + handler(stat); + } else { + conn_->Handshake([handler](const Status &s) { handler(s); }); } - conn_->Handshake( - [this, stat](const Status &status) { stat->set_value(status); }); }); - return future.get(); } void RpcEngine::Start() { conn_->Start(); } @@ -60,8 +55,8 @@ void RpcEngine::Shutdown() { void RpcEngine::AsyncRpc( const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp, - std::function &&handler) { - conn_->AsyncRpc(method_name, req, resp, std::move(handler)); + const std::function &handler) { + conn_->AsyncRpc(method_name, req, resp, handler); } Status diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index cd5c0e6a47e..9ff6361c81b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -48,7 +48,7 @@ public: void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, std::shared_ptr<::google::protobuf::MessageLite> resp, - Callback &&handler); + const Callback &handler); void AsyncRawRpc(const std::string &method_name, const std::string &request, std::shared_ptr resp, Callback &&handler); @@ -123,7 +123,7 @@ public: void AsyncRpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, const std::shared_ptr<::google::protobuf::MessageLite> &resp, - std::function &&handler); + const std::function &handler); Status Rpc(const std::string &method_name, const ::google::protobuf::MessageLite *req, @@ -134,7 +134,8 @@ public: **/ Status RawRpc(const std::string &method_name, const std::string &req, std::shared_ptr resp); - Status Connect(const std::vector<::asio::ip::tcp::endpoint> &server); + void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + const std::function &handler); void Start(); void Shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 53f340dc538..4c622f2fab9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -25,3 +25,7 @@ add_test(remote_block_reader remote_block_reader_test) add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main) add_test(sasl_digest_md5 sasl_digest_md5_test) + +add_executable(inputstream_test inputstream_test.cc) +target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main) +add_test(inputstream inputstream_test) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc new file mode 100644 index 00000000000..96aa38abf98 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fs/filesystem.h" +#include + +using hadoop::common::TokenProto; +using hadoop::hdfs::ExtendedBlockProto; +using hadoop::hdfs::LocatedBlockProto; +using hadoop::hdfs::LocatedBlocksProto; + +using ::testing::_; +using ::testing::InvokeArgument; +using ::testing::Return; + +using namespace hdfs; + +namespace hdfs { + +class MockReader { +public: + virtual ~MockReader() {} + MOCK_METHOD2( + async_read_some, + void(const asio::mutable_buffers_1 &, + const std::function &)); + + MOCK_METHOD6(async_connect, + void(const std::string &, TokenProto *, ExtendedBlockProto *, + uint64_t, uint64_t, + const std::function &)); +}; + +template struct MockBlockReaderTrait { + typedef MockReader Reader; + struct State { + MockReader reader_; + size_t transferred_; + Reader *reader() { return &reader_; } + size_t *transferred() { return &transferred_; } + const size_t *transferred() const { return &transferred_; } + }; + + static continuation::Pipeline * + CreatePipeline(::asio::io_service *, const LocatedBlockProto &) { + auto m = continuation::Pipeline::Create(); + *m->state().transferred() = 0; + Trait::InitializeMockReader(m->state().reader()); + return m; + } +}; +} + +TEST(InputStreamTest, TestReadSingleTrunk) { + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + FileSystemImpl fs(&io_service); + InputStreamImpl is(&fs, &blocks); + Status stat; + size_t read = 0; + struct Trait { + static void InitializeMockReader(MockReader *reader) { + EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) + .WillOnce(InvokeArgument<5>(Status::OK())); + + EXPECT_CALL(*reader, async_read_some(_,_)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + } + }; + + is.AsyncReadBlock>( + "client", block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(InputStreamTest, TestReadMultipleTrunk) { + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + FileSystemImpl fs(&io_service); + InputStreamImpl is(&fs, &blocks); + Status stat; + size_t read = 0; + struct Trait { + static void InitializeMockReader(MockReader *reader) { + EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) + .WillOnce(InvokeArgument<5>(Status::OK())); + + EXPECT_CALL(*reader, async_read_some(_,_)) + .Times(4) + .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); + } + }; + + is.AsyncReadBlock>( + "client", block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(InputStreamTest, TestReadError) { + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + FileSystemImpl fs(&io_service); + InputStreamImpl is(&fs, &blocks); + Status stat; + size_t read = 0; + struct Trait { + static void InitializeMockReader(MockReader *reader) { + EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) + .WillOnce(InvokeArgument<5>(Status::OK())); + + EXPECT_CALL(*reader, async_read_some(_,_)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); + } + }; + + is.AsyncReadBlock>( + "client", block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_FALSE(stat.ok()); + ASSERT_EQ(sizeof(buf) / 4 * 3, read); + read = 0; +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +}