HDFS-3634. Add self-contained, mavenized fuse_dfs test. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1371232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-08-09 14:45:08 +00:00
parent 054f5282ba
commit cc33d7b2ce
11 changed files with 1013 additions and 4 deletions

View File

@ -190,6 +190,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3291. add test that covers HttpFS working w/ a non-HDFS Hadoop
filesystem (tucu)
HDFS-3634. Add self-contained, mavenized fuse_dfs test. (Colin Patrick
McCabe via atm)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -107,6 +107,10 @@ set(LIBHDFS_VERSION "0.0.0")
set_target_properties(hdfs PROPERTIES
SOVERSION ${LIBHDFS_VERSION})
add_library(posix_util
main/native/util/posix_util.c
)
add_executable(test_libhdfs_ops
main/native/libhdfs/test/test_libhdfs_ops.c
)

View File

@ -71,6 +71,16 @@ IF(FUSE_FOUND)
m
pthread
)
add_executable(test_fuse_dfs
test/test_fuse_dfs.c
test/fuse_workload.c
)
target_link_libraries(test_fuse_dfs
${FUSE_LIBRARIES}
native_mini_dfs
posix_util
pthread
)
ELSE(FUSE_FOUND)
IF(REQUIRE_FUSE)
MESSAGE(FATAL_ERROR "Required component fuse_dfs could not be built.")

View File

