Revert HDFS-9890. Needs some more work to reflect the current status of HDFS-8707. Done by James Clampffer
This commit is contained in:
parent
11b68ecebe
commit
d64bbf983e
|
@ -147,7 +147,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<mkdir dir="${project.build.directory}/native"/>
|
<mkdir dir="${project.build.directory}/native"/>
|
||||||
<exec executable="cmake" dir="${project.build.directory}/native"
|
<exec executable="cmake" dir="${project.build.directory}/native"
|
||||||
failonerror="true">
|
failonerror="true">
|
||||||
<arg line="${basedir}/src/ -DCMAKE_BUILD_TYPE=Debug -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DHADOOP_BUILD=1 -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} -A '${env.PLATFORM}'"/>
|
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DHADOOP_BUILD=1 -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} -A '${env.PLATFORM}'"/>
|
||||||
<arg line="${native_cmake_args}"/>
|
<arg line="${native_cmake_args}"/>
|
||||||
</exec>
|
</exec>
|
||||||
<exec executable="msbuild" dir="${project.build.directory}/native"
|
<exec executable="msbuild" dir="${project.build.directory}/native"
|
||||||
|
@ -208,7 +208,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
<target>
|
<target>
|
||||||
<mkdir dir="${project.build.directory}"/>
|
<mkdir dir="${project.build.directory}"/>
|
||||||
<exec executable="cmake" dir="${project.build.directory}" failonerror="true">
|
<exec executable="cmake" dir="${project.build.directory}" failonerror="true">
|
||||||
<arg line="${basedir}/src/ -DCMAKE_BUILD_TYPE=Debug -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} "/>
|
<arg line="${basedir}/src/ -DGENERATED_JAVAH=${project.build.directory}/native/javah -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model} -DHADOOP_BUILD=1 -DREQUIRE_LIBWEBHDFS=${require.libwebhdfs} -DREQUIRE_FUSE=${require.fuse} -DREQUIRE_VALGRIND=${require.valgrind} "/>
|
||||||
<arg line="${native_cmake_args}"/>
|
<arg line="${native_cmake_args}"/>
|
||||||
</exec>
|
</exec>
|
||||||
<exec executable="make" dir="${project.build.directory}" failonerror="true">
|
<exec executable="make" dir="${project.build.directory}" failonerror="true">
|
||||||
|
|
|
@ -182,16 +182,6 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
|
||||||
}
|
}
|
||||||
(*env)->DeleteLocalRef(env, val.l);
|
(*env)->DeleteLocalRef(env, val.l);
|
||||||
}
|
}
|
||||||
if (conf->numDataNodes) {
|
|
||||||
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
|
|
||||||
"numDataNodes", "(I)L" MINIDFS_CLUSTER_BUILDER ";", conf->numDataNodes);
|
|
||||||
if (jthr) {
|
|
||||||
printExceptionAndFree(env, jthr, PRINT_EXC_ALL, "nmdCreate: "
|
|
||||||
"Builder::numDataNodes");
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(*env)->DeleteLocalRef(env, val.l);
|
|
||||||
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
|
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
|
||||||
"build", "()L" MINIDFS_CLUSTER ";");
|
"build", "()L" MINIDFS_CLUSTER ";");
|
||||||
if (jthr) {
|
if (jthr) {
|
||||||
|
|
|
@ -51,11 +51,6 @@ struct NativeMiniDfsConf {
|
||||||
* Nonzero if we should configure short circuit.
|
* Nonzero if we should configure short circuit.
|
||||||
*/
|
*/
|
||||||
jboolean configureShortCircuit;
|
jboolean configureShortCircuit;
|
||||||
|
|
||||||
/**
|
|
||||||
* The number of datanodes in MiniDfsCluster
|
|
||||||
*/
|
|
||||||
jint numDataNodes;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,338 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "expect.h"
|
|
||||||
#include "hdfs/hdfs.h"
|
|
||||||
#include "hdfspp/hdfs_ext.h"
|
|
||||||
#include "native_mini_dfs.h"
|
|
||||||
#include "os/thread.h"
|
|
||||||
|
|
||||||
#include <errno.h>
|
|
||||||
#include <inttypes.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <string.h>
|
|
||||||
|
|
||||||
#define TO_STR_HELPER(X) #X
|
|
||||||
#define TO_STR(X) TO_STR_HELPER(X)
|
|
||||||
|
|
||||||
#define TLH_MAX_THREADS 10000
|
|
||||||
|
|
||||||
#define TLH_MAX_DNS 16
|
|
||||||
|
|
||||||
#define TLH_DEFAULT_BLOCK_SIZE 1048576
|
|
||||||
|
|
||||||
#define TLH_DEFAULT_DFS_REPLICATION 3
|
|
||||||
|
|
||||||
#define TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES 100
|
|
||||||
|
|
||||||
#define TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS 5
|
|
||||||
|
|
||||||
#ifndef RANDOM_ERROR_RATIO
|
|
||||||
#define RANDOM_ERROR_RATIO 1000000000
|
|
||||||
#endif
|
|
||||||
|
|
||||||
struct tlhThreadInfo {
|
|
||||||
/** Thread index */
|
|
||||||
int threadIdx;
|
|
||||||
/** 0 = thread was successful; error code otherwise */
|
|
||||||
int success;
|
|
||||||
/** thread identifier */
|
|
||||||
thread theThread;
|
|
||||||
/** fs, shared with other threads **/
|
|
||||||
hdfsFS hdfs;
|
|
||||||
/** Filename */
|
|
||||||
const char *fileNm;
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
static int hdfsNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs,
|
|
||||||
const char *username)
|
|
||||||
{
|
|
||||||
int ret;
|
|
||||||
tPort port;
|
|
||||||
hdfsFS hdfs;
|
|
||||||
struct hdfsBuilder *bld;
|
|
||||||
|
|
||||||
port = (tPort)nmdGetNameNodePort(cl);
|
|
||||||
if (port < 0) {
|
|
||||||
fprintf(stderr, "hdfsNameNodeConnect: nmdGetNameNodePort "
|
|
||||||
"returned error %d\n", port);
|
|
||||||
return port;
|
|
||||||
}
|
|
||||||
bld = hdfsNewBuilder();
|
|
||||||
if (!bld)
|
|
||||||
return -ENOMEM;
|
|
||||||
hdfsBuilderSetForceNewInstance(bld);
|
|
||||||
hdfsBuilderSetNameNode(bld, "localhost");
|
|
||||||
hdfsBuilderSetNameNodePort(bld, port);
|
|
||||||
hdfsBuilderConfSetStr(bld, "dfs.block.size",
|
|
||||||
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
|
|
||||||
hdfsBuilderConfSetStr(bld, "dfs.blocksize",
|
|
||||||
TO_STR(TLH_DEFAULT_BLOCK_SIZE));
|
|
||||||
hdfsBuilderConfSetStr(bld, "dfs.replication",
|
|
||||||
TO_STR(TLH_DEFAULT_DFS_REPLICATION));
|
|
||||||
hdfsBuilderConfSetStr(bld, "ipc.client.connect.max.retries",
|
|
||||||
TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_MAX_RETRIES));
|
|
||||||
hdfsBuilderConfSetStr(bld, "ipc.client.connect.retry.interval",
|
|
||||||
TO_STR(TLH_DEFAULT_IPC_CLIENT_CONNECT_RETRY_INTERVAL_MS));
|
|
||||||
if (username) {
|
|
||||||
hdfsBuilderSetUserName(bld, username);
|
|
||||||
}
|
|
||||||
hdfs = hdfsBuilderConnect(bld);
|
|
||||||
if (!hdfs) {
|
|
||||||
ret = -errno;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
*fs = hdfs;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int hdfsWriteData(hdfsFS hdfs, const char *dirNm,
|
|
||||||
const char *fileNm, tSize fileSz)
|
|
||||||
{
|
|
||||||
hdfsFile file;
|
|
||||||
int ret, expected;
|
|
||||||
const char *content;
|
|
||||||
|
|
||||||
content = fileNm;
|
|
||||||
|
|
||||||
if (hdfsExists(hdfs, dirNm) == 0) {
|
|
||||||
EXPECT_ZERO(hdfsDelete(hdfs, dirNm, 1));
|
|
||||||
}
|
|
||||||
EXPECT_ZERO(hdfsCreateDirectory(hdfs, dirNm));
|
|
||||||
|
|
||||||
file = hdfsOpenFile(hdfs, fileNm, O_WRONLY, 0, 0, 0);
|
|
||||||
EXPECT_NONNULL(file);
|
|
||||||
|
|
||||||
expected = (int)strlen(content);
|
|
||||||
tSize sz = 0;
|
|
||||||
while (sz < fileSz) {
|
|
||||||
ret = hdfsWrite(hdfs, file, content, expected);
|
|
||||||
if (ret < 0) {
|
|
||||||
ret = errno;
|
|
||||||
fprintf(stderr, "hdfsWrite failed and set errno %d\n", ret);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
if (ret != expected) {
|
|
||||||
fprintf(stderr, "hdfsWrite was supposed to write %d bytes, but "
|
|
||||||
"it wrote %d\n", ret, expected);
|
|
||||||
return EIO;
|
|
||||||
}
|
|
||||||
sz += ret;
|
|
||||||
}
|
|
||||||
EXPECT_ZERO(hdfsFlush(hdfs, file));
|
|
||||||
EXPECT_ZERO(hdfsHSync(hdfs, file));
|
|
||||||
EXPECT_ZERO(hdfsCloseFile(hdfs, file));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int fileEventCallback(const char * event, const char * cluster, const char * file, int64_t value, int64_t cookie)
|
|
||||||
{
|
|
||||||
char * randomErrRatioStr = getenv("RANDOM_ERROR_RATIO");
|
|
||||||
int64_t randomErrRatio = RANDOM_ERROR_RATIO;
|
|
||||||
if (randomErrRatioStr) randomErrRatio = (int64_t)atoi(randomErrRatioStr);
|
|
||||||
if (randomErrRatio == 0) return DEBUG_SIMULATE_ERROR;
|
|
||||||
else if (randomErrRatio < 0) return LIBHDFSPP_EVENT_OK;
|
|
||||||
return random() % randomErrRatio == 0 ? DEBUG_SIMULATE_ERROR : LIBHDFSPP_EVENT_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int doTestHdfsMiniStress(struct tlhThreadInfo *ti)
|
|
||||||
{
|
|
||||||
char tmp[4096];
|
|
||||||
hdfsFile file;
|
|
||||||
int ret, expected;
|
|
||||||
hdfsFileInfo *fileInfo;
|
|
||||||
uint64_t readOps, nErrs=0;
|
|
||||||
tOffset seekPos;
|
|
||||||
const char *content;
|
|
||||||
|
|
||||||
content = ti->fileNm;
|
|
||||||
expected = (int)strlen(content);
|
|
||||||
|
|
||||||
fileInfo = hdfsGetPathInfo(ti->hdfs, ti->fileNm);
|
|
||||||
EXPECT_NONNULL(fileInfo);
|
|
||||||
|
|
||||||
file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
|
|
||||||
EXPECT_NONNULL(file);
|
|
||||||
|
|
||||||
libhdfspp_file_event_callback callback = &fileEventCallback;
|
|
||||||
|
|
||||||
hdfsPreAttachFileMonitor(callback, 0);
|
|
||||||
|
|
||||||
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting read loop\n",
|
|
||||||
ti->threadIdx);
|
|
||||||
for (readOps=0; readOps < 1000; ++readOps) {
|
|
||||||
EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
|
|
||||||
file = hdfsOpenFile(ti->hdfs, ti->fileNm, O_RDONLY, 0, 0, 0);
|
|
||||||
EXPECT_NONNULL(file);
|
|
||||||
seekPos = (((double)random()) / RAND_MAX) * (fileInfo->mSize - expected);
|
|
||||||
seekPos = (seekPos / expected) * expected;
|
|
||||||
ret = hdfsSeek(ti->hdfs, file, seekPos);
|
|
||||||
if (ret < 0) {
|
|
||||||
ret = errno;
|
|
||||||
fprintf(stderr, "hdfsSeek to %"PRIu64" failed and set"
|
|
||||||
" errno %d\n", seekPos, ret);
|
|
||||||
++nErrs;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
ret = hdfsRead(ti->hdfs, file, tmp, expected);
|
|
||||||
if (ret < 0) {
|
|
||||||
ret = errno;
|
|
||||||
fprintf(stderr, "hdfsRead failed and set errno %d\n", ret);
|
|
||||||
++nErrs;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ret != expected) {
|
|
||||||
fprintf(stderr, "hdfsRead was supposed to read %d bytes, but "
|
|
||||||
"it read %d\n", ret, expected);
|
|
||||||
++nErrs;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
ret = memcmp(content, tmp, expected);
|
|
||||||
if (ret) {
|
|
||||||
fprintf(stderr, "hdfsRead result (%.*s) does not match expected (%.*s)",
|
|
||||||
expected, tmp, expected, content);
|
|
||||||
++nErrs;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EXPECT_ZERO(hdfsCloseFile(ti->hdfs, file));
|
|
||||||
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): finished read loop\n",
|
|
||||||
ti->threadIdx);
|
|
||||||
EXPECT_ZERO(nErrs);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int testHdfsMiniStressImpl(struct tlhThreadInfo *ti)
|
|
||||||
{
|
|
||||||
fprintf(stderr, "testHdfsMiniStress(threadIdx=%d): starting\n",
|
|
||||||
ti->threadIdx);
|
|
||||||
EXPECT_NONNULL(ti->hdfs);
|
|
||||||
EXPECT_ZERO(doTestHdfsMiniStress(ti));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void testHdfsMiniStress(void *v)
|
|
||||||
{
|
|
||||||
struct tlhThreadInfo *ti = (struct tlhThreadInfo*)v;
|
|
||||||
int ret = testHdfsMiniStressImpl(ti);
|
|
||||||
ti->success = ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
|
|
||||||
{
|
|
||||||
int i, threadsFailed = 0;
|
|
||||||
const char *sep = "";
|
|
||||||
|
|
||||||
for (i = 0; i < tlhNumThreads; i++) {
|
|
||||||
if (ti[i].success != 0) {
|
|
||||||
threadsFailed = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!threadsFailed) {
|
|
||||||
fprintf(stderr, "testLibHdfsMiniStress: all threads succeeded. SUCCESS.\n");
|
|
||||||
return EXIT_SUCCESS;
|
|
||||||
}
|
|
||||||
fprintf(stderr, "testLibHdfsMiniStress: some threads failed: [");
|
|
||||||
for (i = 0; i < tlhNumThreads; i++) {
|
|
||||||
if (ti[i].success != 0) {
|
|
||||||
fprintf(stderr, "%s%d", sep, i);
|
|
||||||
sep = ", ";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fprintf(stderr, "]. FAILURE.\n");
|
|
||||||
return EXIT_FAILURE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test intended to stress libhdfs client with concurrent requests. Currently focused
|
|
||||||
* on concurrent reads.
|
|
||||||
*/
|
|
||||||
int main(void)
|
|
||||||
{
|
|
||||||
int i, tlhNumThreads;
|
|
||||||
char *dirNm, *fileNm;
|
|
||||||
tSize fileSz;
|
|
||||||
const char *tlhNumThreadsStr, *tlhNumDNsStr;
|
|
||||||
hdfsFS hdfs = NULL;
|
|
||||||
struct NativeMiniDfsCluster* tlhCluster;
|
|
||||||
struct tlhThreadInfo ti[TLH_MAX_THREADS];
|
|
||||||
struct NativeMiniDfsConf conf = {
|
|
||||||
1, /* doFormat */
|
|
||||||
};
|
|
||||||
|
|
||||||
dirNm = "/tlhMiniStressData";
|
|
||||||
fileNm = "/tlhMiniStressData/file";
|
|
||||||
fileSz = 2*1024*1024;
|
|
||||||
|
|
||||||
tlhNumDNsStr = getenv("TLH_NUM_DNS");
|
|
||||||
if (!tlhNumDNsStr) {
|
|
||||||
tlhNumDNsStr = "1";
|
|
||||||
}
|
|
||||||
conf.numDataNodes = atoi(tlhNumDNsStr);
|
|
||||||
if ((conf.numDataNodes <= 0) || (conf.numDataNodes > TLH_MAX_DNS)) {
|
|
||||||
fprintf(stderr, "testLibHdfsMiniStress: must have a number of datanodes "
|
|
||||||
"between 1 and %d inclusive, not %d\n",
|
|
||||||
TLH_MAX_DNS, conf.numDataNodes);
|
|
||||||
return EXIT_FAILURE;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlhNumThreadsStr = getenv("TLH_NUM_THREADS");
|
|
||||||
if (!tlhNumThreadsStr) {
|
|
||||||
tlhNumThreadsStr = "8";
|
|
||||||
}
|
|
||||||
tlhNumThreads = atoi(tlhNumThreadsStr);
|
|
||||||
if ((tlhNumThreads <= 0) || (tlhNumThreads > TLH_MAX_THREADS)) {
|
|
||||||
fprintf(stderr, "testLibHdfsMiniStress: must have a number of threads "
|
|
||||||
"between 1 and %d inclusive, not %d\n",
|
|
||||||
TLH_MAX_THREADS, tlhNumThreads);
|
|
||||||
return EXIT_FAILURE;
|
|
||||||
}
|
|
||||||
memset(&ti[0], 0, sizeof(ti));
|
|
||||||
for (i = 0; i < tlhNumThreads; i++) {
|
|
||||||
ti[i].threadIdx = i;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlhCluster = nmdCreate(&conf);
|
|
||||||
EXPECT_NONNULL(tlhCluster);
|
|
||||||
EXPECT_ZERO(nmdWaitClusterUp(tlhCluster));
|
|
||||||
|
|
||||||
EXPECT_ZERO(hdfsNameNodeConnect(tlhCluster, &hdfs, NULL));
|
|
||||||
|
|
||||||
// Single threaded writes for now.
|
|
||||||
EXPECT_ZERO(hdfsWriteData(hdfs, dirNm, fileNm, fileSz));
|
|
||||||
|
|
||||||
// Multi-threaded reads.
|
|
||||||
for (i = 0; i < tlhNumThreads; i++) {
|
|
||||||
ti[i].theThread.start = testHdfsMiniStress;
|
|
||||||
ti[i].theThread.arg = &ti[i];
|
|
||||||
ti[i].hdfs = hdfs;
|
|
||||||
ti[i].fileNm = fileNm;
|
|
||||||
EXPECT_ZERO(threadCreate(&ti[i].theThread));
|
|
||||||
}
|
|
||||||
for (i = 0; i < tlhNumThreads; i++) {
|
|
||||||
EXPECT_ZERO(threadJoin(&ti[i].theThread));
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_ZERO(hdfsDisconnect(hdfs));
|
|
||||||
EXPECT_ZERO(nmdShutdown(tlhCluster));
|
|
||||||
nmdFree(tlhCluster);
|
|
||||||
return checkFailures(ti, tlhNumThreads);
|
|
||||||
}
|
|
|
@ -298,9 +298,6 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
|
||||||
Error(stat);
|
Error(stat);
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
if (f && fileEventCallback) {
|
|
||||||
f->SetFileEventCallback(fileEventCallback.value());
|
|
||||||
}
|
|
||||||
return new hdfsFile_internal(f);
|
return new hdfsFile_internal(f);
|
||||||
} catch (const std::exception & e) {
|
} catch (const std::exception & e) {
|
||||||
ReportException(e);
|
ReportException(e);
|
||||||
|
|
|
@ -231,7 +231,7 @@ void FileHandleImpl::AsyncPreadSome(
|
||||||
// Wrap the DN in a block reader to handle the state and logic of the
|
// Wrap the DN in a block reader to handle the state and logic of the
|
||||||
// block request protocol
|
// block request protocol
|
||||||
std::shared_ptr<BlockReader> reader;
|
std::shared_ptr<BlockReader> reader;
|
||||||
reader = CreateBlockReader(BlockReaderOptions(), dn, event_handlers_);
|
reader = CreateBlockReader(BlockReaderOptions(), dn);
|
||||||
|
|
||||||
// Lambdas cannot capture copies of member variables so we'll make explicit
|
// Lambdas cannot capture copies of member variables so we'll make explicit
|
||||||
// copies for it
|
// copies for it
|
||||||
|
@ -240,7 +240,7 @@ void FileHandleImpl::AsyncPreadSome(
|
||||||
auto cluster_name = cluster_name_;
|
auto cluster_name = cluster_name_;
|
||||||
|
|
||||||
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
|
auto read_handler = [reader, event_handlers, cluster_name, path, dn_id, handler](const Status & status, size_t transferred) {
|
||||||
event_response event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
|
auto event_resp = event_handlers->call(FILE_DN_READ_EVENT, cluster_name.c_str(), path.c_str(), transferred);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
if (event_resp.response() == event_response::kTest_Error) {
|
if (event_resp.response() == event_response::kTest_Error) {
|
||||||
handler(event_resp.status(), dn_id, transferred);
|
handler(event_resp.status(), dn_id, transferred);
|
||||||
|
@ -254,7 +254,7 @@ void FileHandleImpl::AsyncPreadSome(
|
||||||
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
|
auto connect_handler = [handler,event_handlers,cluster_name,path,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
|
||||||
(Status status, std::shared_ptr<DataNodeConnection> dn) {
|
(Status status, std::shared_ptr<DataNodeConnection> dn) {
|
||||||
(void)dn;
|
(void)dn;
|
||||||
event_response event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
|
auto event_resp = event_handlers->call(FILE_DN_CONNECT_EVENT, cluster_name.c_str(), path.c_str(), 0);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
if (event_resp.response() == event_response::kTest_Error) {
|
if (event_resp.response() == event_response::kTest_Error) {
|
||||||
status = event_resp.status();
|
status = event_resp.status();
|
||||||
|
@ -276,10 +276,9 @@ void FileHandleImpl::AsyncPreadSome(
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
|
std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
|
||||||
std::shared_ptr<DataNodeConnection> dn,
|
std::shared_ptr<DataNodeConnection> dn)
|
||||||
std::shared_ptr<LibhdfsEvents> event_handlers)
|
|
||||||
{
|
{
|
||||||
std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_, event_handlers);
|
std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
|
||||||
|
|
||||||
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
|
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
|
||||||
<< ", ..., dnconn=" << dn.get()
|
<< ", ..., dnconn=" << dn.get()
|
||||||
|
|
|
@ -119,8 +119,7 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
|
virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
|
||||||
std::shared_ptr<DataNodeConnection> dn,
|
std::shared_ptr<DataNodeConnection> dn);
|
||||||
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers);
|
|
||||||
virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
|
virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
|
||||||
::asio::io_service *io_service,
|
::asio::io_service *io_service,
|
||||||
const ::hadoop::hdfs::DatanodeInfoProto & dn,
|
const ::hadoop::hdfs::DatanodeInfoProto & dn,
|
||||||
|
|
|
@ -96,7 +96,7 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
|
||||||
io_service = nullptr;
|
io_service = nullptr;
|
||||||
|
|
||||||
/* spawn background threads for asio delegation */
|
/* spawn background threads for asio delegation */
|
||||||
unsigned int threads = 2 /* options.io_threads_, pending HDFS-9117 */;
|
unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
|
||||||
for (unsigned int i = 0; i < threads; i++) {
|
for (unsigned int i = 0; i < threads; i++) {
|
||||||
AddWorkerThread();
|
AddWorkerThread();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
#include <future>
|
#include <future>
|
||||||
|
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
|
#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
|
||||||
|
@ -104,17 +105,7 @@ void BlockReaderImpl::AsyncRequestBlock(
|
||||||
m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
|
m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status;
|
||||||
if (stat.ok()) {
|
if (stat.ok()) {
|
||||||
const auto &resp = s.response;
|
const auto &resp = s.response;
|
||||||
|
if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
|
||||||
if(this->event_handlers_) {
|
|
||||||
event_response event_resp = this->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (stat.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
stat = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stat.ok() && resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
|
|
||||||
if (resp.has_readopchecksuminfo()) {
|
if (resp.has_readopchecksuminfo()) {
|
||||||
const auto &checksum_info = resp.readopchecksuminfo();
|
const auto &checksum_info = resp.readopchecksuminfo();
|
||||||
chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
|
chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
|
||||||
|
@ -171,14 +162,6 @@ struct BlockReaderImpl::ReadPacketHeader
|
||||||
assert(v && "Failed to parse the header");
|
assert(v && "Failed to parse the header");
|
||||||
parent_->state_ = kReadChecksum;
|
parent_->state_ = kReadChecksum;
|
||||||
}
|
}
|
||||||
if(parent_->event_handlers_) {
|
|
||||||
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
next(status);
|
next(status);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -231,7 +214,7 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto handler = [parent, next, this](const asio::error_code &ec, size_t) {
|
auto handler = [parent, next](const asio::error_code &ec, size_t) {
|
||||||
Status status;
|
Status status;
|
||||||
if (ec) {
|
if (ec) {
|
||||||
status = Status(ec.value(), ec.message().c_str());
|
status = Status(ec.value(), ec.message().c_str());
|
||||||
|
@ -239,14 +222,6 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
|
||||||
parent->state_ =
|
parent->state_ =
|
||||||
parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
|
parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
|
||||||
}
|
}
|
||||||
if(parent->event_handlers_) {
|
|
||||||
event_response event_resp = parent->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
next(status);
|
next(status);
|
||||||
};
|
};
|
||||||
parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
|
parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
|
||||||
|
@ -273,6 +248,7 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
|
||||||
virtual void Run(const Next &next) override {
|
virtual void Run(const Next &next) override {
|
||||||
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
|
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
|
||||||
<< FMT_CONT_AND_PARENT_ADDR << ") called");
|
<< FMT_CONT_AND_PARENT_ADDR << ") called");
|
||||||
|
|
||||||
auto handler =
|
auto handler =
|
||||||
[next, this](const asio::error_code &ec, size_t transferred) {
|
[next, this](const asio::error_code &ec, size_t transferred) {
|
||||||
Status status;
|
Status status;
|
||||||
|
@ -285,14 +261,6 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
|
||||||
if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
|
if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
|
||||||
parent_->state_ = kReadPacketHeader;
|
parent_->state_ = kReadPacketHeader;
|
||||||
}
|
}
|
||||||
if(parent_->event_handlers_) {
|
|
||||||
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
next(status);
|
next(status);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -324,22 +292,13 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto h = [next, this](const Status &stat) {
|
auto h = [next, this](const Status &status) {
|
||||||
Status status = stat;
|
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
|
assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
|
||||||
parent_->chunk_padding_bytes_);
|
parent_->chunk_padding_bytes_);
|
||||||
parent_->chunk_padding_bytes_ = 0;
|
parent_->chunk_padding_bytes_ = 0;
|
||||||
parent_->state_ = kReadData;
|
parent_->state_ = kReadData;
|
||||||
}
|
}
|
||||||
if(parent_->event_handlers_) {
|
|
||||||
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
next(status);
|
next(status);
|
||||||
};
|
};
|
||||||
read_data_->Run(h);
|
read_data_->Run(h);
|
||||||
|
@ -375,20 +334,11 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
|
||||||
m->Push(
|
m->Push(
|
||||||
continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
|
continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
|
||||||
|
|
||||||
m->Run([this, next](const Status &stat,
|
m->Run([this, next](const Status &status,
|
||||||
const hadoop::hdfs::ClientReadStatusProto &) {
|
const hadoop::hdfs::ClientReadStatusProto &) {
|
||||||
Status status = stat;
|
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
parent_->state_ = BlockReaderImpl::kFinished;
|
parent_->state_ = BlockReaderImpl::kFinished;
|
||||||
}
|
}
|
||||||
if(parent_->event_handlers_) {
|
|
||||||
event_response event_resp = parent_->event_handlers_->call(FILE_DN_READ_EVENT, "", "", 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (status.ok() && event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = Status::Error("Test error");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
next(status);
|
next(status);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,9 +93,9 @@ class BlockReaderImpl
|
||||||
: public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
|
: public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
|
||||||
public:
|
public:
|
||||||
explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
|
explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn,
|
||||||
CancelHandle cancel_state, std::shared_ptr<LibhdfsEvents> event_handlers=nullptr)
|
CancelHandle cancel_state)
|
||||||
: dn_(dn), state_(kOpen), options_(options),
|
: dn_(dn), state_(kOpen), options_(options),
|
||||||
chunk_padding_bytes_(0), cancel_state_(cancel_state), event_handlers_(event_handlers.get()) {}
|
chunk_padding_bytes_(0), cancel_state_(cancel_state) {}
|
||||||
|
|
||||||
virtual void AsyncReadPacket(
|
virtual void AsyncReadPacket(
|
||||||
const MutableBuffers &buffers,
|
const MutableBuffers &buffers,
|
||||||
|
@ -152,7 +152,6 @@ private:
|
||||||
long long bytes_to_read_;
|
long long bytes_to_read_;
|
||||||
std::vector<char> checksum_;
|
std::vector<char> checksum_;
|
||||||
CancelHandle cancel_state_;
|
CancelHandle cancel_state_;
|
||||||
LibhdfsEvents* event_handlers_;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -274,16 +274,7 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Status status;
|
Status status;
|
||||||
if(event_handlers_) {
|
if (h.has_exceptionclassname()) {
|
||||||
event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
|
|
||||||
#ifndef NDEBUG
|
|
||||||
if (event_resp.response() == event_response::kTest_Error) {
|
|
||||||
status = event_resp.status();
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status.ok() && h.has_exceptionclassname()) {
|
|
||||||
status =
|
status =
|
||||||
Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
|
Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str());
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,6 @@
|
||||||
#include <asio/read.hpp>
|
#include <asio/read.hpp>
|
||||||
#include <asio/write.hpp>
|
#include <asio/write.hpp>
|
||||||
|
|
||||||
#include <system_error>
|
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
|
@ -65,10 +63,6 @@ public:
|
||||||
NextLayer next_layer_;
|
NextLayer next_layer_;
|
||||||
|
|
||||||
void ConnectComplete(const ::asio::error_code &ec);
|
void ConnectComplete(const ::asio::error_code &ec);
|
||||||
|
|
||||||
// Hide default ctors.
|
|
||||||
RpcConnectionImpl();
|
|
||||||
RpcConnectionImpl(const RpcConnectionImpl &other);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
|
@ -76,7 +70,7 @@ RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
|
||||||
: RpcConnection(engine),
|
: RpcConnection(engine),
|
||||||
options_(engine->options()),
|
options_(engine->options()),
|
||||||
next_layer_(engine->io_service()) {
|
next_layer_(engine->io_service()) {
|
||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this);
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
|
@ -90,6 +84,7 @@ RpcConnectionImpl<NextLayer>::~RpcConnectionImpl() {
|
||||||
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
|
LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
void RpcConnectionImpl<NextLayer>::Connect(
|
void RpcConnectionImpl<NextLayer>::Connect(
|
||||||
const std::vector<::asio::ip::tcp::endpoint> &server,
|
const std::vector<::asio::ip::tcp::endpoint> &server,
|
||||||
|
@ -150,7 +145,7 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
|
||||||
|
|
||||||
Status status = ToStatus(ec);
|
Status status = ToStatus(ec);
|
||||||
if(event_handlers_) {
|
if(event_handlers_) {
|
||||||
event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
|
auto event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
if (event_resp.response() == event_response::kTest_Error) {
|
if (event_resp.response() == event_response::kTest_Error) {
|
||||||
status = event_resp.status();
|
status = event_resp.status();
|
||||||
|
@ -315,28 +310,27 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
|
||||||
|
|
||||||
|
|
||||||
template <class NextLayer>
|
template <class NextLayer>
|
||||||
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &original_ec,
|
void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &asio_ec,
|
||||||
size_t) {
|
size_t) {
|
||||||
using std::placeholders::_1;
|
using std::placeholders::_1;
|
||||||
using std::placeholders::_2;
|
using std::placeholders::_2;
|
||||||
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
|
||||||
|
|
||||||
::asio::error_code my_ec(original_ec);
|
|
||||||
|
|
||||||
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
|
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
|
||||||
|
|
||||||
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
|
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
|
||||||
|
|
||||||
|
::asio::error_code ec = asio_ec;
|
||||||
if(event_handlers_) {
|
if(event_handlers_) {
|
||||||
event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
|
auto event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
if (event_resp.response() == event_response::kTest_Error) {
|
if (event_resp.response() == event_response::kTest_Error) {
|
||||||
my_ec = std::make_error_code(std::errc::network_down);
|
ec = std::make_error_code(std::errc::network_down);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (my_ec.value()) {
|
switch (ec.value()) {
|
||||||
case 0:
|
case 0:
|
||||||
// No errors
|
// No errors
|
||||||
break;
|
break;
|
||||||
|
@ -344,8 +338,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ori
|
||||||
// The event loop has been shut down. Ignore the error.
|
// The event loop has been shut down. Ignore the error.
|
||||||
return;
|
return;
|
||||||
default:
|
default:
|
||||||
LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message());
|
LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message());
|
||||||
CommsError(ToStatus(my_ec));
|
CommsError(ToStatus(ec));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -229,6 +229,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> {
|
||||||
std::shared_ptr<LibhdfsEvents> event_handlers_;
|
std::shared_ptr<LibhdfsEvents> event_handlers_;
|
||||||
std::string cluster_name_;
|
std::string cluster_name_;
|
||||||
|
|
||||||
|
|
||||||
// Lock for mutable parts of this class that need to be thread safe
|
// Lock for mutable parts of this class that need to be thread safe
|
||||||
std::mutex connection_state_lock_;
|
std::mutex connection_state_lock_;
|
||||||
|
|
||||||
|
|
|
@ -138,10 +138,6 @@ build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT
|
||||||
link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
||||||
add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
|
add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
|
||||||
|
|
||||||
build_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static expect.c test_libhdfs_mini_stress.c ${OS_DIR}/thread.c)
|
|
||||||
link_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
|
||||||
add_libhdfs_test(libhdfs_mini_stress hdfspp_test_shim_static)
|
|
||||||
|
|
||||||
build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
|
build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
|
||||||
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
||||||
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
|
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
|
||||||
|
|
|
@ -93,10 +93,9 @@ public:
|
||||||
std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
|
std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
|
||||||
protected:
|
protected:
|
||||||
std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
|
std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
|
||||||
std::shared_ptr<DataNodeConnection> dn,
|
std::shared_ptr<DataNodeConnection> dn) override
|
||||||
std::shared_ptr<hdfs::LibhdfsEvents> event_handlers) override
|
|
||||||
{
|
{
|
||||||
(void) options; (void) dn; (void) event_handlers;
|
(void) options; (void) dn;
|
||||||
assert(mock_reader_);
|
assert(mock_reader_);
|
||||||
return mock_reader_;
|
return mock_reader_;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,15 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
/* Cheat for now and use the same hdfsBuilder as libhdfs */
|
||||||
|
/* (libhdfspp doesn't have an hdfsBuilder yet). */
|
||||||
struct hdfsBuilder {
|
struct hdfsBuilder {
|
||||||
struct libhdfs_hdfsBuilder * libhdfsBuilder;
|
int forceNewInstance;
|
||||||
struct libhdfspp_hdfsBuilder * libhdfsppBuilder;
|
const char *nn;
|
||||||
|
tPort port;
|
||||||
|
const char *kerbTicketCachePath;
|
||||||
|
const char *userName;
|
||||||
|
struct hdfsBuilderConfOpt *opts;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Shim structs and functions that delegate to libhdfspp and libhdfs. */
|
/* Shim structs and functions that delegate to libhdfspp and libhdfs. */
|
||||||
|
@ -92,13 +98,13 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
|
||||||
|
|
||||||
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
||||||
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
|
hdfsFS ret = calloc(1, sizeof(struct hdfs_internal));
|
||||||
ret->libhdfsppRep = libhdfspp_hdfsBuilderConnect(bld->libhdfsppBuilder);
|
ret->libhdfsppRep = libhdfspp_hdfsConnect(bld->nn, bld->port);
|
||||||
if (!ret->libhdfsppRep) {
|
if (!ret->libhdfsppRep) {
|
||||||
free(ret);
|
free(ret);
|
||||||
ret = NULL;
|
ret = NULL;
|
||||||
} else {
|
} else {
|
||||||
/* Destroys bld object. */
|
/* Destroys bld object. */
|
||||||
ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld->libhdfsBuilder);
|
ret->libhdfsRep = libhdfs_hdfsBuilderConnect(bld);
|
||||||
if (!ret->libhdfsRep) {
|
if (!ret->libhdfsRep) {
|
||||||
libhdfspp_hdfsDisconnect(ret->libhdfsppRep);
|
libhdfspp_hdfsDisconnect(ret->libhdfsppRep);
|
||||||
free(ret);
|
free(ret);
|
||||||
|
@ -109,61 +115,49 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct hdfsBuilder *hdfsNewBuilder(void) {
|
struct hdfsBuilder *hdfsNewBuilder(void) {
|
||||||
struct hdfsBuilder * ret = calloc(1, sizeof(struct hdfsBuilder));
|
return libhdfs_hdfsNewBuilder();
|
||||||
ret->libhdfsppBuilder = libhdfspp_hdfsNewBuilder();
|
|
||||||
ret->libhdfsBuilder = libhdfs_hdfsNewBuilder();
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
|
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
|
||||||
libhdfs_hdfsBuilderSetForceNewInstance(bld->libhdfsBuilder);
|
libhdfs_hdfsBuilderSetForceNewInstance(bld);
|
||||||
// libhdfspp_hdfsBuilderSetForceNewInstance(bld->libhdfsppBuilder);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) {
|
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn) {
|
||||||
libhdfs_hdfsBuilderSetNameNode(bld->libhdfsBuilder, nn);
|
libhdfs_hdfsBuilderSetNameNode(bld, nn);
|
||||||
libhdfspp_hdfsBuilderSetNameNode(bld->libhdfsppBuilder, nn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) {
|
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port) {
|
||||||
libhdfs_hdfsBuilderSetNameNodePort(bld->libhdfsBuilder, port);
|
libhdfs_hdfsBuilderSetNameNodePort(bld, port);
|
||||||
libhdfspp_hdfsBuilderSetNameNodePort(bld->libhdfsppBuilder, port);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) {
|
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName) {
|
||||||
libhdfs_hdfsBuilderSetUserName(bld->libhdfsBuilder, userName);
|
libhdfs_hdfsBuilderSetUserName(bld, userName);
|
||||||
libhdfspp_hdfsBuilderSetUserName(bld->libhdfsppBuilder, userName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
||||||
const char *kerbTicketCachePath) {
|
const char *kerbTicketCachePath) {
|
||||||
libhdfs_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsBuilder, kerbTicketCachePath);
|
libhdfs_hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
|
||||||
// libhdfspp_hdfsBuilderSetKerbTicketCachePath(bld->libhdfsppBuilder, kerbTicketCachePath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsFreeBuilder(struct hdfsBuilder *bld) {
|
void hdfsFreeBuilder(struct hdfsBuilder *bld) {
|
||||||
libhdfs_hdfsFreeBuilder(bld->libhdfsBuilder);
|
libhdfs_hdfsFreeBuilder(bld);
|
||||||
libhdfspp_hdfsFreeBuilder(bld->libhdfsppBuilder);
|
|
||||||
free(bld);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
||||||
const char *val) {
|
const char *val) {
|
||||||
fprintf(stderr, "hdfs_shim::hdfsBuilderConfSetStr) key=%s val=%s\n", key, val);
|
return libhdfs_hdfsBuilderConfSetStr(bld, key, val);
|
||||||
libhdfs_hdfsBuilderConfSetStr(bld->libhdfsBuilder, key, val);
|
|
||||||
return libhdfspp_hdfsBuilderConfSetStr(bld->libhdfsppBuilder, key, val);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int hdfsConfGetStr(const char *key, char **val) {
|
int hdfsConfGetStr(const char *key, char **val) {
|
||||||
return libhdfspp_hdfsConfGetStr(key, val);
|
return libhdfs_hdfsConfGetStr(key, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
int hdfsConfGetInt(const char *key, int32_t *val) {
|
int hdfsConfGetInt(const char *key, int32_t *val) {
|
||||||
return libhdfspp_hdfsConfGetInt(key, val);
|
return libhdfs_hdfsConfGetInt(key, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
void hdfsConfStrFree(char *val) {
|
void hdfsConfStrFree(char *val) {
|
||||||
libhdfspp_hdfsConfStrFree(val);
|
libhdfs_hdfsConfStrFree(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
int hdfsDisconnect(hdfsFS fs) {
|
int hdfsDisconnect(hdfsFS fs) {
|
||||||
|
|
|
@ -81,7 +81,6 @@
|
||||||
#define hadoopRzBufferLength libhdfspp_hadoopRzBufferLength
|
#define hadoopRzBufferLength libhdfspp_hadoopRzBufferLength
|
||||||
#define hadoopRzBufferGet libhdfspp_hadoopRzBufferGet
|
#define hadoopRzBufferGet libhdfspp_hadoopRzBufferGet
|
||||||
#define hadoopRzBufferFree libhdfspp_hadoopRzBufferFree
|
#define hadoopRzBufferFree libhdfspp_hadoopRzBufferFree
|
||||||
#define hdfsBuilder libhdfspp_hdfsBuilder
|
|
||||||
#define hdfs_internal libhdfspp_hdfs_internal
|
#define hdfs_internal libhdfspp_hdfs_internal
|
||||||
#define hdfsFS libhdfspp_hdfsFS
|
#define hdfsFS libhdfspp_hdfsFS
|
||||||
#define hdfsFile_internal libhdfspp_hdfsFile_internal
|
#define hdfsFile_internal libhdfspp_hdfsFile_internal
|
||||||
|
|
|
@ -386,7 +386,7 @@ TEST(RpcEngineTest, TestEventCallbacks)
|
||||||
});
|
});
|
||||||
io_service.run();
|
io_service.run();
|
||||||
ASSERT_TRUE(complete);
|
ASSERT_TRUE(complete);
|
||||||
ASSERT_EQ(8, callbacks.size());
|
ASSERT_EQ(7, callbacks.size());
|
||||||
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
|
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
|
||||||
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
|
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
|
||||||
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
|
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
|
||||||
|
|
Loading…
Reference in New Issue