HDFS-3608. fuse_dfs: detect changes in UID ticket cache. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1363906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-07-20 19:17:50 +00:00
parent 75a419f15b
commit 1721ae0511
30 changed files with 1740 additions and 336 deletions

View File

@ -364,6 +364,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3597. SNN fails to start after DFS upgrade. (Andy Isaacson via todd)
HDFS-3608. HDFS-3608. fuse_dfs: detect changes in UID ticket cache. (Colin
Patrick McCabe via atm)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -87,6 +87,7 @@ include_directories(
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
main/native
main/native/libhdfs
)

View File

@ -69,5 +69,6 @@ IF(FUSE_FOUND)
${JAVA_JVM_LIBRARY}
hdfs
m
pthread
)
ENDIF(FUSE_FOUND)

View File

@ -16,17 +16,38 @@
* limitations under the License.
*/
#include "hdfs.h"
#include "fuse_dfs.h"
#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_users.h"
#include "libhdfs/hdfs.h"
#include "util/tree.h"
#include <inttypes.h>
#include <limits.h>
#include <poll.h>
#include <search.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <utime.h>
#define FUSE_CONN_DEFAULT_TIMER_PERIOD 5
#define FUSE_CONN_DEFAULT_EXPIRY_PERIOD (5 * 60)
#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
#define HADOOP_FUSE_CONNECTION_TIMEOUT "hadoop.fuse.connection.timeout"
#define HADOOP_FUSE_TIMER_PERIOD "hadoop.fuse.timer.period"
/** Length of the buffer needed by asctime_r */
#define TIME_STR_LEN 26
struct hdfsConn;
static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
static void hdfsConnExpiry(void);
static void* hdfsConnExpiryThread(void *v);
RB_HEAD(hdfsConnTree, hdfsConn);
enum authConf {
AUTH_CONF_UNKNOWN,
@ -34,58 +55,54 @@ enum authConf {
AUTH_CONF_OTHER,
};
#define MAX_ELEMENTS (16 * 1024)
static struct hsearch_data *fsTable = NULL;
static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN;
static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
struct hdfsConn {
RB_ENTRY(hdfsConn) entry;
/** How many threads are currently using this hdfsConnection object */
int64_t refcnt;
/** The username used to make this connection. Dynamically allocated. */
char *usrname;
/** Kerberos ticket cache path, or NULL if this is not a kerberized
* connection. Dynamically allocated. */
char *kpath;
/** mtime of the kpath, if the kpath is non-NULL */
time_t kPathMtime;
/** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
long kPathMtimeNs;
/** The cached libhdfs fs instance */
hdfsFS fs;
/** Nonzero if this hdfs connection needs to be closed as soon as possible.
* If this is true, the connection has been removed from the tree. */
int condemned;
/** Number of times we should run the expiration timer on this connection
* before removing it. */
int expirationCount;
};
/*
* Allocate a hash table for fs handles. Returns 0 on success,
* -1 on failure.
*/
int allocFsTable(void) {
assert(NULL == fsTable);
fsTable = calloc(1, sizeof(struct hsearch_data));
if (0 == hcreate_r(MAX_ELEMENTS, fsTable)) {
ERROR("Unable to initialize connection table");
return -1;
}
return 0;
}
RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
/*
* Find a fs handle for the given key. Returns a fs handle,
* or NULL if there is no fs for the given key.
*/
static hdfsFS findFs(char *key) {
ENTRY entry;
ENTRY *entryP = NULL;
entry.key = key;
if (0 == hsearch_r(entry, FIND, &entryP, fsTable)) {
return NULL;
}
assert(NULL != entryP->data);
return (hdfsFS)entryP->data;
}
/** Current cached libhdfs connections */
static struct hdfsConnTree gConnTree;
/*
* Insert the given fs handle into the table.
* Returns 0 on success, -1 on failure.
*/
static int insertFs(char *key, hdfsFS fs) {
ENTRY entry;
ENTRY *entryP = NULL;
assert(NULL != fs);
entry.key = strdup(key);
if (entry.key == NULL) {
return -1;
}
entry.data = (void*)fs;
if (0 == hsearch_r(entry, ENTER, &entryP, fsTable)) {
return -1;
}
return 0;
}
/** The URI used to make our connections. Dynamically allocated. */
static char *gUri;
/** The port used to make our connections, or 0. */
static int gPort;
/** Lock which protects gConnTree and gConnectTimer->active */
static pthread_mutex_t gConnMutex;
/** Type of authentication configured */
static enum authConf gHdfsAuthConf;
/** FUSE connection timer expiration period */
static int32_t gTimerPeriod;
/** FUSE connection expiry period */
static int32_t gExpiryPeriod;
/** FUSE timer expiration thread */
static pthread_t gTimerThread;
/**
* Find out what type of authentication the system administrator
@ -99,9 +116,11 @@ static enum authConf discoverAuthConf(void)
char *val = NULL;
enum authConf authConf;
ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val);
ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
if (ret)
authConf = AUTH_CONF_UNKNOWN;
else if (!val)
authConf = AUTH_CONF_OTHER;
else if (!strcmp(val, "kerberos"))
authConf = AUTH_CONF_KERBEROS;
else
@ -110,6 +129,236 @@ static enum authConf discoverAuthConf(void)
return authConf;
}
int fuseConnectInit(const char *nnUri, int port)
{
const char *timerPeriod;
int ret;
gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gTimerPeriod);
if (ret) {
fprintf(stderr, "Unable to determine the configured value for %s.",
HADOOP_FUSE_TIMER_PERIOD);
return -EINVAL;
}
if (gTimerPeriod < 1) {
fprintf(stderr, "Invalid value %d given for %s.\n",
gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
return -EINVAL;
}
gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
if (ret) {
fprintf(stderr, "Unable to determine the configured value for %s.",
HADOOP_FUSE_CONNECTION_TIMEOUT);
return -EINVAL;
}
if (gExpiryPeriod < 1) {
fprintf(stderr, "Invalid value %d given for %s.\n",
gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
return -EINVAL;
}
gHdfsAuthConf = discoverAuthConf();
if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
fprintf(stderr, "Unable to determine the configured value for %s.",
HADOOP_SECURITY_AUTHENTICATION);
return -EINVAL;
}
gPort = port;
gUri = strdup(nnUri);
if (!gUri) {
fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
return -ENOMEM;
}
ret = pthread_mutex_init(&gConnMutex, NULL);
if (ret) {
free(gUri);
fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
ret);
return -ret;
}
RB_INIT(&gConnTree);
ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
if (ret) {
free(gUri);
pthread_mutex_destroy(&gConnMutex);
fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
ret);
return -ret;
}
fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
"expiry period %d\n", gTimerPeriod, gExpiryPeriod);
return 0;
}
/**
* Compare two libhdfs connections by username
*
* @param a The first libhdfs connection
* @param b The second libhdfs connection
*
* @return -1, 0, or 1 depending on a < b, a ==b, a > b
*/
static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
{
return strcmp(a->usrname, b->usrname);
}
/**
* Find a libhdfs connection by username
*
* @param usrname The username to look up
*
* @return The connection, or NULL if none could be found
*/
static struct hdfsConn* hdfsConnFind(const char *usrname)
{
struct hdfsConn exemplar;
memset(&exemplar, 0, sizeof(exemplar));
exemplar.usrname = (char*)usrname;
return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
}
/**
* Free the resource associated with a libhdfs connection.
*
* You must remove the connection from the tree before calling this function.
*
* @param conn The libhdfs connection
*/
static void hdfsConnFree(struct hdfsConn *conn)
{
int ret;
ret = hdfsDisconnect(conn->fs);
if (ret) {
fprintf(stderr, "hdfsConnFree(username=%s): "
"hdfsDisconnect failed with error %d\n",
(conn->usrname ? conn->usrname : "(null)"), ret);
}
free(conn->usrname);
free(conn->kpath);
free(conn);
}
/**
* Convert a time_t to a string.
*
* @param sec time in seconds since the epoch
* @param buf (out param) output buffer
* @param bufLen length of output buffer
*
* @return 0 on success; ENAMETOOLONG if the provided buffer was
* too short
*/
static int timeToStr(time_t sec, char *buf, size_t bufLen)
{
struct tm tm, *out;
size_t l;
if (bufLen < TIME_STR_LEN) {
return -ENAMETOOLONG;
}
out = localtime_r(&sec, &tm);
asctime_r(out, buf);
// strip trailing newline
l = strlen(buf);
if (l != 0)
buf[l - 1] = '\0';
return 0;
}
/**
* Check an HDFS connection's Kerberos path.
*
* If the mtime of the Kerberos ticket cache file has changed since we first
* opened the connection, mark the connection as condemned and remove it from
* the hdfs connection tree.
*
* @param conn The HDFS connection
*/
static int hdfsConnCheckKpath(const struct hdfsConn *conn)
{
int ret;
struct stat st;
char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
if (stat(conn->kpath, &st) < 0) {
ret = errno;
if (ret == ENOENT) {
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
"ticket cache file '%s' has disappeared. Condemning the "
"connection.\n", conn->usrname, conn->kpath);
} else {
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
"failed with error code %d. Pessimistically condemning the "
"connection.\n", conn->usrname, conn->kpath, ret);
}
return -ret;
}
if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
(st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
"has changed from '%s' to '%s'. Condemning the connection "
"because our cached Kerberos credentials have probably "
"changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
return -EINTERNAL;
}
return 0;
}
/**
* Cache expiration logic.
*
* This function is called periodically by the cache expiration thread. For
* each FUSE connection not currently in use (refcnt == 0) it will decrement the
* expirationCount for that connection. Once the expirationCount reaches 0 for
* a connection, it can be garbage collected.
*
* We also check to see if the Kerberos credentials have changed. If so, the
* connecton is immediately condemned, even if it is currently in use.
*/
static void hdfsConnExpiry(void)
{
struct hdfsConn *conn, *tmpConn;
pthread_mutex_lock(&gConnMutex);
RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
if (conn->kpath) {
if (hdfsConnCheckKpath(conn)) {
conn->condemned = 1;
RB_REMOVE(hdfsConnTree, &gConnTree, conn);
if (conn->refcnt == 0) {
/* If the connection is not in use by any threads, delete it
* immediately. If it is still in use by some threads, the last
* thread using it will clean it up later inside hdfsConnRelease. */
hdfsConnFree(conn);
continue;
}
}
}
if (conn->refcnt == 0) {
/* If the connection is not currently in use by a thread, check to see if
* it ought to be removed because it's too old. */
conn->expirationCount--;
if (conn->expirationCount <= 0) {
if (conn->condemned) {
fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
"as %s is still in the tree!\n", conn->usrname);
}
fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
"%s because it's now too old.\n", conn->usrname);
RB_REMOVE(hdfsConnTree, &gConnTree, conn);
hdfsConnFree(conn);
}
}
}
pthread_mutex_unlock(&gConnMutex);
}
/**
* Find the Kerberos ticket cache path.
*
@ -129,9 +378,9 @@ static enum authConf discoverAuthConf(void)
* @param path (out param) the path to the ticket cache file
* @param pathLen length of the path buffer
*/
static void findKerbTicketCachePath(char *path, size_t pathLen)
static void findKerbTicketCachePath(struct fuse_context *ctx,
char *path, size_t pathLen)
{
struct fuse_context *ctx = fuse_get_context();
FILE *fp = NULL;
static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
int c = '\0', pathIdx = 0, keyIdx = 0;
@ -168,72 +417,213 @@ done:
}
}
/*
* Connect to the NN as the current user/group.
* Returns a fs handle on success, or NULL on failure.
/**
* Create a new libhdfs connection.
*
* @param usrname Username to use for the new connection
* @param ctx FUSE context to use for the new connection
* @param out (out param) the new libhdfs connection
*
* @return 0 on success; error code otherwise
*/
hdfsFS doConnectAsUser(const char *nn_uri, int nn_port) {
struct hdfsBuilder *bld;
uid_t uid = fuse_get_context()->uid;
char *user = getUsername(uid);
char kpath[PATH_MAX];
static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
struct hdfsConn **out)
{
struct hdfsBuilder *bld = NULL;
char kpath[PATH_MAX] = { 0 };
struct hdfsConn *conn = NULL;
int ret;
hdfsFS fs = NULL;
if (NULL == user) {
goto done;
}
struct stat st;
ret = pthread_mutex_lock(&tableMutex);
assert(0 == ret);
fs = findFs(user);
if (NULL == fs) {
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
hdfsAuthConf = discoverAuthConf();
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
ERROR("Unable to determine the configured value for %s.",
HADOOP_SECURITY_AUTHENTICATION);
goto done;
}
conn = calloc(1, sizeof(struct hdfsConn));
if (!conn) {
fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
ret = -ENOMEM;
goto error;
}
bld = hdfsNewBuilder();
if (!bld) {
ERROR("Unable to create hdfs builder");
goto done;
fprintf(stderr, "Unable to create hdfs builder\n");
ret = -ENOMEM;
goto error;
}
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderSetNameNode(bld, nn_uri);
if (nn_port) {
hdfsBuilderSetNameNodePort(bld, nn_port);
}
hdfsBuilderSetUserName(bld, user);
if (hdfsAuthConf == AUTH_CONF_KERBEROS) {
findKerbTicketCachePath(kpath, sizeof(kpath));
hdfsBuilderSetKerbTicketCachePath(bld, kpath);
}
fs = hdfsBuilderConnect(bld);
if (NULL == fs) {
int err = errno;
ERROR("Unable to create fs for user %s: error code %d", user, err);
goto done;
}
if (-1 == insertFs(user, fs)) {
ERROR("Unable to cache fs for user %s", user);
}
}
done:
ret = pthread_mutex_unlock(&tableMutex);
assert(0 == ret);
free(user);
return fs;
}
/*
* We currently cache a fs handle per-user in this module rather
* than use the FileSystem cache in the java client. Therefore
* we do not disconnect the fs handle here.
/* We always want to get a new FileSystem instance here-- that's why we call
* hdfsBuilderSetForceNewInstance. Otherwise the 'cache condemnation' logic
* in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
* existing cached connection which we wanted to get rid of.
*/
int doDisconnect(hdfsFS fs) {
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderSetNameNode(bld, gUri);
if (gPort) {
hdfsBuilderSetNameNodePort(bld, gPort);
}
hdfsBuilderSetUserName(bld, usrname);
if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
if (stat(kpath, &st) < 0) {
fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
"file '%s'. Did you remember to kinit for UID %d?\n",
kpath, ctx->uid);
ret = -EACCES;
goto error;
}
conn->kPathMtime = st.st_mtim.tv_sec;
conn->kPathMtimeNs = st.st_mtim.tv_nsec;
hdfsBuilderSetKerbTicketCachePath(bld, kpath);
conn->kpath = strdup(kpath);
if (!conn->kpath) {
fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
ret = -ENOMEM;
goto error;
}
}
conn->usrname = strdup(usrname);
if (!conn->usrname) {
fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
ret = -ENOMEM;
goto error;
}
conn->fs = hdfsBuilderConnect(bld);
bld = NULL;
if (!conn->fs) {
ret = errno;
fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
"error code %d\n", usrname, ret);
goto error;
}
RB_INSERT(hdfsConnTree, &gConnTree, conn);
*out = conn;
return 0;
error:
if (bld) {
hdfsFreeBuilder(bld);
}
if (conn) {
free(conn->kpath);
free(conn->usrname);
free(conn);
}
return ret;
}
int fuseConnect(const char *usrname, struct fuse_context *ctx,
struct hdfsConn **out)
{
int ret;
struct hdfsConn* conn;
pthread_mutex_lock(&gConnMutex);
conn = hdfsConnFind(usrname);
if (!conn) {
ret = fuseNewConnect(usrname, ctx, &conn);
if (ret) {
pthread_mutex_unlock(&gConnMutex);
fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
"error code %d\n", usrname, ret);
return ret;
}
}
conn->refcnt++;
conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
if (conn->expirationCount < 2)
conn->expirationCount = 2;
pthread_mutex_unlock(&gConnMutex);
*out = conn;
return 0;
}
int fuseConnectAsThreadUid(struct hdfsConn **conn)
{
struct fuse_context *ctx;
char *usrname;
int ret;
ctx = fuse_get_context();
usrname = getUsername(ctx->uid);
ret = fuseConnect(usrname, ctx, conn);
free(usrname);
return ret;
}
int fuseConnectTest(void)
{
int ret;
struct hdfsConn *conn;
if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
// TODO: call some method which can tell us whether the FS exists. In order
// to implement this, we have to add a method to FileSystem in order to do
// this without valid Kerberos authentication. See HDFS-3674 for details.
return 0;
}
ret = fuseNewConnect("root", NULL, &conn);
if (ret) {
fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
return ret;
}
hdfsConnRelease(conn);
return 0;
}
struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
{
return conn->fs;
}
void hdfsConnRelease(struct hdfsConn *conn)
{
pthread_mutex_lock(&gConnMutex);
conn->refcnt--;
if ((conn->refcnt == 0) && (conn->condemned)) {
fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
conn->usrname);
/* Notice that we're not removing the connection from gConnTree here.
* If the connection is condemned, it must have already been removed from
* the tree, so that no other threads start using it.
*/
hdfsConnFree(conn);
}
pthread_mutex_unlock(&gConnMutex);
}
/**
* Get the monotonic time.
*
* Unlike the wall-clock time, monotonic time only ever goes forward. If the
* user adjusts the time, the monotonic time will not be affected.
*
* @return The monotonic time
*/
static time_t getMonotonicTime(void)
{
int res;
struct timespec ts;
res = clock_gettime(CLOCK_MONOTONIC, &ts);
if (res)
abort();
return ts.tv_sec;
}
/**
* FUSE connection expiration thread
*
*/
static void* hdfsConnExpiryThread(void *v)
{
time_t nextTime, curTime;
int waitTime;
nextTime = getMonotonicTime() + gTimerPeriod;
while (1) {
curTime = getMonotonicTime();
if (curTime >= nextTime) {
hdfsConnExpiry();
nextTime = curTime + gTimerPeriod;
}
waitTime = (nextTime - curTime) * 1000;
poll(NULL, 0, waitTime);
}
return NULL;
}