@ -0,0 +1,348 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fuse-dfs/test/fuse_workload.h"
#include "libhdfs/expect.h"
#include "util/posix_util.h"
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <sys/types.h>
#include <unistd.h>
#include <utime.h>
typedef int (*testReadDirFn)(const struct dirent *de, void *v);
struct fileCtx {
int fd;
char *str;
int strLen;
char *path;
};
static const char *DIRS_A_AND_B[] = { "a", "b", NULL };
static const char *DIRS_B_AND_C[] = { "b", "c", NULL };
#define LONG_STR_LEN 1024
#define NUM_FILE_CTX 3
#define MAX_TRIES 120
// TODO: implement/test access, mknod, symlink
// TODO: rmdir on non-dir, non-empty dir
// TODO: test unlink-during-writing
// TODO: test weird open flags failing
// TODO: test chown, chmod
static int testReadDirImpl(DIR *dp, testReadDirFn fn, void *data)
{
struct dirent *de;
int ret, noda = 0;
while (1) {
de = readdir(dp);
if (!de)
return noda;
if (!strcmp(de->d_name, "."))
continue;
if (!strcmp(de->d_name, ".."))
continue;
ret = fn(de, data);
if (ret < 0)
return ret;
++noda;
}
return noda;
}
static int testReadDir(const char *dirName, testReadDirFn fn, void *data)
{
int ret;
DIR *dp;
dp = opendir(dirName);
if (!dp) {
return -errno;
}
ret = testReadDirImpl(dp, fn, data);
closedir(dp);
return ret;
}
static int expectDirs(const struct dirent *de, void *v)
{
const char **names = v;
const char **n;
for (n = names; *n; ++n) {
if (!strcmp(de->d_name, *n)) {
return 0;
}
}
return -ENOENT;
}
static int safeWrite(int fd, const void *buf, size_t amt)
{
while (amt > 0) {
int r = write(fd, buf, amt);
if (r < 0) {
if (errno != EINTR)
return -errno;
continue;
}
amt -= r;
buf = (const char *)buf + r;
}
return 0;
}
static int safeRead(int fd, void *buf, int c)
{
int res;
size_t amt = 0;
while (amt < c) {
res = read(fd, buf, c - amt);
if (res <= 0) {
if (res == 0)
return amt;
if (errno != EINTR)
return -errno;
continue;
}
amt += res;
buf = (char *)buf + res;
}
return amt;
}
int runFuseWorkloadImpl(const char *root, const char *pcomp,
struct fileCtx *ctx)
{
char base[PATH_MAX], tmp[PATH_MAX], *tmpBuf;
char src[PATH_MAX], dst[PATH_MAX];
struct stat stBuf;
int ret, i, try;
struct utimbuf tbuf;
struct statvfs stvBuf;
// The root must be a directory
EXPECT_ZERO(stat(root, &stBuf));
EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
// base = <root>/<pcomp>. base must not exist yet
snprintf(base, sizeof(base), "%s/%s", root, pcomp);
EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(base, &stBuf), ENOENT);
// mkdir <base>
RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
EXPECT_ZERO(ret);
// rmdir <base>
RETRY_ON_EINTR_GET_ERRNO(ret, rmdir(base));
EXPECT_ZERO(ret);
// mkdir <base>
RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
EXPECT_ZERO(ret);
// stat <base>
EXPECT_ZERO(stat(base, &stBuf));
EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
// mkdir <base>/a
snprintf(tmp, sizeof(tmp), "%s/a", base);
RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
EXPECT_ZERO(ret);
/* readdir test */
EXPECT_INT_EQ(1, testReadDir(base, expectDirs, DIRS_A_AND_B));
// mkdir <base>/b
snprintf(tmp, sizeof(tmp), "%s/b", base);
RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
EXPECT_ZERO(ret);
// readdir a and b
EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_A_AND_B));
// rename a -> c
snprintf(src, sizeof(src), "%s/a", base);
snprintf(dst, sizeof(dst), "%s/c", base);
EXPECT_ZERO(rename(src, dst));
// readdir c and b
EXPECT_INT_EQ(-ENOENT, testReadDir(base, expectDirs, DIRS_A_AND_B));
EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_B_AND_C));
// statvfs
memset(&stvBuf, 0, sizeof(stvBuf));
EXPECT_ZERO(statvfs(root, &stvBuf));
// set utime on base
memset(&tbuf, 0, sizeof(tbuf));
tbuf.actime = 123;
tbuf.modtime = 456;
EXPECT_ZERO(utime(base, &tbuf));
// stat(base)
EXPECT_ZERO(stat(base, &stBuf));
EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
//EXPECT_INT_EQ(456, stBuf.st_atime); // hdfs doesn't store atime on directories
EXPECT_INT_EQ(456, stBuf.st_mtime);
// open some files and write to them
for (i = 0; i < NUM_FILE_CTX; i++) {
snprintf(tmp, sizeof(tmp), "%s/b/%d", base, i);
ctx[i].path = strdup(tmp);
if (!ctx[i].path) {
fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
return -ENOMEM;
}
ctx[i].strLen = strlen(ctx[i].str);
}
for (i = 0; i < NUM_FILE_CTX; i++) {
ctx[i].fd = open(ctx[i].path, O_RDONLY);
if (ctx[i].fd >= 0) {
fprintf(stderr, "FUSE_WORKLOAD: Error: was able to open %s for read before it "
"was created!\n", ctx[i].path);
return -EIO;
}
ctx[i].fd = creat(ctx[i].path, 0755);
if (ctx[i].fd < 0) {
fprintf(stderr, "FUSE_WORKLOAD: Failed to create file %s for writing!\n",
ctx[i].path);
return -EIO;
}
}
for (i = 0; i < NUM_FILE_CTX; i++) {
EXPECT_ZERO(safeWrite(ctx[i].fd, ctx[i].str, ctx[i].strLen));
}
for (i = 0; i < NUM_FILE_CTX; i++) {
RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
EXPECT_ZERO(ret);
ctx[i].fd = -1;
}
for (i = 0; i < NUM_FILE_CTX; i++) {
/* Bug: HDFS-2551.
* When a program writes a file, closes it, and immediately re-opens it,
* it might not appear to have the correct length. This is because FUSE
* invokes the release() callback asynchronously.
*
* To work around this, we keep retrying until the file length is what we
* expect.
*/
for (try = 0; try < MAX_TRIES; try++) {
EXPECT_ZERO(stat(ctx[i].path, &stBuf));
EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
if (ctx[i].strLen == stBuf.st_size) {
break;
}
sleepNoSig(1);
}
if (try == MAX_TRIES) {
fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length "
"%d; instead, it had length %lld\n",
ctx[i].path, ctx[i].strLen, (long long)stBuf.st_size);
return -EIO;
}
}
for (i = 0; i < NUM_FILE_CTX; i++) {
ctx[i].fd = open(ctx[i].path, O_RDONLY);
if (ctx[i].fd < 0) {
fprintf(stderr, "FUSE_WORKLOAD: Failed to open file %s for reading!\n",
ctx[i].path);
return -EIO;
}
}
for (i = 0; i < NUM_FILE_CTX; i++) {
tmpBuf = calloc(1, ctx[i].strLen);
if (!tmpBuf) {
fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
return -ENOMEM;
}
EXPECT_INT_EQ(ctx[i].strLen, safeRead(ctx[i].fd, tmpBuf, ctx[i].strLen));
EXPECT_ZERO(memcmp(ctx[i].str, tmpBuf, ctx[i].strLen));
RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
ctx[i].fd = -1;
EXPECT_ZERO(ret);
free(tmpBuf);
}
for (i = 0; i < NUM_FILE_CTX; i++) {
EXPECT_ZERO(truncate(ctx[i].path, 0));
}
for (i = 0; i < NUM_FILE_CTX; i++) {
EXPECT_ZERO(stat(ctx[i].path, &stBuf));
EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
EXPECT_INT_EQ(0, stBuf.st_size);
}
for (i = 0; i < NUM_FILE_CTX; i++) {
RETRY_ON_EINTR_GET_ERRNO(ret, unlink(ctx[i].path));
EXPECT_ZERO(ret);
}
for (i = 0; i < NUM_FILE_CTX; i++) {
EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(ctx[i].path, &stBuf), ENOENT);
}
for (i = 0; i < NUM_FILE_CTX; i++) {
free(ctx[i].path);
}
EXPECT_ZERO(recursiveDelete(base));
return 0;
}
int runFuseWorkload(const char *root, const char *pcomp)
{
int ret, err;
size_t i;
char longStr[LONG_STR_LEN];
struct fileCtx ctx[NUM_FILE_CTX] = {
{
.fd = -1,
.str = "hello, world",
},
{
.fd = -1,
.str = "A",
},
{
.fd = -1,
.str = longStr,
},
};
for (i = 0; i < LONG_STR_LEN - 1; i++) {
longStr[i] = 'a' + (i % 10);
}
longStr[LONG_STR_LEN - 1] = '\0';
ret = runFuseWorkloadImpl(root, pcomp, ctx);
// Make sure all file descriptors are closed, or else we won't be able to
// unmount
for (i = 0; i < NUM_FILE_CTX; i++) {
if (ctx[i].fd >= 0) {
RETRY_ON_EINTR_GET_ERRNO(err, close(ctx[i].fd));
}
}
return ret;
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __FUSE_WORKLOAD_H__
#define __FUSE_WORKLOAD_H__
/**
* Perform some FUSE operations.
*
* The operations will be performed under <root>/<pcomp>.
* This directory should not exist prior to the test, and should not be used by
* any other tests concurrently.
*
* @param root The root directory for the testing.
* @param pcomp Path component to add to root
*
* @return 0 on success; negative error code otherwise
*/
int runFuseWorkload(const char *root, const char *pcomp);
#endif

View File

@ -0,0 +1,378 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fuse-dfs/test/fuse_workload.h"
#include "libhdfs/expect.h"
#include "libhdfs/hdfs.h"
#include "libhdfs/native_mini_dfs.h"
#include "util/posix_util.h"
#include <ctype.h>
#include <errno.h>
#include <libgen.h>
#include <limits.h>
#include <mntent.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
/** Exit status to return when there is an exec error.
*
* We assume that fusermount and fuse_dfs don't normally return this error code.
*/
#define EXIT_STATUS_EXEC_ERROR 240
/** Maximum number of times to try to unmount the fuse filesystem we created.
*/
#define MAX_UNMOUNT_TRIES 20
/**
* Verify that the fuse workload works on the local FS.
*
* @return 0 on success; error code otherwise
*/
static int verifyFuseWorkload(void)
{
char tempDir[PATH_MAX];
EXPECT_ZERO(createTempDir(tempDir, sizeof(tempDir), 0755));
EXPECT_ZERO(runFuseWorkload(tempDir, "test"));
EXPECT_ZERO(recursiveDelete(tempDir));
return 0;
}
static int fuserMount(int *procRet, ...) __attribute__((sentinel));
/**
* Invoke the fusermount binary.
*
* @param retVal (out param) The return value from fusermount
*
* @return 0 on success; error code if the fork fails
*/
static int fuserMount(int *procRet, ...)
{
int ret, status;
size_t i = 0;
char *args[64], *c, *env[] = { NULL };
va_list ap;
pid_t pid, pret;
args[i++] = "fusermount";
va_start(ap, procRet);
while (1) {
c = va_arg(ap, char *);
args[i++] = c;
if (!c)
break;
if (i > sizeof(args)/sizeof(args[0])) {
va_end(ap);
return -EINVAL;
}
}
va_end(ap);
pid = fork();
if (pid < 0) {
ret = errno;
fprintf(stderr, "FUSE_TEST: failed to fork: error %d: %s\n",
ret, strerror(ret));
return -ret;
} else if (pid == 0) {
if (execvpe("fusermount", args, env)) {
ret = errno;
fprintf(stderr, "FUSE_TEST: failed to execute fusermount: "
"error %d: %s\n", ret, strerror(ret));
exit(EXIT_STATUS_EXEC_ERROR);
}
}
pret = waitpid(pid, &status, 0);
if (pret != pid) {
ret = errno;
fprintf(stderr, "FUSE_TEST: failed to wait for pid %d: returned %d "
"(error %d: %s)\n", pid, pret, ret, strerror(ret));
return -ret;
}
if (WIFEXITED(status)) {
*procRet = WEXITSTATUS(status);
} else if (WIFSIGNALED(status)) {
fprintf(stderr, "FUSE_TEST: fusermount exited with signal %d\n",
WTERMSIG(status));
*procRet = -1;
} else {
fprintf(stderr, "FUSE_TEST: fusermount exited with unknown exit type\n");
*procRet = -1;
}
return 0;
}
static int isMounted(const char *mntPoint)
{
int ret;
FILE *fp;
struct mntent *mntEnt;
fp = setmntent("/proc/mounts", "r");
if (!fp) {
ret = errno;
fprintf(stderr, "FUSE_TEST: isMounted(%s) failed to open /proc/mounts: "
"error %d: %s", mntPoint, ret, strerror(ret));
return -ret;
}
while ((mntEnt = getmntent(fp))) {
if (!strcmp(mntEnt->mnt_dir, mntPoint)) {
endmntent(fp);
return 1;
}
}
endmntent(fp);
return 0;
}
static int waitForMount(const char *mntPoint, int retries)
{
int ret, try = 0;
while (try++ < retries) {
ret = isMounted(mntPoint);
if (ret < 0) {
fprintf(stderr, "FUSE_TEST: waitForMount(%s, %d): isMounted returned "
"error %d\n", mntPoint, retries, ret);
} else if (ret == 1) {
return 0;
}
sleepNoSig(2);
}
return -ETIMEDOUT;
}
/**
* Try to unmount the fuse filesystem we mounted.
*
* Normally, only one try should be sufficient to unmount the filesystem.
* However, if our tests needs to exit right after starting the fuse_dfs process,
* the fuse_dfs process might not have gotten around to mounting itself yet. The
* retry loop in this function ensures that we always unmount FUSE before
* exiting, rather than leaving around a zombie fuse_dfs.
*
* @param mntPoint Where the FUSE filesystem is mounted
* @return 0 on success; error code otherwise
*/
static int cleanupFuse(const char *mntPoint)
{
int ret, pret, tries = 0;
while (1) {
ret = fuserMount(&pret, "-u", mntPoint, NULL);
if (ret) {
fprintf(stderr, "FUSE_TEST: unmountFuse: failed to invoke fuserMount: "
"error %d\n", ret);
return ret;
}
if (pret == 0) {
fprintf(stderr, "FUSE_TEST: successfully unmounted FUSE filesystem.\n");
return 0;
}
if (tries++ > MAX_UNMOUNT_TRIES) {
return -EIO;
}
fprintf(stderr, "FUSE_TEST: retrying unmount in 2 seconds...\n");
sleepNoSig(2);
}
}
/**
* Create a fuse_dfs process using the miniDfsCluster we set up.
*
* @param argv0 argv[0], as passed into main
* @param cluster The NativeMiniDfsCluster to connect to
* @param mntPoint The mount point
* @param pid (out param) the fuse_dfs process
*
* @return 0 on success; error code otherwise
*/
static int spawnFuseServer(const char *argv0,
const struct NativeMiniDfsCluster *cluster, const char *mntPoint,
pid_t *pid)
{
int ret, procRet;
char scratch[PATH_MAX], *dir, fusePath[PATH_MAX], portOpt[128];
snprintf(scratch, sizeof(scratch), "%s", argv0);
dir = dirname(scratch);
snprintf(fusePath, sizeof(fusePath), "%s/fuse_dfs", dir);
if (access(fusePath, X_OK)) {
fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to find fuse_dfs "
"binary at %s!\n", fusePath);
return -ENOENT;
}
/* Let's make sure no other FUSE filesystem is mounted at mntPoint */
ret = fuserMount(&procRet, "-u", mntPoint, NULL);
if (ret) {
fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error %d\n",
mntPoint, ret);
return -EIO;
}
if (procRet == EXIT_STATUS_EXEC_ERROR) {
fprintf(stderr, "FUSE_TEST: fuserMount probably could not be executed\n");
return -EIO;
}
/* fork and exec the fuse_dfs process */
*pid = fork();
if (*pid < 0) {
ret = errno;
fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to fork: "
"error %d: %s\n", ret, strerror(ret));
return -ret;
} else if (*pid == 0) {
snprintf(portOpt, sizeof(portOpt), "-oport=%d",
nmdGetNameNodePort(cluster));
if (execl(fusePath, fusePath, "-obig_writes", "-oserver=hdfs://localhost",
portOpt, "-onopermissions", "-ononempty", "-oinitchecks",
mntPoint, "-f", NULL)) {
ret = errno;
fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to execv %s: "
"error %d: %s\n", fusePath, ret, strerror(ret));
exit(EXIT_STATUS_EXEC_ERROR);
}
}
return 0;
}
/**
* Test that we can start up fuse_dfs and do some stuff.
*/
int main(int argc, char **argv)
{
int ret, pret, status;
pid_t fusePid;
const char *mntPoint;
char mntTmp[PATH_MAX] = "";
struct NativeMiniDfsCluster* tlhCluster;
struct NativeMiniDfsConf conf = {
.doFormat = 1,
};
mntPoint = getenv("TLH_FUSE_MNT_POINT");
if (!mntPoint) {
if (createTempDir(mntTmp, sizeof(mntTmp), 0755)) {
fprintf(stderr, "FUSE_TEST: failed to create temporary directory for "
"fuse mount point.\n");
ret = EXIT_FAILURE;
goto done;
}
fprintf(stderr, "FUSE_TEST: creating mount point at '%s'\n", mntTmp);
mntPoint = mntTmp;
}
if (verifyFuseWorkload()) {
fprintf(stderr, "FUSE_TEST: failed to verify fuse workload on "
"local FS.\n");
ret = EXIT_FAILURE;
goto done_rmdir;
}
tlhCluster = nmdCreate(&conf);
if (!tlhCluster) {
ret = EXIT_FAILURE;
goto done_rmdir;
}
if (nmdWaitClusterUp(tlhCluster)) {
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
ret = spawnFuseServer(argv[0], tlhCluster, mntPoint, &fusePid);
if (ret) {
fprintf(stderr, "FUSE_TEST: spawnFuseServer failed with error "
"code %d\n", ret);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
ret = waitForMount(mntPoint, 20);
if (ret) {
fprintf(stderr, "FUSE_TEST: waitForMount(%s) failed with error "
"code %d\n", mntPoint, ret);
cleanupFuse(mntPoint);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
ret = runFuseWorkload(mntPoint, "test");
if (ret) {
fprintf(stderr, "FUSE_TEST: runFuseWorkload failed with error "
"code %d\n", ret);
cleanupFuse(mntPoint);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
if (cleanupFuse(mntPoint)) {
fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error "
"code %d\n", mntPoint, ret);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
alarm(120);
pret = waitpid(fusePid, &status, 0);
if (pret != fusePid) {
ret = errno;
fprintf(stderr, "FUSE_TEST: failed to wait for fusePid %d: "
"returned %d: error %d (%s)\n",
fusePid, pret, ret, strerror(ret));
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
if (WIFEXITED(status)) {
ret = WEXITSTATUS(status);
if (ret) {
fprintf(stderr, "FUSE_TEST: fuse exited with failure status "
"%d!\n", ret);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
} else if (WIFSIGNALED(status)) {
ret = WTERMSIG(status);
if (ret != SIGTERM) {
fprintf(stderr, "FUSE_TEST: fuse exited with unexpected "
"signal %d!\n", ret);
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
} else {
fprintf(stderr, "FUSE_TEST: fusermount exited with unknown exit type\n");
ret = EXIT_FAILURE;
goto done_nmd_shutdown;
}
ret = EXIT_SUCCESS;
done_nmd_shutdown:
EXPECT_ZERO(nmdShutdown(tlhCluster));
nmdFree(tlhCluster);
done_rmdir:
if (mntTmp[0]) {
rmdir(mntTmp);
}
done:
if (ret == EXIT_SUCCESS) {
fprintf(stderr, "FUSE_TEST: SUCCESS.\n");
} else {
fprintf(stderr, "FUSE_TEST: FAILURE!\n");
}
return ret;
}

View File

@ -78,7 +78,7 @@
do { \
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__) { \
if (!__my_ret__) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got zero from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
@ -98,4 +98,23 @@
} \
} while (0);
#define EXPECT_INT_EQ(x, y) \
do { \
int __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): expected %d\n", \
__LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
#define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
ret = expr; \
if (!ret) \
break; \
ret = -errno; \
} while (ret == -EINTR);
#endif

View File

@ -156,7 +156,7 @@ int nmdWaitClusterUp(struct NativeMiniDfsCluster *cl)
return 0;
}
int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl)
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
{
JNIEnv *env = getJNIEnv();
jvalue jVal;

View File

@ -76,6 +76,5 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
*
* @return the port, or a negative error code
*/
int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl);
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
#endif

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "util/posix_util.h"
#include <dirent.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
static pthread_mutex_t gTempdirLock = PTHREAD_MUTEX_INITIALIZER;
static int gTempdirNonce = 0;
int recursiveDeleteContents(const char *path)
{
int ret;
DIR *dp;
struct dirent *de;
char tmp[PATH_MAX];
dp = opendir(path);
if (!dp) {
ret = -errno;
fprintf(stderr, "recursiveDelete(%s) failed with error %d\n", path, ret);
return ret;
}
while (1) {
de = readdir(dp);
if (!de) {
ret = 0;
break;
}
if ((de->d_name[0] == '.') && (de->d_name[1] == '\0'))
continue;
if ((de->d_name[0] == '.') && (de->d_name[1] == '.') &&
(de->d_name[2] == '\0'))
continue;
snprintf(tmp, sizeof(tmp), "%s/%s", path, de->d_name);
ret = recursiveDelete(tmp);
if (ret)
break;
}
if (closedir(dp)) {
ret = -errno;
fprintf(stderr, "recursiveDelete(%s): closedir failed with "
"error %d\n", path, ret);
return ret;
}
return ret;
}
/*
* Simple recursive delete implementation.
* It could be optimized, but there is no need at the moment.
* TODO: use fstat, etc.
*/
int recursiveDelete(const char *path)
{
int ret;
struct stat stBuf;
ret = stat(path, &stBuf);
if (ret != 0) {
ret = -errno;
fprintf(stderr, "recursiveDelete(%s): stat failed with "
"error %d\n", path, ret);
return ret;
}
if (S_ISDIR(stBuf.st_mode)) {
ret = recursiveDeleteContents(path);
if (ret)
return ret;
ret = rmdir(path);
if (ret) {
ret = errno;
fprintf(stderr, "recursiveDelete(%s): rmdir failed with error %d\n",
path, ret);
return ret;
}
} else {
ret = unlink(path);
if (ret) {
ret = -errno;
fprintf(stderr, "recursiveDelete(%s): unlink failed with "
"error %d\n", path, ret);
return ret;
}
}
return 0;
}
int createTempDir(char *tempDir, int nameMax, int mode)
{
char tmp[PATH_MAX];
int pid, nonce;
const char *base = getenv("TMPDIR");
if (!base)
base = "/tmp";
if (base[0] != '/') {
// canonicalize non-absolute TMPDIR
if (realpath(base, tmp) == NULL) {
return -errno;
}
base = tmp;
}
pid = getpid();
pthread_mutex_lock(&gTempdirLock);
nonce = gTempdirNonce++;
pthread_mutex_unlock(&gTempdirLock);
snprintf(tempDir, nameMax, "%s/temp.%08d.%08d", base, pid, nonce);
if (mkdir(tempDir, mode) == -1) {
int ret = errno;
return -ret;
}
return 0;
}
void sleepNoSig(int sec)
{
int ret;
struct timespec req, rem;
rem.tv_sec = sec;
rem.tv_nsec = 0;
do {
req = rem;
ret = nanosleep(&req, &rem);
} while ((ret == -1) && (errno == EINTR));
if (ret == -1) {
ret = errno;
fprintf(stderr, "nanosleep error %d (%s)\n", ret, strerror(ret));
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __POSIX_UTIL_H__
#define __POSIX_UTIL_H__
/**
* Recursively delete the contents of a directory
*
* @param path directory whose contents we should delete
*
* @return 0 on success; error code otherwise
*/
int recursiveDeleteContents(const char *path);
/**
* Recursively delete a local path, using unlink or rmdir as appropriate.
*
* @param path path to delete
*
* @return 0 on success; error code otherwise
*/
int recursiveDelete(const char *path);
/**
* Get a temporary directory
*
* @param tempDir (out param) path to the temporary directory
* @param nameMax Length of the tempDir buffer
* @param mode Mode to create with
*
* @return 0 on success; error code otherwise
*/
int createTempDir(char *tempDir, int nameMax, int mode);
/**
* Sleep without using signals
*
* @param sec Number of seconds to sleep
*/
void sleepNoSig(int sec);
#endif