HDFS-8788. Implement unit tests for remote block reader in libhdfspp. Contributed by Haohui Mai.
commit 26d9f6cee3
parent 1bec75a13c
@@ -204,6 +204,19 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                 </target>
               </configuration>
             </execution>
+            <execution>
+              <id>native_tests</id>
+              <phase>test</phase>
+              <goals><goal>run</goal></goals>
+              <configuration>
+                <skip>${skipTests}</skip>
+                <target>
+                  <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
+                    <arg line="test"/>
+                  </exec>
+                </target>
+              </configuration>
+            </execution>
           </executions>
         </plugin>
       </plugins>

@@ -18,4 +18,6 @@
 cmake_minimum_required(VERSION 2.8 FATAL_ERROR)
 
+enable_testing()
+
 add_subdirectory(libhdfspp)

@@ -51,3 +51,4 @@ include_directories(
 add_subdirectory(third_party/gmock-1.7.0)
 add_subdirectory(lib)
+add_subdirectory(tests)

@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(test_common OBJECT mock_connection.cc)
+add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
+target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} gmock_main)
+add_test(remote_block_reader remote_block_reader_test)

@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+namespace hdfs {
+
+MockConnectionBase::~MockConnectionBase() {}
+
+}
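A likely motivation for this one-line .cc file: defining the otherwise empty virtual destructor out of line gives the class a non-inline virtual member, so the vtable is emitted in a single translation unit instead of as a weak copy in every file that includes the header. A minimal sketch of the idiom (names here are illustrative, not from the patch):

    class Base {          // interface with virtual functions
    public:
      virtual ~Base();    // declared only; no inline body in the header
      virtual int Answer() = 0;
    };

    Base::~Base() {}      // defined once in one .cc file; the vtable
                          // (and RTTI) get anchored here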

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+
+#include <asio/error_code.hpp>
+#include <asio/buffer.hpp>
+#include <asio/streambuf.hpp>
+#include <gmock/gmock.h>
+
+namespace hdfs {
+
+class MockConnectionBase {
+public:
+  virtual ~MockConnectionBase();
+  typedef std::pair<asio::error_code, std::string> ProducerResult;
+  template <class MutableBufferSequence, class Handler>
+  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+    if (produced_.size() == 0) {
+      ProducerResult r = Produce();
+      if (r.first) {
+        handler(r.first, 0);
+      }
+      asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
+      asio::buffer_copy(data, asio::buffer(r.second));
+      produced_.commit(r.second.size());
+    }
+
+    size_t len = std::min(asio::buffer_size(buf), produced_.size());
+    asio::buffer_copy(buf, produced_.data());
+    produced_.consume(len);
+    handler(asio::error_code(), len);
+  }
+
+  template <class ConstBufferSequence, class Handler>
+  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+    // CompletionResult res = OnWrite(buf);
+    handler(asio::error_code(), asio::buffer_size(buf));
+  }
+
+protected:
+  virtual ProducerResult Produce() = 0;
+
+private:
+  asio::streambuf produced_;
+};
+}
+
+#endif
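For orientation, here is a minimal sketch of how a test drives this mock (the subclass name and byte string are illustrative; the real test below uses MockDNConnection and protobuf-encoded payloads). Produce() is scripted with gmock; each result is appended to the internal streambuf, which async_read_some() then drains into the caller's buffer and hands to the completion handler synchronously:

    #include "mock_connection.h"
    #include <gmock/gmock.h>

    namespace hdfs {
    class ScriptedConnection : public MockConnectionBase {  // hypothetical name
    public:
      MOCK_METHOD0(Produce, ProducerResult());
    };
    }

    // In a test body:
    //   hdfs::ScriptedConnection conn;
    //   EXPECT_CALL(conn, Produce())
    //       .WillOnce(::testing::Return(
    //           std::make_pair(asio::error_code(), std::string("hello"))));
    //   char out[5];
    //   conn.async_read_some(asio::buffer(out),
    //                        [](const asio::error_code &ec, size_t n) {
    //                          // ec is success and n == 5; out holds "hello"
    //                        });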

@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+#include "datatransfer.pb.h"
+#include "common/util.h"
+#include "reader/block_reader.h"
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+using namespace hdfs;
+
+using ::hadoop::common::TokenProto;
+using ::hadoop::hdfs::BlockOpResponseProto;
+using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::ExtendedBlockProto;
+using ::hadoop::hdfs::PacketHeaderProto;
+using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+
+using ::asio::buffer;
+using ::asio::error_code;
+using ::asio::mutable_buffers_1;
+using ::testing::Return;
+using std::make_pair;
+using std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = pb::io;
+
+namespace hdfs {
+
+class MockDNConnection : public MockConnectionBase {
+public:
+  MOCK_METHOD0(Produce, ProducerResult());
+};
+}
+
+static inline string ToDelimitedString(const pb::MessageLite *msg) {
+  string res;
+  res.reserve(hdfs::DelimitedPBMessageSize(msg));
+  pbio::StringOutputStream os(&res);
+  pbio::CodedOutputStream out(&os);
+  out.WriteVarint32(msg->ByteSize());
+  msg->SerializeToCodedStream(&out);
+  return res;
+}
+
+static inline std::pair<error_code, string> Produce(const std::string &s) {
+  return make_pair(error_code(), s);
+}
+
+static inline std::pair<error_code, string>
+ProducePacket(const std::string &data, const std::string &checksum,
+              int offset_in_block, int seqno, bool last_packet) {
+  PacketHeaderProto proto;
+  proto.set_datalen(data.size());
+  proto.set_offsetinblock(offset_in_block);
+  proto.set_seqno(seqno);
+  proto.set_lastpacketinblock(last_packet);
+
+  char prefix[6];
+  *reinterpret_cast<unsigned *>(prefix) =
+      htonl(data.size() + checksum.size() + sizeof(int));
+  *reinterpret_cast<short *>(prefix + sizeof(int)) = htons(proto.ByteSize());
+  std::string payload(prefix, sizeof(prefix));
+  payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
+                  data.size());
+  proto.AppendToString(&payload);
+  payload += checksum;
+  payload += data;
+  return std::make_pair(error_code(), std::move(payload));
+}
+
+static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
+ReadContent(MockDNConnection *conn, TokenProto *token,
+            const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
+            const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
+  BlockReaderOptions options;
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
+  Status result;
+  reader->async_connect(
+      "libhdfs++", token, &block, length, offset,
+      [buf, reader, status, transferred](const Status &stat) {
+        if (!stat.ok()) {
+          *status = stat;
+        } else {
+          reader->async_read_some(
+              buf, [status, transferred](const Status &stat, size_t t) {
+                *transferred = t;
+                *status = stat;
+              });
+        }
+      });
+  return reader;
+}
+
+TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  std::string data(kChunkSize, 0);
+  size_t transferred = 0;
+  Status stat;
+  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
+              &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kChunkSize, transferred);
+  ASSERT_EQ(kChunkData, data);
+}
+
+TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
+  static const size_t kChunkSize = 1024;
+  static const size_t kLength = kChunkSize / 4 * 3;
+  static const size_t kOffset = kChunkSize / 4;
+  static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
+
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+  ReadOpChecksumInfoProto *checksum_info =
+      block_op_resp.mutable_readopchecksuminfo();
+  checksum_info->set_chunkoffset(0);
+  ChecksumProto *checksum = checksum_info->mutable_checksum();
+  checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL);
+  checksum->set_bytesperchecksum(512);
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
+
+  ExtendedBlockProto block;
+  string data(kLength, 0);
+  size_t transferred = 0;
+  Status stat;
+  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
+              &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kLength, transferred);
+  ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  MockDNConnection conn;
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
+
+  ExtendedBlockProto block;
+  string data(kChunkSize, 0);
+  size_t transferred = 0;
+  Status stat;
+  mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
+  auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat,
+                            &transferred);
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(kChunkSize, transferred);
+  ASSERT_EQ(kChunkData, data);
+
+  data.clear();
+  data.resize(kChunkSize);
+  transferred = 0;
+
+  reader->async_read_some(buf, [&data](const Status &stat, size_t transferred) {
+    ASSERT_TRUE(stat.ok());
+    ASSERT_EQ(kChunkSize, transferred);
+    ASSERT_EQ(kChunkData, data);
+  });
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
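The ProducePacket helper above encodes the framing that the reader under test consumes: a 6-byte prefix holding a 4-byte big-endian payload length (checksums + data + the 4-byte length field itself) and a 2-byte big-endian header length, followed by the serialized PacketHeaderProto, the checksums, and the data. A hedged, self-contained sketch of decoding just that prefix (not part of the patch; POSIX byte-order helpers assumed):

    #include <arpa/inet.h>  // ntohl, ntohs
    #include <cstdint>
    #include <cstring>
    #include <string>

    struct PacketPrefix {
      uint32_t payload_len;  // checksums + data + sizeof(uint32_t)
      uint16_t header_len;   // size of the serialized PacketHeaderProto
    };

    // Parse the 6-byte prefix built by ProducePacket above.
    static PacketPrefix ParsePrefix(const std::string &packet) {
      PacketPrefix res;
      uint32_t plen;
      uint16_t hlen;
      std::memcpy(&plen, packet.data(), sizeof(plen));
      std::memcpy(&hlen, packet.data() + sizeof(plen), sizeof(hlen));
      res.payload_len = ntohl(plen);
      res.header_len = ntohs(hlen);
      return res;
    }

    // For a packet built as ProducePacket(data, checksum, ...):
    //   data.size() == prefix.payload_len - checksum.size() - sizeof(uint32_t)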