HDFS-9093. Initialize protobuf fields in RemoteBlockReaderTest. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-09-16 16:48:56 -07:00 committed by James Clampffer
parent a559ef0243
commit 9480c116ac
3 changed files with 104 additions and 70 deletions

View File

@ -20,6 +20,10 @@
namespace hdfs { namespace hdfs {
MockConnectionBase::MockConnectionBase(::asio::io_service *io_service)
: io_service_(io_service)
{}
MockConnectionBase::~MockConnectionBase() {} MockConnectionBase::~MockConnectionBase() {}
} }

View File

@ -21,12 +21,15 @@
#include <asio/error_code.hpp> #include <asio/error_code.hpp>
#include <asio/buffer.hpp> #include <asio/buffer.hpp>
#include <asio/streambuf.hpp> #include <asio/streambuf.hpp>
#include <asio/io_service.hpp>
#include <gmock/gmock.h> #include <gmock/gmock.h>
namespace hdfs { namespace hdfs {
class MockConnectionBase { class MockConnectionBase {
public: public:
MockConnectionBase(::asio::io_service *io_service);
virtual ~MockConnectionBase(); virtual ~MockConnectionBase();
typedef std::pair<asio::error_code, std::string> ProducerResult; typedef std::pair<asio::error_code, std::string> ProducerResult;
template <class MutableBufferSequence, class Handler> template <class MutableBufferSequence, class Handler>
@ -34,7 +37,7 @@ public:
if (produced_.size() == 0) { if (produced_.size() == 0) {
ProducerResult r = Produce(); ProducerResult r = Produce();
if (r.first) { if (r.first) {
handler(r.first, 0); io_service_->post(std::bind(handler, r.first, 0));
} }
asio::mutable_buffers_1 data = produced_.prepare(r.second.size()); asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
asio::buffer_copy(data, asio::buffer(r.second)); asio::buffer_copy(data, asio::buffer(r.second));
@ -44,17 +47,18 @@ public:
size_t len = std::min(asio::buffer_size(buf), produced_.size()); size_t len = std::min(asio::buffer_size(buf), produced_.size());
asio::buffer_copy(buf, produced_.data()); asio::buffer_copy(buf, produced_.data());
produced_.consume(len); produced_.consume(len);
handler(asio::error_code(), len); io_service_->post(std::bind(handler, asio::error_code(), len));
} }
template <class ConstBufferSequence, class Handler> template <class ConstBufferSequence, class Handler>
void async_write_some(const ConstBufferSequence &buf, Handler &&handler) { void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
// CompletionResult res = OnWrite(buf); // CompletionResult res = OnWrite(buf);
handler(asio::error_code(), asio::buffer_size(buf)); io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
} }
protected: protected:
virtual ProducerResult Produce() = 0; virtual ProducerResult Produce() = 0;
::asio::io_service *io_service_;
private: private:
asio::streambuf produced_; asio::streambuf produced_;

View File

@ -51,6 +51,8 @@ namespace hdfs {
class MockDNConnection : public MockConnectionBase { class MockDNConnection : public MockConnectionBase {
public: public:
MockDNConnection(::asio::io_service &io_service)
: MockConnectionBase(&io_service) {}
MOCK_METHOD0(Produce, ProducerResult()); MOCK_METHOD0(Produce, ProducerResult());
}; };
} }
@ -91,26 +93,20 @@ ProducePacket(const std::string &data, const std::string &checksum,
return std::make_pair(error_code(), std::move(payload)); return std::make_pair(error_code(), std::move(payload));
} }
template<class Stream = MockDNConnection> template <class Stream = MockDNConnection, class Handler>
static std::shared_ptr<RemoteBlockReader<Stream>> static std::shared_ptr<RemoteBlockReader<Stream>>
ReadContent(Stream *conn, TokenProto *token, ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
const ExtendedBlockProto &block, uint64_t length, uint64_t offset, uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
const mutable_buffers_1 &buf, Status *status, size_t *transferred) { const Handler &handler) {
BlockReaderOptions options; BlockReaderOptions options;
auto reader = auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
std::make_shared<RemoteBlockReader<Stream>>(options, conn);
Status result; Status result;
reader->async_connect( reader->async_connect("libhdfs++", token, &block, length, offset,
"libhdfs++", token, &block, length, offset, [buf, reader, handler](const Status &stat) {
[buf, reader, status, transferred](const Status &stat) {
if (!stat.ok()) { if (!stat.ok()) {
*status = stat; handler(stat, 0);
} else { } else {
reader->async_read_some( reader->async_read_some(buf, handler);
buf, [status, transferred](const Status &stat, size_t t) {
*transferred = t;
*status = stat;
});
} }
}); });
return reader; return reader;
@ -119,7 +115,8 @@ ReadContent(Stream *conn, TokenProto *token,
TEST(RemoteBlockReaderTest, TestReadWholeBlock) { TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
static const size_t kChunkSize = 512; static const size_t kChunkSize = 512;
static const string kChunkData(kChunkSize, 'a'); static const string kChunkData(kChunkSize, 'a');
MockDNConnection conn; ::asio::io_service io_service;
MockDNConnection conn(io_service);
BlockOpResponseProto block_op_resp; BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
@ -128,15 +125,20 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
ExtendedBlockProto block; ExtendedBlockProto block;
block.set_poolid("foo");
block.set_blockid(0);
block.set_generationstamp(0);
std::string data(kChunkSize, 0); std::string data(kChunkSize, 0);
size_t transferred = 0;
Status stat;
ReadContent(&conn, nullptr, block, kChunkSize, 0, ReadContent(&conn, nullptr, block, kChunkSize, 0,
buffer(const_cast<char *>(data.c_str()), data.size()), &stat, buffer(const_cast<char *>(data.c_str()), data.size()),
&transferred); [&data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data); ASSERT_EQ(kChunkData, data);
io_service.stop();
});
io_service.run();
} }
TEST(RemoteBlockReaderTest, TestReadWithinChunk) { TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
@ -145,7 +147,8 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
static const size_t kOffset = kChunkSize / 4; static const size_t kOffset = kChunkSize / 4;
static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
MockDNConnection conn; ::asio::io_service io_service;
MockDNConnection conn(io_service);
BlockOpResponseProto block_op_resp; BlockOpResponseProto block_op_resp;
ReadOpChecksumInfoProto *checksum_info = ReadOpChecksumInfoProto *checksum_info =
block_op_resp.mutable_readopchecksuminfo(); block_op_resp.mutable_readopchecksuminfo();
@ -160,22 +163,28 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
.WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
ExtendedBlockProto block; ExtendedBlockProto block;
block.set_poolid("foo");
block.set_blockid(0);
block.set_generationstamp(0);
string data(kLength, 0); string data(kLength, 0);
size_t transferred = 0;
Status stat;
ReadContent(&conn, nullptr, block, data.size(), kOffset, ReadContent(&conn, nullptr, block, data.size(), kOffset,
buffer(const_cast<char *>(data.c_str()), data.size()), &stat, buffer(const_cast<char *>(data.c_str()), data.size()),
&transferred); [&data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ(kLength, transferred); ASSERT_EQ(kLength, transferred);
ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
io_service.stop();
});
io_service.run();
} }
TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
static const size_t kChunkSize = 1024; static const size_t kChunkSize = 1024;
static const string kChunkData(kChunkSize, 'a'); static const string kChunkData(kChunkSize, 'a');
MockDNConnection conn; ::asio::io_service io_service;
MockDNConnection conn(io_service);
BlockOpResponseProto block_op_resp; BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
@ -185,25 +194,37 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
.WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
ExtendedBlockProto block; ExtendedBlockProto block;
block.set_poolid("foo");
block.set_blockid(0);
block.set_generationstamp(0);
string data(kChunkSize, 0); string data(kChunkSize, 0);
size_t transferred = 0;
Status stat;
mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat, BlockReaderOptions options;
&transferred); auto reader = std::make_shared<RemoteBlockReader<MockDNConnection> >(options, &conn);
Status result;
reader->async_connect(
"libhdfs++", nullptr, &block, data.size(), 0,
[buf, reader, &data, &io_service](const Status &stat) {
ASSERT_TRUE(stat.ok());
reader->async_read_some(
buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data); ASSERT_EQ(kChunkData, data);
data.clear(); data.clear();
data.resize(kChunkSize); data.resize(kChunkSize);
transferred = 0; transferred = 0;
reader->async_read_some(
reader->async_read_some(buf, [&data](const Status &stat, size_t transferred) { buf, [&data,&io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data); ASSERT_EQ(kChunkData, data);
io_service.stop();
}); });
});
});
io_service.run();
} }
TEST(RemoteBlockReaderTest, TestSaslConnection) { TEST(RemoteBlockReaderTest, TestSaslConnection) {
@ -212,7 +233,8 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
"qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
"charset=utf-8,algorithm=md5-sess"; "charset=utf-8,algorithm=md5-sess";
MockDNConnection conn; ::asio::io_service io_service;
MockDNConnection conn(io_service);
BlockOpResponseProto block_op_resp; BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
@ -233,20 +255,24 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar"); DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
ExtendedBlockProto block; ExtendedBlockProto block;
std::string data(kChunkSize, 0); block.set_poolid("foo");
size_t transferred = 0; block.set_blockid(0);
Status stat; block.set_generationstamp(0);
sasl_conn.Handshake([&stat](const Status &s) {
stat = s;
});
ASSERT_TRUE(stat.ok()); std::string data(kChunkSize, 0);
sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
const Status &s) {
ASSERT_TRUE(s.ok());
ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
buffer(const_cast<char *>(data.c_str()), data.size()), &stat, buffer(const_cast<char *>(data.c_str()), data.size()),
&transferred); [&data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok()); ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data); ASSERT_EQ(kChunkData, data);
io_service.stop();
});
});
io_service.run();
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {