HDFS-4140. fuse-dfs handles open(O_TRUNC) poorly. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1423257 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-12-18 02:38:14 +00:00
parent ebcc708d78
commit a1279e68bf
6 changed files with 200 additions and 51 deletions

View File

@ -622,6 +622,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4315. DNs with multiple BPs can have BPOfferServices fail to start
due to unsynchronized map access. (atm)
HDFS-4140. fuse-dfs handles open(O_TRUNC) poorly. (Colin Patrick McCabe
via atm)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.

View File

@ -131,7 +131,6 @@ static enum authConf discoverAuthConf(void)
int fuseConnectInit(const char *nnUri, int port)
{
const char *timerPeriod;
int ret;
gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;

View File

@ -24,12 +24,77 @@
#include <stdio.h>
#include <stdlib.h>
static int get_hdfs_open_flags_from_info(hdfsFS fs, const char *path,
int flags, int *outflags, const hdfsFileInfo *info);
/**
* Given a set of FUSE flags, determine the libhdfs flags we need.
*
* This is complicated by two things:
* 1. libhdfs doesn't support O_RDWR at all;
* 2. when given O_WRONLY, libhdfs will truncate the file unless O_APPEND is
* also given. In other words, there is an implicit O_TRUNC.
*
* Probably the next iteration of the libhdfs interface should not use the POSIX
* flags at all, since, as you can see, they don't really match up very closely
* to the POSIX meaning. However, for the time being, this is the API.
*
* @param fs The libhdfs object
* @param path The path we're opening
* @param flags The FUSE flags
*
* @return negative error code on failure; flags otherwise.
*/
static int64_t get_hdfs_open_flags(hdfsFS fs, const char *path, int flags)
{
int hasContent;
int64_t ret;
hdfsFileInfo *info;
if ((flags & O_ACCMODE) == O_RDONLY) {
return O_RDONLY;
}
if (flags & O_TRUNC) {
/* If we're opening for write or read/write, O_TRUNC means we should blow
* away the file which is there and create our own file.
* */
return O_WRONLY;
}
info = hdfsGetPathInfo(fs, path);
if (info) {
if (info->mSize == 0) {
// If the file has zero length, we shouldn't feel bad about blowing it
// away.
ret = O_WRONLY;
} else if ((flags & O_ACCMODE) == O_RDWR) {
// HACK: translate O_RDWR requests into O_RDONLY if the file already
// exists and has non-zero length.
ret = O_RDONLY;
} else { // O_WRONLY
// HACK: translate O_WRONLY requests into append if the file already
// exists.
ret = O_WRONLY | O_APPEND;
}
} else { // !info
if (flags & O_CREAT) {
ret = O_WRONLY;
} else {
ret = -ENOENT;
}
}
if (info) {
hdfsFreeFileInfo(info, 1);
}
return ret;
}
int dfs_open(const char *path, struct fuse_file_info *fi)
{
hdfsFS fs = NULL;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
dfs_fh *fh = NULL;
int mutexInit = 0, ret;
int mutexInit = 0, ret, flags = 0;
int64_t flagRet;
TRACE1("open", path)
@ -38,10 +103,6 @@ int dfs_open(const char *path, struct fuse_file_info *fi)
assert('/' == *path);
assert(dfs);
// 0x8000 is always passed in and hadoop doesn't like it, so killing it here
// bugbug figure out what this flag is and report problem to Hadoop JIRA
int flags = (fi->flags & 0x7FFF);
// retrieve dfs specific data
fh = (dfs_fh*)calloc(1, sizeof (dfs_fh));
if (!fh) {
@ -57,22 +118,12 @@ int dfs_open(const char *path, struct fuse_file_info *fi)
goto error;
}
fs = hdfsConnGetFs(fh->conn);
if (flags & O_RDWR) {
hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
if (info == NULL) {
// File does not exist (maybe?); interpret it as a O_WRONLY
// If the actual error was something else, we'll get it again when
// we try to open the file.
flags ^= O_RDWR;
flags |= O_WRONLY;
} else {
// File exists; open this as read only.
flags ^= O_RDWR;
flags |= O_RDONLY;
}
flagRet = get_hdfs_open_flags(fs, path, fi->flags);
if (flagRet < 0) {
ret = -flagRet;
goto error;
}
flags = flagRet;
if ((fh->hdfsFH = hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) {
ERROR("Could not open file %s (errno=%d)", path, errno);
if (errno == 0 || errno == EINTERNAL) {
@ -91,7 +142,7 @@ int dfs_open(const char *path, struct fuse_file_info *fi)
}
mutexInit = 1;
if (fi->flags & O_WRONLY || fi->flags & O_CREAT) {
if ((flags & O_ACCMODE) == O_WRONLY) {
fh->buf = NULL;
} else {
assert(dfs->rdbuffer_size > 0);

View File

@ -98,7 +98,7 @@ static void dfsPrintOptions(FILE *fp, const struct options *o)
o->attribute_timeout, o->rdbuffer_size, o->direct_io);
}
void *dfs_init(void)
void *dfs_init(struct fuse_conn_info *conn)
{
int ret;
@ -143,6 +143,45 @@ void *dfs_init(void)
exit(EXIT_FAILURE);
}
}
#ifdef FUSE_CAP_ATOMIC_O_TRUNC
// If FUSE_CAP_ATOMIC_O_TRUNC is set, open("foo", O_CREAT | O_TRUNC) will
// result in dfs_open being called with O_TRUNC.
//
// If this capability is not present, fuse will try to use multiple
// operation to "simulate" open(O_TRUNC). This doesn't work very well with
// HDFS.
// Unfortunately, this capability is only implemented on Linux 2.6.29 or so.
// See HDFS-4140 for details.
if (conn->capable & FUSE_CAP_ATOMIC_O_TRUNC) {
conn->want |= FUSE_CAP_ATOMIC_O_TRUNC;
}
#endif
#ifdef FUSE_CAP_ASYNC_READ
// We're OK with doing reads at the same time as writes.
if (conn->capable & FUSE_CAP_ASYNC_READ) {
conn->want |= FUSE_CAP_ASYNC_READ;
}
#endif
#ifdef FUSE_CAP_BIG_WRITES
// Yes, we can read more than 4kb at a time. In fact, please do!
if (conn->capable & FUSE_CAP_BIG_WRITES) {
conn->want |= FUSE_CAP_BIG_WRITES;
}
#endif
#ifdef FUSE_CAP_DONT_MASK
if ((options.no_permissions) && (conn->capable & FUSE_CAP_DONT_MASK)) {
// If we're handing permissions ourselves, we don't want the kernel
// applying its own umask. HDFS already implements its own per-user
// umasks! Sadly, this only actually does something on kernels 2.6.31 and
// later.
conn->want |= FUSE_CAP_DONT_MASK;
}
#endif
return (void*)dfs;
}

View File

@ -19,13 +19,15 @@
#ifndef __FUSE_INIT_H__
#define __FUSE_INIT_H__
struct fuse_conn_info;
/**
* These are responsible for initializing connections to dfs and internal
* data structures and then freeing them.
* i.e., what happens on mount and unmount.
*
*/
void *dfs_init();
void *dfs_init(struct fuse_conn_info *conn);
void dfs_destroy (void *ptr);
#endif

View File

@ -16,6 +16,8 @@
* limitations under the License.
*/
#define FUSE_USE_VERSION 26
#include "fuse-dfs/test/fuse_workload.h"
#include "libhdfs/expect.h"
#include "util/posix_util.h"
@ -23,6 +25,7 @@
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <fuse.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
@ -138,13 +141,89 @@ static int safeRead(int fd, void *buf, int c)
return amt;
}
/* Bug: HDFS-2551.
* When a program writes a file, closes it, and immediately re-opens it,
* it might not appear to have the correct length. This is because FUSE
* invokes the release() callback asynchronously.
*
* To work around this, we keep retrying until the file length is what we
* expect.
*/
static int closeWorkaroundHdfs2551(int fd, const char *path, off_t expectedSize)
{
int ret, try;
struct stat stBuf;
RETRY_ON_EINTR_GET_ERRNO(ret, close(fd));
EXPECT_ZERO(ret);
for (try = 0; try < MAX_TRIES; try++) {
EXPECT_ZERO(stat(path, &stBuf));
EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
if (stBuf.st_size == expectedSize) {
return 0;
}
sleepNoSig(1);
}
fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length "
"%lld; instead, it had length %lld\n",
path, (long long)expectedSize, (long long)stBuf.st_size);
return -EIO;
}
#ifdef FUSE_CAP_ATOMIC_O_TRUNC
/**
* Test that we can create a file, write some contents to it, close that file,
* and then successfully re-open with O_TRUNC.
*/
static int testOpenTrunc(const char *base)
{
int fd, err;
char path[PATH_MAX];
const char * const SAMPLE1 = "this is the first file that we wrote.";
const char * const SAMPLE2 = "this is the second file that we wrote. "
"It's #2!";
snprintf(path, sizeof(path), "%s/trunc.txt", base);
fd = open(path, O_CREAT | O_TRUNC | O_WRONLY, 0644);
if (fd < 0) {
err = errno;
fprintf(stderr, "TEST_ERROR: testOpenTrunc(%s): first open "
"failed with error %d\n", path, err);
return -err;
}
EXPECT_ZERO(safeWrite(fd, SAMPLE1, strlen(SAMPLE1)));
EXPECT_ZERO(closeWorkaroundHdfs2551(fd, path, strlen(SAMPLE1)));
fd = open(path, O_CREAT | O_TRUNC | O_WRONLY, 0644);
if (fd < 0) {
err = errno;
fprintf(stderr, "TEST_ERROR: testOpenTrunc(%s): second open "
"failed with error %d\n", path, err);
return -err;
}
EXPECT_ZERO(safeWrite(fd, SAMPLE2, strlen(SAMPLE2)));
EXPECT_ZERO(closeWorkaroundHdfs2551(fd, path, strlen(SAMPLE2)));
return 0;
}
#else
static int testOpenTrunc(const char *base)
{
fprintf(stderr, "FUSE_WORKLOAD: We lack FUSE_CAP_ATOMIC_O_TRUNC support. "
"Not testing open(O_TRUNC).\n");
return 0;
}
#endif
int runFuseWorkloadImpl(const char *root, const char *pcomp,
struct fileCtx *ctx)
{
char base[PATH_MAX], tmp[PATH_MAX], *tmpBuf;
char src[PATH_MAX], dst[PATH_MAX];
struct stat stBuf;
int ret, i, try;
int ret, i;
struct utimbuf tbuf;
struct statvfs stvBuf;
@ -241,34 +320,9 @@ int runFuseWorkloadImpl(const char *root, const char *pcomp,
EXPECT_ZERO(safeWrite(ctx[i].fd, ctx[i].str, ctx[i].strLen));
}
for (i = 0; i < NUM_FILE_CTX; i++) {
RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
EXPECT_ZERO(ret);
EXPECT_ZERO(closeWorkaroundHdfs2551(ctx[i].fd, ctx[i].path, ctx[i].strLen));
ctx[i].fd = -1;
}
for (i = 0; i < NUM_FILE_CTX; i++) {
/* Bug: HDFS-2551.
* When a program writes a file, closes it, and immediately re-opens it,
* it might not appear to have the correct length. This is because FUSE
* invokes the release() callback asynchronously.
*
* To work around this, we keep retrying until the file length is what we
* expect.
*/
for (try = 0; try < MAX_TRIES; try++) {
EXPECT_ZERO(stat(ctx[i].path, &stBuf));
EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
if (ctx[i].strLen == stBuf.st_size) {
break;
}
sleepNoSig(1);
}
if (try == MAX_TRIES) {
fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length "
"%d; instead, it had length %lld\n",
ctx[i].path, ctx[i].strLen, (long long)stBuf.st_size);
return -EIO;
}
}
for (i = 0; i < NUM_FILE_CTX; i++) {
ctx[i].fd = open(ctx[i].path, O_RDONLY);
if (ctx[i].fd < 0) {
@ -308,6 +362,7 @@ int runFuseWorkloadImpl(const char *root, const char *pcomp,
for (i = 0; i < NUM_FILE_CTX; i++) {
free(ctx[i].path);
}
EXPECT_ZERO(testOpenTrunc(base));
EXPECT_ZERO(recursiveDelete(base));
return 0;
}