HDFS-9890: libhdfs++: Add test suite to simulate network issues. Contributed by Xiaowei Zhu.

This commit is contained in:
James 2016-07-13 15:36:14 +00:00 committed by James Clampffer
parent a23e6b422b
commit a586ccbcfa
16 changed files with 483 additions and 54 deletions

View File

@ -182,6 +182,16 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
} }
(*env)->DeleteLocalRef(env, val.l); (*env)->DeleteLocalRef(env, val.l);
} }
if (conf->numDataNodes) {
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
"Builder::numDataNodes");
goto error;
}
}
(*env)->DeleteLocalRef(env, val.l);
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"build", "()L" MINIDFS_CLUSTER ";"); "build", "()L" MINIDFS_CLUSTER ";");
if (jthr) { if (jthr) {

View File

@ -51,6 +51,11 @@ struct NativeMiniDfsConf {
* Nonzero if we should configure short circuit. * Nonzero if we should configure short circuit.
*/ */
jboolean configureShortCircuit; jboolean configureShortCircuit;
/**
* The number of datanodes in MiniDfsCluster
*/
jint numDataNodes;
}; };
/** /**

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs/hdfs.h"
#include "hdfspp/hdfs_ext.h"
#include "native_mini_dfs.h"
#include "os/thread.h"
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
#define TLH_MAX_THREADS 10000
#define TLH_MAX_DNS 16
#define TLH_DEFAULT_BLOCK_SIZE 1048576
#define TLH_DEFAULT_DFS_REPLICATION 3
#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100
#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5
#ifndef RANDOM_ERROR_RATIO
#define RANDOM_ERROR_RATIO 1000000000
#endif
struct tlhThreadInfo {
/** Thread index */
int threadIdx;
/** 0 = thread was successful; error code otherwise */
int success;
/** thread identifier */
thread theThread;
/** fs, shared with other threads **/
hdfsFS hdfs;
/** Filename */
const char *fileNm;
};
static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
const char *username)
{
int ret;
tPort port;
hdfsFS hdfs;
struct hdfsBuilder *bld;
port = (tPort)nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort "
"returned error %d\n", port);
return port;
}
bld = hdfsNewBuilder();
if (!bld)
return -ENOMEM;
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderSetNameNode(bld, "localhost");
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderConfSetStr(bld, "dfs.block.size",
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
hdfsBuilderConfSetStr(bld, "dfs.blocksize",
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
hdfsBuilderConfSetStr(bld, "dfs.replication",
TO_STR(TLH_DEFAULT_DFS_REPLICATION));
hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries",
TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES));
hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval",
TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS));
if (username) {
hdfsBuilderSetUserName(bld, username);
}
hdfs = hdfsBuilderConnect(bld);
if (!hdfs) {
ret = -errno;
return ret;
}
*fs = hdfs;
return 0;
}
static int hdfsWriteData(hdfsFS hdfs, const char *dirNm,
const char *fileNm, tSize fileSz)
{
hdfsFile file;
int ret, expected;
const char *content;
content = fileNm;
if (hdfsExists(hdfs, dirNm) == 0) {
EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1));
}
EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm));
file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0);
EXPECT_NONNULL(file);
expected = (int)strlen(content);
tSize sz = 0;
while (sz < fileSz) {
ret = hdfsWrite(hdfs, file, content, expected);
if (ret < 0) {
ret = errno;
fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
return ret;
}
if (ret != expected) {
fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
"it wrote %d\n", ret, expected);
return EIO;
}
sz += ret;
}
EXPECT_ZERO(hdfsFlush(hdfs, file));
EXPECT_ZERO(hdfsHSync(hdfs, file));
EXPECT_ZERO(hdfsCloseFile(hdfs, file));
return 0;
}
static int fileEventCallback1(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
{
char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO");
int64_t randomErrRatio = RANDOM_ERROR_RATIO;
if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr);
if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR;
else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK;
return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
}
static int fileEventCallback2(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
{
/* no op */
return LIBHDFSPP_EVENT_OK;
}
static int doTestHdfsMiniStress(struct tlhThreadInfo *ti, int randomErr)
{
char tmp[4096];
hdfsFile file;
int ret, expected;
hdfsFileInfo *fileInfo;
uint64_t readOps, nErrs=0;
tOffset seekPos;
const char *content;
content = ti->fileNm;
expected = (int)strlen(content);
fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm);
EXPECT_NONNULL(fileInfo);
file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
libhdfspp_file_event_callback callback = (randomErr != 0) ? &fileEventCallback1 : &fileEventCallback2;
hdfsPreAttachFileMonitor(callback, 0);
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n",
ti->threadIdx);
for (readOps=0; readOps < 1000; ++readOps) {
EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected);
seekPos = (seekPos / expected) * expected;
ret = hdfsSeek(ti->hdfs, file, seekPos);
if (ret < 0) {
ret = errno;
fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set"
" errno %d\n", seekPos, ret);
++nErrs;
continue;
}
ret = hdfsRead(ti->hdfs, file, tmp, expected);
if (ret < 0) {
ret = errno;
fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
++nErrs;
continue;
}
if (ret != expected) {
fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
"it read %d\n", ret, expected);
++nErrs;
continue;
}
ret = memcmp(content, tmp, expected);
if (ret) {
fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)",
expected, tmp, expected, content);
++nErrs;
continue;
}
}
EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n",
ti->threadIdx);
EXPECT_ZERO(nErrs);
return 0;
}
static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
{
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
ti->threadIdx);
EXPECT_NONNULL(ti->hdfs);
EXPECT_ZERO(doTestHdfsMiniStress(ti, 1));
EXPECT_ZERO(doTestHdfsMiniStress(ti, 0));
return 0;
}
static void testHdfsMiniStress(void *v)
{
struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
int ret = testHdfsMiniStressImpl(ti);
ti->success = ret;
}
static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
{
int i, threadsFailed = 0;
const char *sep = "";
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
threadsFailed = 1;
}
}
if (!threadsFailed) {
fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded. SUCCESS.\n");
return EXIT_SUCCESS;
}
fprintf(stderr, "testLibHdfsMiniStress: some threads failed: [");
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
fprintf(stderr, "%s%d", sep, i);
sep = ", ";
}
}
fprintf(stderr, "]. FAILURE.\n");
return EXIT_FAILURE;
}
/**
* Test intended to stress libhdfs client with concurrent requests. Currently focused
* on concurrent reads.
*/
int main(void)
{
int i, tlhNumThreads;
char *dirNm, *fileNm;
tSize fileSz;
const char *tlhNumThreadsStr, *tlhNumDNsStr;
hdfsFS hdfs = NULL;
struct NativeMiniDfsCluster* tlhCluster;
struct tlhThreadInfo ti[TLH_MAX_THREADS];
struct NativeMiniDfsConf conf = {
1, /* doFormat */
};
dirNm = "/tlhMiniStressData";
fileNm = "/tlhMiniStressData/file";
fileSz = 2*1024*1024;
tlhNumDNsStr = getenv("TLH_NUM_DNS");
if (!tlhNumDNsStr) {
tlhNumDNsStr = "1";
}
conf.numDataNodes = atoi(tlhNumDNsStr);
if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) {
fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes "
"between 1 and %d inclusive, not %d\n",
TLH_MAX_DNS, conf.numDataNodes);
return EXIT_FAILURE;
}
tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
if (!tlhNumThreadsStr) {
tlhNumThreadsStr = "8";
}
tlhNumThreads = atoi(tlhNumThreadsStr);
if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads "
"between 1 and %d inclusive, not %d\n",
TLH_MAX_THREADS, tlhNumThreads);
return EXIT_FAILURE;
}
memset(&ti[0], 0, sizeof(ti));
for (i = 0; i < tlhNumThreads; i++) {
ti[i].threadIdx = i;
}
tlhCluster = nmdCreate(&conf);
EXPECT_NONNULL(tlhCluster);
EXPECT_ZERO(nmdWaitClusterUp(tlhCluster));
EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL));
// Single threaded writes for now.
EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz));
// Multi-threaded reads.
for (i = 0; i < tlhNumThreads; i++) {
ti[i].theThread.start = testHdfsMiniStress;
ti[i].theThread.arg = &ti[i];
ti[i].hdfs = hdfs;
ti[i].fileNm = fileNm;
EXPECT_ZERO(threadCreate(&ti[i].theThread));
}
for (i = 0; i < tlhNumThreads; i++) {
EXPECT_ZERO(threadJoin(&ti[i].theThread));
}
EXPECT_ZERO(hdfsDisconnect(hdfs));
EXPECT_ZERO(nmdShutdown(tlhCluster));
nmdFree(tlhCluster);
return checkFailures(ti, tlhNumThreads);
}

View File

@ -48,10 +48,8 @@ public:
enum event_response_type { enum event_response_type {
kOk = 0, kOk = 0,
#ifndef NDEBUG
// Responses to be used in testing only // Responses to be used in testing only
kTest_Error = 100 kTest_Error = 100
#endif
}; };
@ -70,10 +68,9 @@ private:
// //
// Testing support // Testing support
// //
// If running a debug build, the consumer can stimulate errors // The consumer can stimulate errors
// within libhdfdspp by returning a Status from the callback. // within libhdfdspp by returning a Status from the callback.
/////////////////////////////////////////////// ///////////////////////////////////////////////
#ifndef NDEBUG
public: public:
static event_response test_err(const Status &status) { static event_response test_err(const Status &status) {
return event_response(status); return event_response(status);
@ -86,7 +83,6 @@ private:
response_(event_response_type::kTest_Error), error_status_(status) {} response_(event_response_type::kTest_Error), error_status_(status) {}
Status error_status_; // To be used with kTest_Error Status error_status_; // To be used with kTest_Error
#endif
}; };

View File

@ -235,9 +235,7 @@ extern const char * FILE_DN_WRITE_EVENT;
#define LIBHDFSPP_EVENT_OK (0) #define LIBHDFSPP_EVENT_OK (0)
#ifndef NDEBUG #define DEBUG_SIMULATE_ERROR (-1)
#define DEBUG_SIMULATE_ERROR (-1)
#endif
typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster, typedef int (*libhdfspp_fs_event_callback)(const char * event, const char * cluster,
int64_t value, int64_t cookie); int64_t value, int64_t cookie);

View File

@ -331,6 +331,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
Error(stat); Error(stat);
return nullptr; return nullptr;
} }
if (f && fileEventCallback) {
f->SetFileEventCallback(fileEventCallback.value());
}
return new hdfsFile_internal(f); return new hdfsFile_internal(f);
} catch (const std::exception & e) { } catch (const std::exception & e) {
ReportException(e); ReportException(e);
@ -959,7 +962,7 @@ event_response fs_callback_glue(libhdfspp_fs_event_callback handler,
if (result == LIBHDFSPP_EVENT_OK) { if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok(); return event_response::ok();
} }
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (result == DEBUG_SIMULATE_ERROR) { if (result == DEBUG_SIMULATE_ERROR) {
return event_response::test_err(Status::Error("Simulated error")); return event_response::test_err(Status::Error("Simulated error"));
} }
@ -978,7 +981,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
if (result == LIBHDFSPP_EVENT_OK) { if (result == LIBHDFSPP_EVENT_OK) {
return event_response::ok(); return event_response::ok();
} }
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (result == DEBUG_SIMULATE_ERROR) { if (result == DEBUG_SIMULATE_ERROR) {
return event_response::test_err(Status::Error("Simulated error")); return event_response::test_err(Status::Error("Simulated error"));
} }

View File

@ -239,7 +239,7 @@ void FileHandleImpl::AsyncPreadSome(
// Wrap the DN in a block reader to handle the state and logic of the // Wrap the DN in a block reader to handle the state and logic of the
// block request protocol // block request protocol
std::shared_ptr<BlockReader> reader; std::shared_ptr<BlockReader> reader;
reader = CreateBlockReader(BlockReaderOptions(), dn); reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
// Lambdas cannot capture copies of member variables so we'll make explicit // Lambdas cannot capture copies of member variables so we'll make explicit
// copies for it // copies for it
@ -248,8 +248,8 @@ void FileHandleImpl::AsyncPreadSome(
auto cluster_name = cluster_name_; auto cluster_name = cluster_name_;
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) { auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred); event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) { if (event_resp.response() == event_response::kTest_Error) {
handler(event_resp.status(), dn_id, transferred); handler(event_resp.status(), dn_id, transferred);
return; return;
@ -262,8 +262,8 @@ void FileHandleImpl::AsyncPreadSome(
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
(Status status, std::shared_ptr<DataNodeConnection> dn) { (Status status, std::shared_ptr<DataNodeConnection> dn) {
(void)dn; (void)dn;
auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0); event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) { if (event_resp.response() == event_response::kTest_Error) {
status = event_resp.status(); status = event_resp.status();
} }
@ -284,9 +284,10 @@ void FileHandleImpl::AsyncPreadSome(
} }
std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn) std::shared_ptr<DataNodeConnection> dn,
std::shared_ptr<LibhdfsEvents> event_handlers)
{ {
std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_); std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
<< ", ..., dnconn=" << dn.get() << ", ..., dnconn=" << dn.get()

View File

@ -119,7 +119,8 @@ public:
protected: protected:
virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn); std::shared_ptr<DataNodeConnection> dn,
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
::asio::io_service *io_service, ::asio::io_service *io_service,
const ::hadoop::hdfs::DatanodeInfoProto & dn, const ::hadoop::hdfs::DatanodeInfoProto & dn,

View File

@ -24,7 +24,6 @@
#include <future> #include <future>
namespace hdfs { namespace hdfs {
#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_ #define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
@ -105,7 +104,17 @@ void BlockReaderImpl::AsyncRequestBlock(
m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
if (stat.ok()) { if (stat.ok()) {
const auto &resp = s.response; const auto &resp = s.response;
if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
if(this->event_handlers_) {
event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
stat = Status::Error("Test error");
}
#endif
}
if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
if (resp.has_readopchecksuminfo()) { if (resp.has_readopchecksuminfo()) {
const auto &checksum_info = resp.readopchecksuminfo(); const auto &checksum_info = resp.readopchecksuminfo();
chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
@ -162,6 +171,14 @@ struct BlockReaderImpl::ReadPacketHeader
assert(v && "Failed to parse the header"); assert(v && "Failed to parse the header");
parent_->state_ = kReadChecksum; parent_->state_ = kReadChecksum;
} }
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status); next(status);
}; };
@ -214,7 +231,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
return; return;
} }
auto handler = [parent, next](const asio::error_code &ec, size_t) { auto handler = [parent, next, this](const asio::error_code &ec, size_t) {
Status status; Status status;
if (ec) { if (ec) {
status = Status(ec.value(), ec.message().c_str()); status = Status(ec.value(), ec.message().c_str());
@ -222,6 +239,14 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
parent->state_ = parent->state_ =
parent->chunk_padding_bytes_ ? kReadPadding : kReadData; parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
} }
if(parent->event_handlers_) {
event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status); next(status);
}; };
parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
@ -248,7 +273,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
virtual void Run(const Next &next) override { virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run(" LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called"); << FMT_CONT_AND_PARENT_ADDR << ") called");
auto handler = auto handler =
[next, this](const asio::error_code &ec, size_t transferred) { [next, this](const asio::error_code &ec, size_t transferred) {
Status status; Status status;
@ -261,6 +285,14 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
parent_->state_ = kReadPacketHeader; parent_->state_ = kReadPacketHeader;
} }
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status); next(status);
}; };
@ -292,13 +324,22 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
return; return;
} }
auto h = [next, this](const Status &status) { auto h = [next, this](const Status &stat) {
Status status = stat;
if (status.ok()) { if (status.ok()) {
assert(reinterpret_cast<const int &>(*bytes_transferred_) == assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
parent_->chunk_padding_bytes_); parent_->chunk_padding_bytes_);
parent_->chunk_padding_bytes_ = 0; parent_->chunk_padding_bytes_ = 0;
parent_->state_ = kReadData; parent_->state_ = kReadData;
} }
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status); next(status);
}; };
read_data_->Run(h); read_data_->Run(h);
@ -334,11 +375,20 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
m->Push( m->Push(
continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
m->Run([this, next](const Status &status, m->Run([this, next](const Status &stat,
const hadoop::hdfs::ClientReadStatusProto &) { const hadoop::hdfs::ClientReadStatusProto &) {
Status status = stat;
if (status.ok()) { if (status.ok()) {
parent_->state_ = BlockReaderImpl::kFinished; parent_->state_ = BlockReaderImpl::kFinished;
} }
if(parent_->event_handlers_) {
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
status = Status::Error("Test error");
}
#endif
}
next(status); next(status);
}); });
} }

