From bc47acc9d9b87464c90bbe650f9a3c644b9b8931 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Mon, 24 Aug 2015 18:10:17 -0700 Subject: [PATCH] HDFS-8952. InputStream.PositionRead() should be aware of available DNs. Contributed by Haohui Mai. --- .../native/libhdfspp/include/libhdfspp/hdfs.h | 20 ++++-- .../libhdfspp/include/libhdfspp/status.h | 1 - .../main/native/libhdfspp/lib/fs/filesystem.h | 12 ++-- .../native/libhdfspp/lib/fs/inputstream.cc | 8 ++- .../libhdfspp/lib/fs/inputstream_impl.h | 50 ++++++++------ .../libhdfspp/tests/inputstream_test.cc | 69 ++++++++++++++++--- 6 files changed, 118 insertions(+), 42 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h index 59ebd6087e2..edb1ff3e097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -21,6 +21,7 @@ #include "libhdfspp/status.h" #include +#include namespace hdfs { @@ -57,12 +58,23 @@ public: class InputStream { public: /** - * Read data from a specific position. The handler returns the - * number of bytes has read. + * Read data from a specific position. The current implementation + * stops at the block boundary. + * + * @param buf the pointer to the buffer + * @param nbyte the size of the buffer + * @param offset the offset the file + * @param excluded_datanodes the UUID of the datanodes that should + * not be used in this read + * + * The handler returns the datanode that serves the block and the number of + * bytes has read. **/ virtual void - PositionRead(void *buf, size_t nbyte, size_t offset, - const std::function &handler) = 0; + PositionRead(void *buf, size_t nbyte, uint64_t offset, + const std::set &excluded_datanodes, + const std::function &handler) = 0; virtual ~InputStream(); }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h index d2ef005f85a..fc5ea66c276 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h @@ -66,7 +66,6 @@ class Status { // state_[4] == code // state_[5..] == message const char* state_; - friend class StatusHelper; enum Code { kOk = 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 4e29a806816..0c9dd7af5bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -46,16 +46,20 @@ class InputStreamImpl : public InputStream { public: InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto *blocks); - virtual void PositionRead( - void *buf, size_t nbyte, size_t offset, - const std::function &handler) override; + virtual void + PositionRead(void *buf, size_t nbyte, uint64_t offset, + const std::set &excluded_datanodes, + const std::function &handler) override; template void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, + const std::set &excluded_datanodes, const Handler &handler); template void AsyncReadBlock(const std::string &client_name, const hadoop::hdfs::LocatedBlockProto &block, - size_t offset, const MutableBufferSequence &buffers, + const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, + const MutableBufferSequence &buffers, const Handler &handler); private: diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc index 1b230c65941..b47dcb1a8fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc @@ -37,8 +37,10 @@ InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, } void InputStreamImpl::PositionRead( - void *buf, size_t nbyte, size_t offset, - const std::function &handler) { - AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler); + void *buf, size_t nbyte, uint64_t offset, + const std::set &excluded_datanodes, + const std::function + &handler) { + AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h index 077a15f0307..ca5ac38e234 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h @@ -33,7 +33,7 @@ struct InputStreamImpl::RemoteBlockReaderTrait { struct State { std::unique_ptr conn_; std::unique_ptr reader_; - std::vector endpoints_; + std::array endpoints_; size_t transferred_; Reader *reader() { return reader_.get(); } size_t *transferred() { return &transferred_; } @@ -41,17 +41,15 @@ struct InputStreamImpl::RemoteBlockReaderTrait { }; static continuation::Pipeline * CreatePipeline(::asio::io_service *io_service, - const ::hadoop::hdfs::LocatedBlockProto &b) { + const ::hadoop::hdfs::DatanodeInfoProto &dn) { using namespace ::asio::ip; auto m = continuation::Pipeline::Create(); auto &s = m->state(); s.conn_.reset(new tcp::socket(*io_service)); s.reader_.reset(new Reader(BlockReaderOptions(), s.conn_.get())); - for (auto &loc : b.locs()) { - auto datanode = loc.id(); - s.endpoints_.push_back(tcp::endpoint( - address::from_string(datanode.ipaddr()), datanode.xferport())); - } + auto datanode = dn.id(); + s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()), + datanode.xferport()); m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(), s.endpoints_.end())); @@ -125,12 +123,11 @@ private: }; template -void InputStreamImpl::AsyncPreadSome(size_t offset, - const MutableBufferSequence &buffers, - const Handler &handler) { +void InputStreamImpl::AsyncPreadSome( + size_t offset, const MutableBufferSequence &buffers, + const std::set &excluded_datanodes, const Handler &handler) { + using ::hadoop::hdfs::DatanodeInfoProto; using ::hadoop::hdfs::LocatedBlockProto; - namespace ip = ::asio::ip; - using ::asio::ip::tcp; auto it = std::find_if( blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) { @@ -138,10 +135,21 @@ void InputStreamImpl::AsyncPreadSome(size_t offset, }); if (it == blocks_.end()) { - handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0); + handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); return; - } else if (!it->locs_size()) { - handler(Status::ResourceUnavailable("No datanodes available"), 0); + } + + const DatanodeInfoProto *chosen_dn = nullptr; + for (int i = 0; i < it->locs_size(); ++i) { + const auto &di = it->locs(i); + if (!excluded_datanodes.count(di.id().datanodeuuid())) { + chosen_dn = &di; + break; + } + } + + if (!chosen_dn) { + handler(Status::ResourceUnavailable("No datanodes available"), "", 0); return; } @@ -150,28 +158,30 @@ void InputStreamImpl::AsyncPreadSome(size_t offset, it->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); AsyncReadBlock( - fs_->rpc_engine().client_name(), *it, offset_within_block, + fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block, asio::buffer(buffers, size_within_block), handler); } template void InputStreamImpl::AsyncReadBlock( const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const hadoop::hdfs::LocatedBlockProto &block, + const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, const MutableBufferSequence &buffers, const Handler &handler) { typedef typename BlockReaderTrait::Reader Reader; auto m = - BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), block); + BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn); auto &s = m->state(); size_t size = asio::buffer_size(buffers); m->Push(new HandshakeContinuation(s.reader(), client_name, nullptr, &block.b(), size, offset)) .Push(new ReadBlockContinuation( s.reader(), buffers, s.transferred())); - m->Run([handler](const Status &status, + const std::string &dnid = dn.id().datanodeuuid(); + m->Run([handler, dnid](const Status &status, const typename BlockReaderTrait::State &state) { - handler(status, *state.transferred()); + handler(status, dnid, *state.transferred()); }); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc index 96aa38abf98..6d878237075 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc @@ -20,6 +20,8 @@ #include using hadoop::common::TokenProto; +using hadoop::hdfs::DatanodeInfoProto; +using hadoop::hdfs::DatanodeIDProto; using hadoop::hdfs::ExtendedBlockProto; using hadoop::hdfs::LocatedBlockProto; using hadoop::hdfs::LocatedBlocksProto; @@ -57,7 +59,7 @@ template struct MockBlockReaderTrait { }; static continuation::Pipeline * - CreatePipeline(::asio::io_service *, const LocatedBlockProto &) { + CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) { auto m = continuation::Pipeline::Create(); *m->state().transferred() = 0; Trait::InitializeMockReader(m->state().reader()); @@ -69,6 +71,7 @@ template struct MockBlockReaderTrait { TEST(InputStreamTest, TestReadSingleTrunk) { LocatedBlocksProto blocks; LocatedBlockProto block; + DatanodeInfoProto dn; char buf[4096] = { 0, }; @@ -82,14 +85,14 @@ TEST(InputStreamTest, TestReadSingleTrunk) { EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) .WillOnce(InvokeArgument<5>(Status::OK())); - EXPECT_CALL(*reader, async_read_some(_,_)) + EXPECT_CALL(*reader, async_read_some(_, _)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); } }; is.AsyncReadBlock>( - "client", block, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, size_t transferred) { + "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, const std::string &, size_t transferred) { stat = status; read = transferred; }); @@ -101,6 +104,7 @@ TEST(InputStreamTest, TestReadSingleTrunk) { TEST(InputStreamTest, TestReadMultipleTrunk) { LocatedBlocksProto blocks; LocatedBlockProto block; + DatanodeInfoProto dn; char buf[4096] = { 0, }; @@ -114,15 +118,16 @@ TEST(InputStreamTest, TestReadMultipleTrunk) { EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) .WillOnce(InvokeArgument<5>(Status::OK())); - EXPECT_CALL(*reader, async_read_some(_,_)) + EXPECT_CALL(*reader, async_read_some(_, _)) .Times(4) .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); } }; is.AsyncReadBlock>( - "client", block, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, size_t transferred) { + "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, const std::string &, + size_t transferred) { stat = status; read = transferred; }); @@ -134,6 +139,7 @@ TEST(InputStreamTest, TestReadMultipleTrunk) { TEST(InputStreamTest, TestReadError) { LocatedBlocksProto blocks; LocatedBlockProto block; + DatanodeInfoProto dn; char buf[4096] = { 0, }; @@ -147,7 +153,7 @@ TEST(InputStreamTest, TestReadError) { EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) .WillOnce(InvokeArgument<5>(Status::OK())); - EXPECT_CALL(*reader, async_read_some(_,_)) + EXPECT_CALL(*reader, async_read_some(_, _)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) @@ -156,8 +162,9 @@ TEST(InputStreamTest, TestReadError) { }; is.AsyncReadBlock>( - "client", block, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, size_t transferred) { + "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, const std::string &, + size_t transferred) { stat = status; read = transferred; }); @@ -166,6 +173,48 @@ TEST(InputStreamTest, TestReadError) { read = 0; } +TEST(InputStreamTest, TestExcludeDataNode) { + LocatedBlocksProto blocks; + LocatedBlockProto *block = blocks.add_blocks(); + ExtendedBlockProto *b = block->mutable_b(); + b->set_poolid(""); + b->set_blockid(1); + b->set_generationstamp(1); + b->set_numbytes(4096); + + DatanodeInfoProto *di = block->add_locs(); + DatanodeIDProto *dnid = di->mutable_id(); + dnid->set_datanodeuuid("foo"); + + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + FileSystemImpl fs(&io_service); + InputStreamImpl is(&fs, &blocks); + Status stat; + size_t read = 0; + struct Trait { + static void InitializeMockReader(MockReader *reader) { + EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) + .WillOnce(InvokeArgument<5>(Status::OK())); + + EXPECT_CALL(*reader, async_read_some(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + } + }; + + + std::set excluded_dn({"foo"}); + is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn, + [&stat, &read](const Status &status, const std::string &, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_EQ(static_cast(std::errc::resource_unavailable_try_again), stat.code()); + ASSERT_EQ(0UL, read); +} + int main(int argc, char *argv[]) { // The following line must be executed to initialize Google Mock // (and Google Test) before running the tests.