View File

@ -19,10 +19,72 @@
#ifndef __FUSE_CONNECT_H__
#define __FUSE_CONNECT_H__
#include "fuse_dfs.h"
struct fuse_context;
struct hdfsConn;
struct hdfs_internal;
hdfsFS doConnectAsUser(const char *nn_uri, int nn_port);
int doDisconnect(hdfsFS fs);
int allocFsTable(void);
/**
* Initialize the fuse connection subsystem.
*
* This must be called before any of the other functions in this module.
*
* @param nnUri The NameNode URI
* @param port The NameNode port
*
* @return 0 on success; error code otherwise
*/
int fuseConnectInit(const char *nnUri, int port);
/**
* Get a libhdfs connection.
*
* If there is an existing connection, it will be reused. If not, a new one
* will be created.
*
* You must call hdfsConnRelease on the connection you get back!
*
* @param usrname The username to use
* @param ctx The FUSE context to use (contains UID, PID of requestor)
* @param conn (out param) The HDFS connection
*
* @return 0 on success; error code otherwise
*/
int fuseConnect(const char *usrname, struct fuse_context *ctx,
struct hdfsConn **out);
/**
* Get a libhdfs connection.
*
* The same as fuseConnect, except the username will be determined from the FUSE
* thread context.
*
* @param conn (out param) The HDFS connection
*
* @return 0 on success; error code otherwise
*/
int fuseConnectAsThreadUid(struct hdfsConn **conn);
/**
* Test whether we can connect to the HDFS cluster
*
* @return 0 on success; error code otherwise
*/
int fuseConnectTest(void);
/**
* Get the hdfsFS associated with an hdfsConn.
*
* @param conn The hdfsConn
*
* @return the hdfsFS
*/
struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn);
/**
* Release an hdfsConn when we're done with it.
*
* @param conn The hdfsConn
*/
void hdfsConnRelease(struct hdfsConn *conn);
#endif