View File

@ -93,9 +93,9 @@ class BlockReaderImpl
: public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> { : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
public: public:
explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn, explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
CancelHandle cancel_state) CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr)
: dn_(dn), state_(kOpen), options_(options), : dn_(dn), state_(kOpen), options_(options),
chunk_padding_bytes_(0), cancel_state_(cancel_state) {} chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
virtual void AsyncReadPacket( virtual void AsyncReadPacket(
const MutableBuffers &buffers, const MutableBuffers &buffers,
@ -152,6 +152,7 @@ private:
long long bytes_to_read_; long long bytes_to_read_;
std::vector<char> checksum_; std::vector<char> checksum_;
CancelHandle cancel_state_; CancelHandle cancel_state_;
LibhdfsEvents* event_handlers_;
}; };
} }

View File

@ -274,9 +274,18 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
} }
Status status; Status status;
if (h.has_exceptionclassname()) { if(event_handlers_) {
event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) {
status = event_resp.status();
}
#endif
}
if (status.ok() && h.has_exceptionclassname()) {
status = status =
Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
} }
io_service().post([req, response, status]() { io_service().post([req, response, status]() {

View File

@ -30,6 +30,8 @@
#include <asio/read.hpp> #include <asio/read.hpp>
#include <asio/write.hpp> #include <asio/write.hpp>
#include <system_error>
namespace hdfs { namespace hdfs {
template <class NextLayer> template <class NextLayer>
@ -72,8 +74,9 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
: RpcConnection(engine), : RpcConnection(engine),
options_(engine->options()), options_(engine->options()),
next_layer_(engine->io_service()), next_layer_(engine->io_service()),
connect_timer_(engine->io_service()) { connect_timer_(engine->io_service())
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called"); {
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
} }
template <class NextLayer> template <class NextLayer>
@ -87,7 +90,6 @@ RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
} }
template <class NextLayer> template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Connect( void RpcConnectionImpl<NextLayer>::Connect(
const std::vector<::asio::ip::tcp::endpoint> &server, const std::vector<::asio::ip::tcp::endpoint> &server,
@ -171,8 +173,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec,
Status status = ToStatus(ec); Status status = ToStatus(ec);
if(event_handlers_) { if(event_handlers_) {
auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) { if (event_resp.response() == event_response::kTest_Error) {
status = event_resp.status(); status = event_resp.status();
} }
@ -349,27 +351,28 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
template <class NextLayer> template <class NextLayer>
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec, void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
size_t) { size_t) {
using std::placeholders::_1; using std::placeholders::_1;
using std::placeholders::_2; using std::placeholders::_2;
std::lock_guard<std::mutex> state_lock(connection_state_lock_); std::lock_guard<std::mutex> state_lock(connection_state_lock_);
::asio::error_code my_ec(original_ec);
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
std::shared_ptr<RpcConnection> shared_this = shared_from_this(); std::shared_ptr<RpcConnection> shared_this = shared_from_this();
::asio::error_code ec = asio_ec;
if(event_handlers_) { if(event_handlers_) {
auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
#ifndef NDEBUG #ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED
if (event_resp.response() == event_response::kTest_Error) { if (event_resp.response() == event_response::kTest_Error) {
ec = std::make_error_code(std::errc::network_down); my_ec = std::make_error_code(std::errc::network_down);
} }
#endif #endif
} }
switch (ec.value()) { switch (my_ec.value()) {
case 0: case 0:
// No errors // No errors
break; break;
@ -377,8 +380,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asi
// The event loop has been shut down. Ignore the error. // The event loop has been shut down. Ignore the error.
return; return;
default: default:
LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message()); LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
CommsError(ToStatus(ec)); CommsError(ToStatus(my_ec));
return; return;
} }

View File

@ -230,7 +230,6 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
std::shared_ptr<LibhdfsEvents> event_handlers_; std::shared_ptr<LibhdfsEvents> event_handlers_;
std::string cluster_name_; std::string cluster_name_;
// Lock for mutable parts of this class that need to be thread safe // Lock for mutable parts of this class that need to be thread safe
std::mutex connection_state_lock_; std::mutex connection_state_lock_;

View File

@ -22,6 +22,9 @@ set (LIBHDFS_TESTS_DIR ../../libhdfs-tests)
set (LIBHDFSPP_SRC_DIR ..) set (LIBHDFSPP_SRC_DIR ..)
set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib) set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib)
set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c) set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-missing-field-initializers")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-missing-field-initializers")
include_directories( include_directories(
${GENERATED_JAVAH} ${GENERATED_JAVAH}
${CMAKE_CURRENT_LIST_DIR} ${CMAKE_CURRENT_LIST_DIR}
@ -138,6 +141,10 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT
link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static) add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c)
link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static)
build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc) build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES}) link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static) add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)

View File

@ -93,9 +93,10 @@ public:
std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>(); std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
protected: protected:
std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
std::shared_ptr<DataNodeConnection> dn) override std::shared_ptr<DataNodeConnection> dn,
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers) override
{ {
(void) options; (void) dn; (void) options; (void) dn; (void) event_handlers;
assert(mock_reader_); assert(mock_reader_);
return mock_reader_; return mock_reader_;
} }

View File

@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks)
}); });
io_service.run(); io_service.run();
ASSERT_TRUE(complete); ASSERT_TRUE(complete);
ASSERT_EQ(7, callbacks.size()); ASSERT_EQ(8, callbacks.size());
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error