View File

@ -31,8 +31,6 @@
//
typedef struct dfs_context_struct {
int debug;
char *nn_uri;
int nn_port;
int read_only;
int usetrash;
int direct_io;

View File

@ -65,8 +65,19 @@ static struct fuse_operations dfs_oper = {
.truncate = dfs_truncate,
};
static void print_env_vars(void)
{
const char *cp = getenv("CLASSPATH");
const char *ld = getenv("LD_LIBRARY_PATH");
fprintf(stderr, "LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld);
fprintf(stderr, "CLASSPATH=%s",cp == NULL ? "NULL" : cp);
}
int main(int argc, char *argv[])
{
int ret;
umask(0);
extern const char *program;
@ -106,24 +117,22 @@ int main(int argc, char *argv[])
exit(0);
}
// Check connection as root
if (options.initchecks == 1) {
hdfsFS tempFS = hdfsConnectAsUser(options.nn_uri, options.nn_port, "root");
if (NULL == tempFS) {
const char *cp = getenv("CLASSPATH");
const char *ld = getenv("LD_LIBRARY_PATH");
ERROR("FATAL: misconfiguration - cannot connect to HDFS");
ERROR("LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld);
ERROR("CLASSPATH=%s",cp == NULL ? "NULL" : cp);
exit(1);
ret = fuseConnectInit(options.nn_uri, options.nn_port);
if (ret) {
ERROR("FATAL: dfs_init: fuseConnInit failed with error %d!", ret);
print_env_vars();
exit(EXIT_FAILURE);
}
if (doDisconnect(tempFS)) {
ERROR("FATAL: unable to disconnect from test filesystem.");
exit(1);
if (options.initchecks == 1) {
ret = fuseConnectTest();
if (ret) {
ERROR("FATAL: dfs_init: fuseConnTest failed with error %d!", ret);
print_env_vars();
exit(EXIT_FAILURE);
}
}
int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
fuse_opt_free_args(&args);
return ret;
}

View File

@ -22,6 +22,8 @@
#include <hdfs.h>
#include <pthread.h>
struct hdfsConn;
/**
*
* dfs_fh_struct is passed around for open files. Fuse provides a hook (the context)
@ -34,10 +36,10 @@
*/
typedef struct dfs_fh_struct {
hdfsFile hdfsFH;
struct hdfsConn *conn;
char *buf;
tSize bufferSize; //what is the size of the buffer we have
off_t buffersStartOffset; //where the buffer starts in the file
hdfsFS fs; // for reads/writes need to access as the real user
pthread_mutex_t mutex;
} dfs_fh;

View File

@ -23,6 +23,8 @@
int dfs_chmod(const char *path, mode_t mode)
{
struct hdfsConn *conn = NULL;
hdfsFS fs;
TRACE1("chmod", path)
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@ -31,22 +33,24 @@ int dfs_chmod(const char *path, mode_t mode)
assert(dfs);
assert('/' == *path);
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect to HDFS");
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
if (hdfsChmod(userFS, path, (short)mode)) {
if (hdfsChmod(fs, path, (short)mode)) {
ERROR("Could not chmod %s to %d", path, (int)mode);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;

View File

@ -25,12 +25,12 @@
int dfs_chown(const char *path, uid_t uid, gid_t gid)
{
TRACE1("chown", path)
struct hdfsConn *conn = NULL;
int ret = 0;
char *user = NULL;
char *group = NULL;
hdfsFS userFS = NULL;
TRACE1("chown", path)
// retrieve dfs specific data
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@ -61,14 +61,15 @@ int dfs_chown(const char *path, uid_t uid, gid_t gid)
}
}
userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect to HDFS");
ret = fuseConnect(user, fuse_get_context(), &conn);
if (ret) {
fprintf(stderr, "fuseConnect: failed to open a libhdfs connection! "
"error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
if (hdfsChown(userFS, path, user, group)) {
if (hdfsChown(hdfsConnGetFs(conn), path, user, group)) {
ret = errno;
ERROR("Could not chown %s to %d:%d: error %d", path, (int)uid, gid, ret);
ret = (ret > 0) ? -ret : -EIO;
@ -76,16 +77,11 @@ int dfs_chown(const char *path, uid_t uid, gid_t gid)
}
cleanup:
if (userFS && doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
if (user) {
free(user);
}
if (group) {
free(group);
}
return ret;
}

View File

@ -16,6 +16,7 @@
* limitations under the License.
*/
#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_impls.h"
#include "fuse_file_handle.h"
@ -43,9 +44,7 @@ int dfs_flush(const char *path, struct fuse_file_info *fi) {
assert(fh);
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
assert(file_handle);
assert(fh->fs);
if (hdfsFlush(fh->fs, file_handle) != 0) {
if (hdfsFlush(hdfsConnGetFs(fh->conn), file_handle) != 0) {
ERROR("Could not flush %lx for %s\n",(long)file_handle, path);
return -EIO;
}

View File

@ -23,22 +23,27 @@
int dfs_getattr(const char *path, struct stat *st)
{
struct hdfsConn *conn = NULL;
hdfsFS fs;
int ret;
hdfsFileInfo *info;
TRACE1("getattr", path)
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
assert(dfs);
assert(path);
assert(st);
hdfsFS fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (NULL == fs) {
ERROR("Could not connect to %s:%d", dfs->nn_uri, dfs->nn_port);
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
int ret = 0;
hdfsFileInfo *info = hdfsGetPathInfo(fs,path);
info = hdfsGetPathInfo(fs,path);
if (NULL == info) {
ret = -ENOENT;
goto cleanup;
@ -63,9 +68,8 @@ int dfs_getattr(const char *path, struct stat *st)
hdfsFreeFileInfo(info,1);
cleanup:
if (doDisconnect(fs)) {
ERROR("Could not disconnect from filesystem");
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -23,9 +23,12 @@
int dfs_mkdir(const char *path, mode_t mode)
{
TRACE1("mkdir", path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
int ret;
TRACE1("mkdir", path)
assert(path);
assert(dfs);
@ -41,29 +44,32 @@ int dfs_mkdir(const char *path, mode_t mode)
return -EACCES;
}
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
// In theory the create and chmod should be atomic.
int ret = 0;
if (hdfsCreateDirectory(userFS, path)) {
if (hdfsCreateDirectory(fs, path)) {
ERROR("HDFS could not create directory %s", path);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
if (hdfsChmod(userFS, path, (short)mode)) {
if (hdfsChmod(fs, path, (short)mode)) {
ERROR("Could not chmod %s to %d", path, (int)mode);
ret = (errno > 0) ? -errno : -EIO;
}
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -21,38 +21,45 @@
#include "fuse_connect.h"
#include "fuse_file_handle.h"
#include <stdio.h>
#include <stdlib.h>
int dfs_open(const char *path, struct fuse_file_info *fi)
{
TRACE1("open", path)
hdfsFS fs = NULL;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
dfs_fh *fh = NULL;
int mutexInit = 0, ret;
TRACE1("open", path)
// check params and the context var
assert(path);
assert('/' == *path);
assert(dfs);
int ret = 0;
// 0x8000 is always passed in and hadoop doesn't like it, so killing it here
// bugbug figure out what this flag is and report problem to Hadoop JIRA
int flags = (fi->flags & 0x7FFF);
// retrieve dfs specific data
dfs_fh *fh = (dfs_fh*)calloc(1, sizeof (dfs_fh));
if (fh == NULL) {
fh = (dfs_fh*)calloc(1, sizeof (dfs_fh));
if (!fh) {
ERROR("Malloc of new file handle failed");
return -EIO;
ret = -EIO;
goto error;
}
fh->fs = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (fh->fs == NULL) {
ERROR("Could not connect to dfs");
return -EIO;
ret = fuseConnectAsThreadUid(&fh->conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto error;
}
fs = hdfsConnGetFs(fh->conn);
if (flags & O_RDWR) {
hdfsFileInfo *info = hdfsGetPathInfo(fh->fs,path);
hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
if (info == NULL) {
// File does not exist (maybe?); interpret it as a O_WRONLY
// If the actual error was something else, we'll get it again when
@ -66,15 +73,23 @@ int dfs_open(const char *path, struct fuse_file_info *fi)
}
}
if ((fh->hdfsFH = hdfsOpenFile(fh->fs, path, flags, 0, 0, 0)) == NULL) {
if ((fh->hdfsFH = hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) {
ERROR("Could not open file %s (errno=%d)", path, errno);
if (errno == 0 || errno == EINTERNAL) {
return -EIO;
ret = -EIO;
goto error;
}
return -errno;
ret = -errno;
goto error;
}
pthread_mutex_init(&fh->mutex, NULL);
ret = pthread_mutex_init(&fh->mutex, NULL);
if (ret) {
fprintf(stderr, "dfs_open: error initializing mutex: error %d\n", ret);
ret = -EIO;
goto error;
}
mutexInit = 1;
if (fi->flags & O_WRONLY || fi->flags & O_CREAT) {
fh->buf = NULL;
@ -84,11 +99,27 @@ int dfs_open(const char *path, struct fuse_file_info *fi)
if (NULL == fh->buf) {
ERROR("Could not allocate memory for a read for file %s\n", path);
ret = -EIO;
goto error;
}
fh->buffersStartOffset = 0;
fh->bufferSize = 0;
}
fi->fh = (uint64_t)fh;
return 0;
error:
if (fh) {
if (mutexInit) {
pthread_mutex_destroy(&fh->mutex);
}
free(fh->buf);
if (fh->hdfsFH) {
hdfsCloseFile(fs, fh->hdfsFH);
}
if (fh->conn) {
hdfsConnRelease(fh->conn);
}
free(fh);
}
return ret;
}

View File

@ -16,9 +16,10 @@
* limitations under the License.
*/
#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_impls.h"
#include "fuse_file_handle.h"
#include "fuse_impls.h"
static size_t min(const size_t x, const size_t y) {
return x < y ? x : y;
@ -48,9 +49,9 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset,
assert(fi);
dfs_fh *fh = (dfs_fh*)fi->fh;
hdfsFS fs = hdfsConnGetFs(fh->conn);
assert(fh != NULL);
assert(fh->fs != NULL);
assert(fh->hdfsFH != NULL);
// special case this as simplifies the rest of the logic to know the caller wanted > 0 bytes
@ -61,7 +62,7 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset,
if ( size >= dfs->rdbuffer_size) {
int num_read;
size_t total_read = 0;
while (size - total_read > 0 && (num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
while (size - total_read > 0 && (num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, buf + total_read, size - total_read)) > 0) {
total_read += num_read;
}
// if there was an error before satisfying the current read, this logic declares it an error
@ -98,7 +99,7 @@ int dfs_read(const char *path, char *buf, size_t size, off_t offset,
size_t total_read = 0;
while (dfs->rdbuffer_size - total_read > 0 &&
(num_read = hdfsPread(fh->fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
(num_read = hdfsPread(fs, fh->hdfsFH, offset + total_read, fh->buf + total_read, dfs->rdbuffer_size - total_read)) > 0) {
total_read += num_read;
}

View File

@ -24,25 +24,31 @@
int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
off_t offset, struct fuse_file_info *fi)
{
TRACE1("readdir", path)
int ret;
struct hdfsConn *conn = NULL;
hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
TRACE1("readdir", path)
assert(dfs);
assert(path);
assert(buf);
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
// Read dirents. Calling a variant that just returns the final path
// component (HDFS-975) would save us from parsing it out below.
int numEntries = 0;
hdfsFileInfo *info = hdfsListDirectory(userFS, path, &numEntries);
hdfsFileInfo *info = hdfsListDirectory(fs, path, &numEntries);
int ret = 0;
// NULL means either the directory doesn't exist or maybe IO error.
if (NULL == info) {
ret = (errno > 0) ? -errno : -ENOENT;
@ -106,11 +112,11 @@ int dfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
}
// free the info pointers
hdfsFreeFileInfo(info,numEntries);
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
ERROR("Failed to disconnect %d", errno);
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -52,15 +52,13 @@ int dfs_release (const char *path, struct fuse_file_info *fi) {
assert(fh);
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
if (NULL != file_handle) {
if (hdfsCloseFile(fh->fs, file_handle) != 0) {
if (hdfsCloseFile(hdfsConnGetFs(fh->conn), file_handle) != 0) {
ERROR("Could not close handle %ld for %s\n",(long)file_handle, path);
ret = -EIO;
}
}
free(fh->buf);
if (doDisconnect(fh->fs)) {
ret = -EIO;
}
hdfsConnRelease(fh->conn);
pthread_mutex_destroy(&fh->mutex);
free(fh);
fi->fh = 0;

View File

@ -23,10 +23,12 @@
int dfs_rename(const char *from, const char *to)
{
TRACE1("rename", from)
// retrieve dfs specific data
struct hdfsConn *conn = NULL;
hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
int ret;
TRACE1("rename", from)
// check params and the context var
assert(from);
@ -46,23 +48,24 @@ int dfs_rename(const char *from, const char *to)
return -EACCES;
}
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
int ret = 0;
if (hdfsRename(userFS, from, to)) {
fs = hdfsConnGetFs(conn);
if (hdfsRename(fs, from, to)) {
ERROR("Rename %s to %s failed", from, to);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -25,9 +25,14 @@ extern const char *const TrashPrefixDir;
int dfs_rmdir(const char *path)
{
TRACE1("rmdir", path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
int ret;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
int numEntries = 0;
hdfsFileInfo *info = NULL;
TRACE1("rmdir", path)
assert(path);
assert(dfs);
@ -35,42 +40,43 @@ int dfs_rmdir(const char *path)
if (is_protected(path)) {
ERROR("Trying to delete protected directory %s", path);
return -EACCES;
ret = -EACCES;
goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot delete directory %s", path);
return -EACCES;
ret = -EACCES;
goto cleanup;
}
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
int ret = 0;
int numEntries = 0;
hdfsFileInfo *info = hdfsListDirectory(userFS,path,&numEntries);
if (info) {
hdfsFreeFileInfo(info, numEntries);
}
fs = hdfsConnGetFs(conn);
info = hdfsListDirectory(fs, path, &numEntries);
if (numEntries) {
ret = -ENOTEMPTY;
goto cleanup;
}
if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) {
if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) {
ERROR("Error trying to delete directory %s", path);
ret = -EIO;
goto cleanup;
}
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (info) {
hdfsFreeFileInfo(info, numEntries);
}
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -23,9 +23,12 @@
int dfs_statfs(const char *path, struct statvfs *st)
{
TRACE1("statfs",path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
int ret;
TRACE1("statfs",path)
assert(path);
assert(st);
@ -33,19 +36,18 @@ int dfs_statfs(const char *path, struct statvfs *st)
memset(st,0,sizeof(struct statvfs));
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
const tOffset cap = hdfsGetCapacity(userFS);
const tOffset used = hdfsGetUsed(userFS);
const tOffset bsize = hdfsGetDefaultBlockSize(userFS);
if (doDisconnect(userFS)) {
return -EIO;
}
const tOffset cap = hdfsGetCapacity(fs);
const tOffset used = hdfsGetUsed(fs);
const tOffset bsize = hdfsGetDefaultBlockSize(fs);
st->f_bsize = bsize;
st->f_frsize = bsize;
@ -58,6 +60,11 @@ int dfs_statfs(const char *path, struct statvfs *st)
st->f_fsid = 1023;
st->f_flag = ST_RDONLY | ST_NOSUID;
st->f_namemax = 1023;
ret = 0;
return 0;
cleanup:
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -28,10 +28,12 @@
*/
int dfs_truncate(const char *path, off_t size)
{
TRACE1("truncate", path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
TRACE1("truncate", path)
assert(path);
assert('/' == *path);
assert(dfs);
@ -45,31 +47,33 @@ int dfs_truncate(const char *path, off_t size)
return ret;
}
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
int flags = O_WRONLY | O_CREAT;
hdfsFile file;
if ((file = (hdfsFile)hdfsOpenFile(userFS, path, flags, 0, 0, 0)) == NULL) {
if ((file = (hdfsFile)hdfsOpenFile(fs, path, flags, 0, 0, 0)) == NULL) {
ERROR("Could not connect open file %s", path);
ret = -EIO;
goto cleanup;
}
if (hdfsCloseFile(userFS, file) != 0) {
if (hdfsCloseFile(fs, file) != 0) {
ERROR("Could not close file %s", path);
ret = -EIO;
goto cleanup;
}
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -20,44 +20,51 @@
#include "fuse_impls.h"
#include "fuse_connect.h"
#include "fuse_trash.h"
extern const char *const TrashPrefixDir;
int dfs_unlink(const char *path)
{
TRACE1("unlink", path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
TRACE1("unlink", path)
assert(path);
assert(dfs);
assert('/' == *path);
if (is_protected(path)) {
ERROR("Trying to delete protected directory %s", path);
return -EACCES;
ret = -EACCES;
goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot create directory %s", path);
return -EACCES;
ret = -EACCES;
goto cleanup;
}
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
if (hdfsDeleteWithTrash(userFS, path, dfs->usetrash)) {
if (hdfsDeleteWithTrash(fs, path, dfs->usetrash)) {
ERROR("Could not delete file %s", path);
ret = (errno > 0) ? -errno : -EIO;
goto cleanup;
}
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;

View File

@ -22,10 +22,13 @@
int dfs_utimens(const char *path, const struct timespec ts[2])
{
TRACE1("utimens", path)
struct hdfsConn *conn = NULL;
hdfsFS fs;
int ret = 0;
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
TRACE1("utimens", path)
assert(path);
assert(dfs);
assert('/' == *path);
@ -33,14 +36,17 @@ int dfs_utimens(const char *path, const struct timespec ts[2])
time_t aTime = ts[0].tv_sec;
time_t mTime = ts[1].tv_sec;
hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
if (userFS == NULL) {
ERROR("Could not connect");
return -EIO;
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
"connection! error %d.\n", ret);
ret = -EIO;
goto cleanup;
}
fs = hdfsConnGetFs(conn);
if (hdfsUtime(userFS, path, mTime, aTime)) {
hdfsFileInfo *info = hdfsGetPathInfo(userFS, path);
if (hdfsUtime(fs, path, mTime, aTime)) {
hdfsFileInfo *info = hdfsGetPathInfo(fs, path);
if (info == NULL) {
ret = (errno > 0) ? -errno : -ENOENT;
goto cleanup;
@ -54,10 +60,11 @@ int dfs_utimens(const char *path, const struct timespec ts[2])
}
goto cleanup;
}
ret = 0;
cleanup:
if (doDisconnect(userFS)) {
ret = -EIO;
if (conn) {
hdfsConnRelease(conn);
}
return ret;
}

View File

@ -16,6 +16,7 @@
* limitations under the License.
*/
#include "fuse_connect.h"
#include "fuse_dfs.h"
#include "fuse_impls.h"
#include "fuse_file_handle.h"
@ -48,15 +49,15 @@ int dfs_write(const char *path, const char *buf, size_t size,
pthread_mutex_lock(&fh->mutex);
tSize length = 0;
assert(fh->fs);
hdfsFS fs = hdfsConnGetFs(fh->conn);
tOffset cur_offset = hdfsTell(fh->fs, file_handle);
tOffset cur_offset = hdfsTell(fs, file_handle);
if (cur_offset != offset) {
ERROR("User trying to random access write to a file %d != %d for %s",
(int)cur_offset, (int)offset, path);
ret = -ENOTSUP;
} else {
length = hdfsWrite(fh->fs, file_handle, buf, size);
length = hdfsWrite(fs, file_handle, buf, size);
if (length <= 0) {
ERROR("Could not write all bytes for %s %d != %d (errno=%d)",
path, length, (int)size, errno);

View File

@ -78,8 +78,20 @@ void init_protectedpaths(dfs_context *dfs) {
dfs->protectedpaths[j] = NULL;
}
static void dfsPrintOptions(FILE *fp, const struct options *o)
{
fprintf(fp, "[ protected=%s, nn_uri=%s, nn_port=%d, "
"debug=%d, read_only=%d, initchecks=%d, "
"no_permissions=%d, usetrash=%d, entry_timeout=%d, "
"attribute_timeout=%d, rdbuffer_size=%Zd, direct_io=%d ]",
(o->protected ? o->protected : "(NULL)"), o->nn_uri, o->nn_port,
o->debug, o->read_only, o->initchecks,
o->no_permissions, o->usetrash, o->entry_timeout,
o->attribute_timeout, o->rdbuffer_size, o->direct_io);
}
void *dfs_init(void) {
void *dfs_init(void)
{
//
// Create a private struct of data we will pass to fuse here and which
// will then be accessible on every call.
@ -92,15 +104,15 @@ void *dfs_init(void) {
// initialize the context
dfs->debug = options.debug;
dfs->nn_uri = options.nn_uri;
dfs->nn_port = options.nn_port;
dfs->read_only = options.read_only;
dfs->usetrash = options.usetrash;
dfs->protectedpaths = NULL;
dfs->rdbuffer_size = options.rdbuffer_size;
dfs->direct_io = options.direct_io;
INFO("Mounting. nn_uri=%s, nn_port=%d", dfs->nn_uri, dfs->nn_port);
fprintf(stderr, "Mounting with options ");
dfsPrintOptions(stderr, &options);
fprintf(stderr, "\n");
init_protectedpaths(dfs);
assert(dfs->protectedpaths != NULL);
@ -109,12 +121,6 @@ void *dfs_init(void) {
DEBUG("dfs->rdbuffersize <= 0 = %ld", dfs->rdbuffer_size);
dfs->rdbuffer_size = 32768;
}
if (0 != allocFsTable()) {
ERROR("FATAL: could not allocate ");
exit(1);
}
return (void*)dfs;
}

View File

@ -225,7 +225,7 @@ done:
*
* @return 0 on success; error code otherwise
*/
static int hadoopConfSet(JNIEnv *env, jobject jConfiguration,
static int hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
int ret;
@ -283,7 +283,7 @@ static int jStrToCstr(JNIEnv *env, jstring jstr, char **cstr)
return 0;
}
static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
static int hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
int ret;
@ -301,7 +301,7 @@ static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING)), jkey);
if (ret) {
snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key);
snprintf(buf, sizeof(buf), "hadoopConfGetStr(%s)", key);
ret = errnoFromException(jExc, env, buf);
goto done;
}
@ -321,7 +321,7 @@ done:
return ret;
}
int hdfsConfGet(const char *key, char **val)
int hdfsConfGetStr(const char *key, char **val)
{
JNIEnv *env;
int ret;
@ -339,19 +339,67 @@ int hdfsConfGet(const char *key, char **val)
ret = EINTERNAL;
goto done;
}
ret = hadoopConfGet(env, jConfiguration, key, val);
if (ret)
goto done;
ret = 0;
ret = hadoopConfGetStr(env, jConfiguration, key, val);
done:
if (jConfiguration)
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
void hdfsConfFree(char *val)
static int hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
const char *key, int32_t *val)
{
int ret;
jthrowable jExc = NULL;
jvalue jVal;
jstring jkey = NULL;
char buf[1024];
jkey = (*env)->NewStringUTF(env, key);
if (!jkey) {
(*env)->ExceptionDescribe(env);
return ENOMEM;
}
ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration,
HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
jkey, (jint)(*val));
destroyLocalReference(env, jkey);
if (ret) {
snprintf(buf, sizeof(buf), "hadoopConfGetInt(%s)", key);
return errnoFromException(jExc, env, buf);
}
*val = jVal.i;
return 0;
}
int hdfsConfGetInt(const char *key, int32_t *val)
{
JNIEnv *env;
int ret;
jobject jConfiguration = NULL;
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
if (jConfiguration == NULL) {
fprintf(stderr, "Can't construct instance of class "
"org.apache.hadoop.conf.Configuration\n");
ret = EINTERNAL;
goto done;
}
ret = hadoopConfGetInt(env, jConfiguration, key, val);
done:
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
void hdfsConfStrFree(char *val)
{
free(val);
}
@ -583,7 +631,7 @@ hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
}
if (bld->kerbTicketCachePath) {
ret = hadoopConfSet(env, jConfiguration,
ret = hadoopConfSetStr(env, jConfiguration,
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
if (ret)
goto done;

View File

@ -220,21 +220,33 @@ extern "C" {
* Get a configuration string.
*
* @param key The key to find
* @param val (out param) The value. This will be NULL if the
* @param val (out param) The value. This will be set to NULL if the
* key isn't found. You must free this string with
* hdfsConfFree.
* hdfsConfStrFree.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
int hdfsConfGet(const char *key, char **val);
int hdfsConfGetStr(const char *key, char **val);
/**
* Free a configuration string found with hdfsConfGet.
* Get a configuration integer.
*
* @param val A configuration string obtained from hdfsConfGet
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
void hdfsConfFree(char *val);
int hdfsConfGetInt(const char *key, int32_t *val);
/**
* Free a configuration string found with hdfsConfGetStr.
*
* @param val A configuration string obtained from hdfsConfGetStr
*/
void hdfsConfStrFree(char *val);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.

View File

@ -412,6 +412,7 @@ char *classNameOfObject(jobject jobj, JNIEnv *env) {
return newstr;
}
/**
* Get the global JNI environemnt.
*
@ -500,6 +501,11 @@ static JNIEnv* getGlobalJNIEnv(void)
"with error: %d\n", rv);
return NULL;
}
if (invokeMethod(env, NULL, NULL, STATIC, NULL,
"org/apache/hadoop/fs/FileSystem",
"loadFileSystems", "()V") != 0) {
(*env)->ExceptionDescribe(env);
}
}
else {
//Attach this thread to the VM

View File

@ -0,0 +1,765 @@
/* $NetBSD: tree.h,v 1.8 2004/03/28 19:38:30 provos Exp $ */
/* $OpenBSD: tree.h,v 1.7 2002/10/17 21:51:54 art Exp $ */
/* $FreeBSD: src/sys/sys/tree.h,v 1.9.4.1 2011/09/23 00:51:37 kensmith Exp $ */
/*-
* Copyright 2002 Niels Provos <provos@citi.umich.edu>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _SYS_TREE_H_
#define _SYS_TREE_H_
#include <sys/cdefs.h>
/*
* This file defines data structures for different types of trees:
* splay trees and red-black trees.
*
* A splay tree is a self-organizing data structure. Every operation
* on the tree causes a splay to happen. The splay moves the requested
* node to the root of the tree and partly rebalances it.
*
* This has the benefit that request locality causes faster lookups as
* the requested nodes move to the top of the tree. On the other hand,
* every lookup causes memory writes.
*
* The Balance Theorem bounds the total access time for m operations
* and n inserts on an initially empty tree as O((m + n)lg n). The
* amortized cost for a sequence of m accesses to a splay tree is O(lg n);
*
* A red-black tree is a binary search tree with the node color as an
* extra attribute. It fulfills a set of conditions:
* - every search path from the root to a leaf consists of the
* same number of black nodes,
* - each red node (except for the root) has a black parent,
* - each leaf node is black.
*
* Every operation on a red-black tree is bounded as O(lg n).
* The maximum height of a red-black tree is 2lg (n+1).
*/
#define SPLAY_HEAD(name, type) \
struct name { \
struct type *sph_root; /* root of the tree */ \
}
#define SPLAY_INITIALIZER(root) \
{ NULL }
#define SPLAY_INIT(root) do { \
(root)->sph_root = NULL; \
} while (/*CONSTCOND*/ 0)
#define SPLAY_ENTRY(type) \
struct { \
struct type *spe_left; /* left element */ \
struct type *spe_right; /* right element */ \
}
#define SPLAY_LEFT(elm, field) (elm)->field.spe_left
#define SPLAY_RIGHT(elm, field) (elm)->field.spe_right
#define SPLAY_ROOT(head) (head)->sph_root
#define SPLAY_EMPTY(head) (SPLAY_ROOT(head) == NULL)
/* SPLAY_ROTATE_{LEFT,RIGHT} expect that tmp hold SPLAY_{RIGHT,LEFT} */
#define SPLAY_ROTATE_RIGHT(head, tmp, field) do { \
SPLAY_LEFT((head)->sph_root, field) = SPLAY_RIGHT(tmp, field); \
SPLAY_RIGHT(tmp, field) = (head)->sph_root; \
(head)->sph_root = tmp; \
} while (/*CONSTCOND*/ 0)
#define SPLAY_ROTATE_LEFT(head, tmp, field) do { \
SPLAY_RIGHT((head)->sph_root, field) = SPLAY_LEFT(tmp, field); \
SPLAY_LEFT(tmp, field) = (head)->sph_root; \
(head)->sph_root = tmp; \
} while (/*CONSTCOND*/ 0)
#define SPLAY_LINKLEFT(head, tmp, field) do { \
SPLAY_LEFT(tmp, field) = (head)->sph_root; \
tmp = (head)->sph_root; \
(head)->sph_root = SPLAY_LEFT((head)->sph_root, field); \
} while (/*CONSTCOND*/ 0)
#define SPLAY_LINKRIGHT(head, tmp, field) do { \
SPLAY_RIGHT(tmp, field) = (head)->sph_root; \
tmp = (head)->sph_root; \
(head)->sph_root = SPLAY_RIGHT((head)->sph_root, field); \
} while (/*CONSTCOND*/ 0)
#define SPLAY_ASSEMBLE(head, node, left, right, field) do { \
SPLAY_RIGHT(left, field) = SPLAY_LEFT((head)->sph_root, field); \
SPLAY_LEFT(right, field) = SPLAY_RIGHT((head)->sph_root, field);\
SPLAY_LEFT((head)->sph_root, field) = SPLAY_RIGHT(node, field); \
SPLAY_RIGHT((head)->sph_root, field) = SPLAY_LEFT(node, field); \
} while (/*CONSTCOND*/ 0)
/* Generates prototypes and inline functions */
#define SPLAY_PROTOTYPE(name, type, field, cmp) \
void name##_SPLAY(struct name *, struct type *); \
void name##_SPLAY_MINMAX(struct name *, int); \
struct type *name##_SPLAY_INSERT(struct name *, struct type *); \
struct type *name##_SPLAY_REMOVE(struct name *, struct type *); \
\
/* Finds the node with the same key as elm */ \
static __inline struct type * \
name##_SPLAY_FIND(struct name *head, struct type *elm) \
{ \
if (SPLAY_EMPTY(head)) \
return(NULL); \
name##_SPLAY(head, elm); \
if ((cmp)(elm, (head)->sph_root) == 0) \
return (head->sph_root); \
return (NULL); \
} \
\
static __inline struct type * \
name##_SPLAY_NEXT(struct name *head, struct type *elm) \
{ \
name##_SPLAY(head, elm); \
if (SPLAY_RIGHT(elm, field) != NULL) { \
elm = SPLAY_RIGHT(elm, field); \
while (SPLAY_LEFT(elm, field) != NULL) { \
elm = SPLAY_LEFT(elm, field); \
} \
} else \
elm = NULL; \
return (elm); \
} \
\
static __inline struct type * \
name##_SPLAY_MIN_MAX(struct name *head, int val) \
{ \
name##_SPLAY_MINMAX(head, val); \
return (SPLAY_ROOT(head)); \
}
/* Main splay operation.
* Moves node close to the key of elm to top
*/
#define SPLAY_GENERATE(name, type, field, cmp) \
struct type * \
name##_SPLAY_INSERT(struct name *head, struct type *elm) \
{ \
if (SPLAY_EMPTY(head)) { \
SPLAY_LEFT(elm, field) = SPLAY_RIGHT(elm, field) = NULL; \
} else { \
int __comp; \
name##_SPLAY(head, elm); \
__comp = (cmp)(elm, (head)->sph_root); \
if(__comp < 0) { \
SPLAY_LEFT(elm, field) = SPLAY_LEFT((head)->sph_root, field);\
SPLAY_RIGHT(elm, field) = (head)->sph_root; \
SPLAY_LEFT((head)->sph_root, field) = NULL; \
} else if (__comp > 0) { \
SPLAY_RIGHT(elm, field) = SPLAY_RIGHT((head)->sph_root, field);\
SPLAY_LEFT(elm, field) = (head)->sph_root; \
SPLAY_RIGHT((head)->sph_root, field) = NULL; \
} else \
return ((head)->sph_root); \
} \
(head)->sph_root = (elm); \
return (NULL); \
} \
\
struct type * \
name##_SPLAY_REMOVE(struct name *head, struct type *elm) \
{ \
struct type *__tmp; \
if (SPLAY_EMPTY(head)) \
return (NULL); \
name##_SPLAY(head, elm); \
if ((cmp)(elm, (head)->sph_root) == 0) { \
if (SPLAY_LEFT((head)->sph_root, field) == NULL) { \
(head)->sph_root = SPLAY_RIGHT((head)->sph_root, field);\
} else { \
__tmp = SPLAY_RIGHT((head)->sph_root, field); \
(head)->sph_root = SPLAY_LEFT((head)->sph_root, field);\
name##_SPLAY(head, elm); \
SPLAY_RIGHT((head)->sph_root, field) = __tmp; \
} \
return (elm); \
} \
return (NULL); \
} \
\
void \
name##_SPLAY(struct name *head, struct type *elm) \
{ \
struct type __node, *__left, *__right, *__tmp; \
int __comp; \
\
SPLAY_LEFT(&__node, field) = SPLAY_RIGHT(&__node, field) = NULL;\
__left = __right = &__node; \
\
while ((__comp = (cmp)(elm, (head)->sph_root)) != 0) { \
if (__comp < 0) { \
__tmp = SPLAY_LEFT((head)->sph_root, field); \
if (__tmp == NULL) \
break; \
if ((cmp)(elm, __tmp) < 0){ \
SPLAY_ROTATE_RIGHT(head, __tmp, field); \
if (SPLAY_LEFT((head)->sph_root, field) == NULL)\
break; \
} \
SPLAY_LINKLEFT(head, __right, field); \
} else if (__comp > 0) { \
__tmp = SPLAY_RIGHT((head)->sph_root, field); \
if (__tmp == NULL) \
break; \
if ((cmp)(elm, __tmp) > 0){ \
SPLAY_ROTATE_LEFT(head, __tmp, field); \
if (SPLAY_RIGHT((head)->sph_root, field) == NULL)\
break; \
} \
SPLAY_LINKRIGHT(head, __left, field); \
} \
} \
SPLAY_ASSEMBLE(head, &__node, __left, __right, field); \
} \
\
/* Splay with either the minimum or the maximum element \
* Used to find minimum or maximum element in tree. \
*/ \
void name##_SPLAY_MINMAX(struct name *head, int __comp) \
{ \
struct type __node, *__left, *__right, *__tmp; \
\
SPLAY_LEFT(&__node, field) = SPLAY_RIGHT(&__node, field) = NULL;\
__left = __right = &__node; \
\
while (1) { \
if (__comp < 0) { \
__tmp = SPLAY_LEFT((head)->sph_root, field); \
if (__tmp == NULL) \
break; \
if (__comp < 0){ \
SPLAY_ROTATE_RIGHT(head, __tmp, field); \
if (SPLAY_LEFT((head)->sph_root, field) == NULL)\
break; \
} \
SPLAY_LINKLEFT(head, __right, field); \
} else if (__comp > 0) { \
__tmp = SPLAY_RIGHT((head)->sph_root, field); \
if (__tmp == NULL) \
break; \
if (__comp > 0) { \
SPLAY_ROTATE_LEFT(head, __tmp, field); \
if (SPLAY_RIGHT((head)->sph_root, field) == NULL)\
break; \
} \
SPLAY_LINKRIGHT(head, __left, field); \
} \
} \
SPLAY_ASSEMBLE(head, &__node, __left, __right, field); \
}
#define SPLAY_NEGINF -1
#define SPLAY_INF 1
#define SPLAY_INSERT(name, x, y) name##_SPLAY_INSERT(x, y)
#define SPLAY_REMOVE(name, x, y) name##_SPLAY_REMOVE(x, y)
#define SPLAY_FIND(name, x, y) name##_SPLAY_FIND(x, y)
#define SPLAY_NEXT(name, x, y) name##_SPLAY_NEXT(x, y)
#define SPLAY_MIN(name, x) (SPLAY_EMPTY(x) ? NULL \
: name##_SPLAY_MIN_MAX(x, SPLAY_NEGINF))
#define SPLAY_MAX(name, x) (SPLAY_EMPTY(x) ? NULL \
: name##_SPLAY_MIN_MAX(x, SPLAY_INF))
#define SPLAY_FOREACH(x, name, head) \
for ((x) = SPLAY_MIN(name, head); \
(x) != NULL; \
(x) = SPLAY_NEXT(name, head, x))
/* Macros that define a red-black tree */
#define RB_HEAD(name, type) \
struct name { \
struct type *rbh_root; /* root of the tree */ \
}
#define RB_INITIALIZER(root) \
{ NULL }
#define RB_INIT(root) do { \
(root)->rbh_root = NULL; \
} while (/*CONSTCOND*/ 0)
#define RB_BLACK 0
#define RB_RED 1
#define RB_ENTRY(type) \
struct { \
struct type *rbe_left; /* left element */ \
struct type *rbe_right; /* right element */ \
struct type *rbe_parent; /* parent element */ \
int rbe_color; /* node color */ \
}
#define RB_LEFT(elm, field) (elm)->field.rbe_left
#define RB_RIGHT(elm, field) (elm)->field.rbe_right
#define RB_PARENT(elm, field) (elm)->field.rbe_parent
#define RB_COLOR(elm, field) (elm)->field.rbe_color
#define RB_ROOT(head) (head)->rbh_root
#define RB_EMPTY(head) (RB_ROOT(head) == NULL)
#define RB_SET(elm, parent, field) do { \
RB_PARENT(elm, field) = parent; \
RB_LEFT(elm, field) = RB_RIGHT(elm, field) = NULL; \
RB_COLOR(elm, field) = RB_RED; \
} while (/*CONSTCOND*/ 0)
#define RB_SET_BLACKRED(black, red, field) do { \
RB_COLOR(black, field) = RB_BLACK; \
RB_COLOR(red, field) = RB_RED; \
} while (/*CONSTCOND*/ 0)
#ifndef RB_AUGMENT
#define RB_AUGMENT(x) do {} while (0)
#endif
#define RB_ROTATE_LEFT(head, elm, tmp, field) do { \
(tmp) = RB_RIGHT(elm, field); \
if ((RB_RIGHT(elm, field) = RB_LEFT(tmp, field)) != NULL) { \
RB_PARENT(RB_LEFT(tmp, field), field) = (elm); \
} \
RB_AUGMENT(elm); \
if ((RB_PARENT(tmp, field) = RB_PARENT(elm, field)) != NULL) { \
if ((elm) == RB_LEFT(RB_PARENT(elm, field), field)) \
RB_LEFT(RB_PARENT(elm, field), field) = (tmp); \
else \
RB_RIGHT(RB_PARENT(elm, field), field) = (tmp); \
} else \
(head)->rbh_root = (tmp); \
RB_LEFT(tmp, field) = (elm); \
RB_PARENT(elm, field) = (tmp); \
RB_AUGMENT(tmp); \
if ((RB_PARENT(tmp, field))) \
RB_AUGMENT(RB_PARENT(tmp, field)); \
} while (/*CONSTCOND*/ 0)
#define RB_ROTATE_RIGHT(head, elm, tmp, field) do { \
(tmp) = RB_LEFT(elm, field); \
if ((RB_LEFT(elm, field) = RB_RIGHT(tmp, field)) != NULL) { \
RB_PARENT(RB_RIGHT(tmp, field), field) = (elm); \
} \
RB_AUGMENT(elm); \
if ((RB_PARENT(tmp, field) = RB_PARENT(elm, field)) != NULL) { \
if ((elm) == RB_LEFT(RB_PARENT(elm, field), field)) \
RB_LEFT(RB_PARENT(elm, field), field) = (tmp); \
else \
RB_RIGHT(RB_PARENT(elm, field), field) = (tmp); \
} else \
(head)->rbh_root = (tmp); \
RB_RIGHT(tmp, field) = (elm); \
RB_PARENT(elm, field) = (tmp); \
RB_AUGMENT(tmp); \
if ((RB_PARENT(tmp, field))) \
RB_AUGMENT(RB_PARENT(tmp, field)); \
} while (/*CONSTCOND*/ 0)
/* Generates prototypes and inline functions */
#define RB_PROTOTYPE(name, type, field, cmp) \
RB_PROTOTYPE_INTERNAL(name, type, field, cmp,)
#define RB_PROTOTYPE_STATIC(name, type, field, cmp) \
RB_PROTOTYPE_INTERNAL(name, type, field, cmp, __unused static)
#define RB_PROTOTYPE_INTERNAL(name, type, field, cmp, attr) \
attr void name##_RB_INSERT_COLOR(struct name *, struct type *); \
attr void name##_RB_REMOVE_COLOR(struct name *, struct type *, struct type *);\
attr struct type *name##_RB_REMOVE(struct name *, struct type *); \
attr struct type *name##_RB_INSERT(struct name *, struct type *); \
attr struct type *name##_RB_FIND(struct name *, struct type *); \
attr struct type *name##_RB_NFIND(struct name *, struct type *); \
attr struct type *name##_RB_NEXT(struct type *); \
attr struct type *name##_RB_PREV(struct type *); \
attr struct type *name##_RB_MINMAX(struct name *, int); \
\
/* Main rb operation.
* Moves node close to the key of elm to top
*/
#define RB_GENERATE(name, type, field, cmp) \
RB_GENERATE_INTERNAL(name, type, field, cmp,)
#define RB_GENERATE_STATIC(name, type, field, cmp) \
RB_GENERATE_INTERNAL(name, type, field, cmp, __unused static)
#define RB_GENERATE_INTERNAL(name, type, field, cmp, attr) \
attr void \
name##_RB_INSERT_COLOR(struct name *head, struct type *elm) \
{ \
struct type *parent, *gparent, *tmp; \
while ((parent = RB_PARENT(elm, field)) != NULL && \
RB_COLOR(parent, field) == RB_RED) { \
gparent = RB_PARENT(parent, field); \
if (parent == RB_LEFT(gparent, field)) { \
tmp = RB_RIGHT(gparent, field); \
if (tmp && RB_COLOR(tmp, field) == RB_RED) { \
RB_COLOR(tmp, field) = RB_BLACK; \
RB_SET_BLACKRED(parent, gparent, field);\
elm = gparent; \
continue; \
} \
if (RB_RIGHT(parent, field) == elm) { \
RB_ROTATE_LEFT(head, parent, tmp, field);\
tmp = parent; \
parent = elm; \
elm = tmp; \
} \
RB_SET_BLACKRED(parent, gparent, field); \
RB_ROTATE_RIGHT(head, gparent, tmp, field); \
} else { \
tmp = RB_LEFT(gparent, field); \
if (tmp && RB_COLOR(tmp, field) == RB_RED) { \
RB_COLOR(tmp, field) = RB_BLACK; \
RB_SET_BLACKRED(parent, gparent, field);\
elm = gparent; \
continue; \
} \
if (RB_LEFT(parent, field) == elm) { \
RB_ROTATE_RIGHT(head, parent, tmp, field);\
tmp = parent; \
parent = elm; \
elm = tmp; \
} \
RB_SET_BLACKRED(parent, gparent, field); \
RB_ROTATE_LEFT(head, gparent, tmp, field); \
} \
} \
RB_COLOR(head->rbh_root, field) = RB_BLACK; \
} \
\
attr void \
name##_RB_REMOVE_COLOR(struct name *head, struct type *parent, struct type *elm) \
{ \
struct type *tmp; \
while ((elm == NULL || RB_COLOR(elm, field) == RB_BLACK) && \
elm != RB_ROOT(head)) { \
if (RB_LEFT(parent, field) == elm) { \
tmp = RB_RIGHT(parent, field); \
if (RB_COLOR(tmp, field) == RB_RED) { \
RB_SET_BLACKRED(tmp, parent, field); \
RB_ROTATE_LEFT(head, parent, tmp, field);\
tmp = RB_RIGHT(parent, field); \
} \
if ((RB_LEFT(tmp, field) == NULL || \
RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) &&\
(RB_RIGHT(tmp, field) == NULL || \
RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK)) {\
RB_COLOR(tmp, field) = RB_RED; \
elm = parent; \
parent = RB_PARENT(elm, field); \
} else { \
if (RB_RIGHT(tmp, field) == NULL || \
RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK) {\
struct type *oleft; \
if ((oleft = RB_LEFT(tmp, field)) \
!= NULL) \
RB_COLOR(oleft, field) = RB_BLACK;\
RB_COLOR(tmp, field) = RB_RED; \
RB_ROTATE_RIGHT(head, tmp, oleft, field);\
tmp = RB_RIGHT(parent, field); \
} \
RB_COLOR(tmp, field) = RB_COLOR(parent, field);\
RB_COLOR(parent, field) = RB_BLACK; \
if (RB_RIGHT(tmp, field)) \
RB_COLOR(RB_RIGHT(tmp, field), field) = RB_BLACK;\
RB_ROTATE_LEFT(head, parent, tmp, field);\
elm = RB_ROOT(head); \
break; \
} \
} else { \
tmp = RB_LEFT(parent, field); \
if (RB_COLOR(tmp, field) == RB_RED) { \
RB_SET_BLACKRED(tmp, parent, field); \
RB_ROTATE_RIGHT(head, parent, tmp, field);\
tmp = RB_LEFT(parent, field); \
} \
if ((RB_LEFT(tmp, field) == NULL || \
RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) &&\
(RB_RIGHT(tmp, field) == NULL || \
RB_COLOR(RB_RIGHT(tmp, field), field) == RB_BLACK)) {\
RB_COLOR(tmp, field) = RB_RED; \
elm = parent; \
parent = RB_PARENT(elm, field); \
} else { \
if (RB_LEFT(tmp, field) == NULL || \
RB_COLOR(RB_LEFT(tmp, field), field) == RB_BLACK) {\
struct type *oright; \
if ((oright = RB_RIGHT(tmp, field)) \
!= NULL) \
RB_COLOR(oright, field) = RB_BLACK;\
RB_COLOR(tmp, field) = RB_RED; \
RB_ROTATE_LEFT(head, tmp, oright, field);\
tmp = RB_LEFT(parent, field); \
} \
RB_COLOR(tmp, field) = RB_COLOR(parent, field);\
RB_COLOR(parent, field) = RB_BLACK; \
if (RB_LEFT(tmp, field)) \
RB_COLOR(RB_LEFT(tmp, field), field) = RB_BLACK;\
RB_ROTATE_RIGHT(head, parent, tmp, field);\
elm = RB_ROOT(head); \
break; \
} \
} \
} \
if (elm) \
RB_COLOR(elm, field) = RB_BLACK; \
} \
\
attr struct type * \
name##_RB_REMOVE(struct name *head, struct type *elm) \
{ \
struct type *child, *parent, *old = elm; \
int color; \
if (RB_LEFT(elm, field) == NULL) \
child = RB_RIGHT(elm, field); \
else if (RB_RIGHT(elm, field) == NULL) \
child = RB_LEFT(elm, field); \
else { \
struct type *left; \
elm = RB_RIGHT(elm, field); \
while ((left = RB_LEFT(elm, field)) != NULL) \
elm = left; \
child = RB_RIGHT(elm, field); \
parent = RB_PARENT(elm, field); \
color = RB_COLOR(elm, field); \
if (child) \
RB_PARENT(child, field) = parent; \
if (parent) { \
if (RB_LEFT(parent, field) == elm) \
RB_LEFT(parent, field) = child; \
else \
RB_RIGHT(parent, field) = child; \
RB_AUGMENT(parent); \
} else \
RB_ROOT(head) = child; \
if (RB_PARENT(elm, field) == old) \
parent = elm; \
(elm)->field = (old)->field; \
if (RB_PARENT(old, field)) { \
if (RB_LEFT(RB_PARENT(old, field), field) == old)\
RB_LEFT(RB_PARENT(old, field), field) = elm;\
else \
RB_RIGHT(RB_PARENT(old, field), field) = elm;\
RB_AUGMENT(RB_PARENT(old, field)); \
} else \
RB_ROOT(head) = elm; \
RB_PARENT(RB_LEFT(old, field), field) = elm; \
if (RB_RIGHT(old, field)) \
RB_PARENT(RB_RIGHT(old, field), field) = elm; \
if (parent) { \
left = parent; \
do { \
RB_AUGMENT(left); \
} while ((left = RB_PARENT(left, field)) != NULL); \
} \
goto color; \
} \
parent = RB_PARENT(elm, field); \
color = RB_COLOR(elm, field); \
if (child) \
RB_PARENT(child, field) = parent; \
if (parent) { \
if (RB_LEFT(parent, field) == elm) \
RB_LEFT(parent, field) = child; \
else \
RB_RIGHT(parent, field) = child; \
RB_AUGMENT(parent); \
} else \
RB_ROOT(head) = child; \
color: \
if (color == RB_BLACK) \
name##_RB_REMOVE_COLOR(head, parent, child); \
return (old); \
} \
\
/* Inserts a node into the RB tree */ \
attr struct type * \
name##_RB_INSERT(struct name *head, struct type *elm) \
{ \
struct type *tmp; \
struct type *parent = NULL; \
int comp = 0; \
tmp = RB_ROOT(head); \
while (tmp) { \
parent = tmp; \
comp = (cmp)(elm, parent); \
if (comp < 0) \
tmp = RB_LEFT(tmp, field); \
else if (comp > 0) \
tmp = RB_RIGHT(tmp, field); \
else \
return (tmp); \
} \
RB_SET(elm, parent, field); \
if (parent != NULL) { \
if (comp < 0) \
RB_LEFT(parent, field) = elm; \
else \
RB_RIGHT(parent, field) = elm; \
RB_AUGMENT(parent); \
} else \
RB_ROOT(head) = elm; \
name##_RB_INSERT_COLOR(head, elm); \
return (NULL); \
} \
\
/* Finds the node with the same key as elm */ \
attr struct type * \
name##_RB_FIND(struct name *head, struct type *elm) \
{ \
struct type *tmp = RB_ROOT(head); \
int comp; \
while (tmp) { \
comp = cmp(elm, tmp); \
if (comp < 0) \
tmp = RB_LEFT(tmp, field); \
else if (comp > 0) \
tmp = RB_RIGHT(tmp, field); \
else \
return (tmp); \
} \
return (NULL); \
} \
\
/* Finds the first node greater than or equal to the search key */ \
attr struct type * \
name##_RB_NFIND(struct name *head, struct type *elm) \
{ \
struct type *tmp = RB_ROOT(head); \
struct type *res = NULL; \
int comp; \
while (tmp) { \
comp = cmp(elm, tmp); \
if (comp < 0) { \
res = tmp; \
tmp = RB_LEFT(tmp, field); \
} \
else if (comp > 0) \
tmp = RB_RIGHT(tmp, field); \
else \
return (tmp); \
} \
return (res); \
} \
\
/* ARGSUSED */ \
attr struct type * \
name##_RB_NEXT(struct type *elm) \
{ \
if (RB_RIGHT(elm, field)) { \
elm = RB_RIGHT(elm, field); \
while (RB_LEFT(elm, field)) \
elm = RB_LEFT(elm, field); \
} else { \
if (RB_PARENT(elm, field) && \
(elm == RB_LEFT(RB_PARENT(elm, field), field))) \
elm = RB_PARENT(elm, field); \
else { \
while (RB_PARENT(elm, field) && \
(elm == RB_RIGHT(RB_PARENT(elm, field), field)))\
elm = RB_PARENT(elm, field); \
elm = RB_PARENT(elm, field); \
} \
} \
return (elm); \
} \
\
/* ARGSUSED */ \
attr struct type * \
name##_RB_PREV(struct type *elm) \
{ \
if (RB_LEFT(elm, field)) { \
elm = RB_LEFT(elm, field); \
while (RB_RIGHT(elm, field)) \
elm = RB_RIGHT(elm, field); \
} else { \
if (RB_PARENT(elm, field) && \
(elm == RB_RIGHT(RB_PARENT(elm, field), field))) \
elm = RB_PARENT(elm, field); \
else { \
while (RB_PARENT(elm, field) && \
(elm == RB_LEFT(RB_PARENT(elm, field), field)))\
elm = RB_PARENT(elm, field); \
elm = RB_PARENT(elm, field); \
} \
} \
return (elm); \
} \
\
attr struct type * \
name##_RB_MINMAX(struct name *head, int val) \
{ \
struct type *tmp = RB_ROOT(head); \
struct type *parent = NULL; \
while (tmp) { \
parent = tmp; \
if (val < 0) \
tmp = RB_LEFT(tmp, field); \
else \
tmp = RB_RIGHT(tmp, field); \
} \
return (parent); \
}
#define RB_NEGINF -1
#define RB_INF 1
#define RB_INSERT(name, x, y) name##_RB_INSERT(x, y)
#define RB_REMOVE(name, x, y) name##_RB_REMOVE(x, y)
#define RB_FIND(name, x, y) name##_RB_FIND(x, y)
#define RB_NFIND(name, x, y) name##_RB_NFIND(x, y)
#define RB_NEXT(name, x, y) name##_RB_NEXT(y)
#define RB_PREV(name, x, y) name##_RB_PREV(y)
#define RB_MIN(name, x) name##_RB_MINMAX(x, RB_NEGINF)
#define RB_MAX(name, x) name##_RB_MINMAX(x, RB_INF)
#define RB_FOREACH(x, name, head) \
for ((x) = RB_MIN(name, head); \
(x) != NULL; \
(x) = name##_RB_NEXT(x))
#define RB_FOREACH_FROM(x, name, y) \
for ((x) = (y); \
((x) != NULL) && ((y) = name##_RB_NEXT(x), (x) != NULL); \
(x) = (y))
#define RB_FOREACH_SAFE(x, name, head, y) \
for ((x) = RB_MIN(name, head); \
((x) != NULL) && ((y) = name##_RB_NEXT(x), (x) != NULL); \
(x) = (y))
#define RB_FOREACH_REVERSE(x, name, head) \
for ((x) = RB_MAX(name, head); \
(x) != NULL; \
(x) = name##_RB_PREV(x))
#define RB_FOREACH_REVERSE_FROM(x, name, y) \
for ((x) = (y); \
((x) != NULL) && ((y) = name##_RB_PREV(x), (x) != NULL); \
(x) = (y))
#define RB_FOREACH_REVERSE_SAFE(x, name, head, y) \
for ((x) = RB_MAX(name, head); \
((x) != NULL) && ((y) = name##_RB_PREV(x), (x) != NULL); \
(x) = (y))
#endif /* _SYS_TREE_H_ */

View File

@ -909,4 +909,25 @@
</description>
</property>
<property>
<name>hadoop.fuse.connection.timeout</name>
<value>300</value>
<description>
The minimum number of seconds that we'll cache libhdfs connection objects
in fuse_dfs. Lower values will result in lower memory consumption; higher
values may speed up access by avoiding the overhead of creating new
connection objects.
</description>
</property>
<property>
<name>hadoop.fuse.timer.period</name>
<value>5</value>
<description>
The number of seconds between cache expiry checks in fuse_dfs. Lower values
will result in fuse_dfs noticing changes to Kerberos ticket caches more
quickly.
</description>
</property>
</configuration>