HDFS-3568. fuse_dfs: add support for security. Contributed by Colin McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1359824 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
408d1895fa
commit
fb4a6ac9e9
|
@ -166,5 +166,12 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
|||
"hadoop.http.staticuser.user";
|
||||
public static final String DEFAULT_HADOOP_HTTP_STATIC_USER =
|
||||
"dr.who";
|
||||
|
||||
/* Path to the Kerberos ticket cache. Setting this will force
|
||||
* UserGroupInformation to use only this ticket cache file when creating a
|
||||
* FileSystem instance.
|
||||
*/
|
||||
public static final String KERBEROS_TICKET_CACHE_PATH =
|
||||
"hadoop.security.kerberos.ticket.cache.path";
|
||||
}
|
||||
|
||||
|
|
|
@ -137,12 +137,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
*/
|
||||
public static FileSystem get(final URI uri, final Configuration conf,
|
||||
final String user) throws IOException, InterruptedException {
|
||||
UserGroupInformation ugi;
|
||||
if (user == null) {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
ugi = UserGroupInformation.createRemoteUser(user);
|
||||
}
|
||||
String ticketCachePath =
|
||||
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.getBestUGI(ticketCachePath, user);
|
||||
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
public FileSystem run() throws IOException {
|
||||
return get(uri, conf);
|
||||
|
@ -314,12 +312,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
*/
|
||||
public static FileSystem newInstance(final URI uri, final Configuration conf,
|
||||
final String user) throws IOException, InterruptedException {
|
||||
UserGroupInformation ugi;
|
||||
if (user == null) {
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
ugi = UserGroupInformation.createRemoteUser(user);
|
||||
}
|
||||
String ticketCachePath =
|
||||
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.getBestUGI(ticketCachePath, user);
|
||||
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||
public FileSystem run() throws IOException {
|
||||
return newInstance(uri,conf);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.security;
|
|||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.AccessControlContext;
|
||||
|
@ -32,6 +33,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -453,9 +455,31 @@ public class UserGroupInformation {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static final HadoopConfiguration HADOOP_LOGIN_CONFIG =
|
||||
new HadoopConfiguration();
|
||||
|
||||
/**
|
||||
* Represents a javax.security configuration that is created at runtime.
|
||||
*/
|
||||
private static class DynamicConfiguration
|
||||
extends javax.security.auth.login.Configuration {
|
||||
private AppConfigurationEntry[] ace;
|
||||
|
||||
DynamicConfiguration(AppConfigurationEntry[] ace) {
|
||||
this.ace = ace;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
|
||||
return ace;
|
||||
}
|
||||
}
|
||||
|
||||
private static LoginContext
|
||||
newLoginContext(String appName, Subject subject) throws LoginException {
|
||||
newLoginContext(String appName, Subject subject,
|
||||
javax.security.auth.login.Configuration loginConf)
|
||||
throws LoginException {
|
||||
// Temporarily switch the thread's ContextClassLoader to match this
|
||||
// class's classloader, so that we can properly load HadoopLoginModule
|
||||
// from the JAAS libraries.
|
||||
|
@ -463,7 +487,7 @@ public class UserGroupInformation {
|
|||
ClassLoader oldCCL = t.getContextClassLoader();
|
||||
t.setContextClassLoader(HadoopLoginModule.class.getClassLoader());
|
||||
try {
|
||||
return new LoginContext(appName, subject, null, new HadoopConfiguration());
|
||||
return new LoginContext(appName, subject, null, loginConf);
|
||||
} finally {
|
||||
t.setContextClassLoader(oldCCL);
|
||||
}
|
||||
|
@ -515,6 +539,82 @@ public class UserGroupInformation {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the most appropriate UserGroupInformation to use
|
||||
*
|
||||
* @param ticketCachePath The Kerberos ticket cache path, or NULL
|
||||
* if none is specfied
|
||||
* @param user The user name, or NULL if none is specified.
|
||||
*
|
||||
* @return The most appropriate UserGroupInformation
|
||||
*/
|
||||
public static UserGroupInformation getBestUGI(
|
||||
String ticketCachePath, String user) throws IOException {
|
||||
if (ticketCachePath != null) {
|
||||
return getUGIFromTicketCache(ticketCachePath, user);
|
||||
} else if (user == null) {
|
||||
return getCurrentUser();
|
||||
} else {
|
||||
return createRemoteUser(user);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a UserGroupInformation from a Kerberos ticket cache.
|
||||
*
|
||||
* @param user The principal name to load from the ticket
|
||||
* cache
|
||||
* @param ticketCachePath the path to the ticket cache file
|
||||
*
|
||||
* @throws IOException if the kerberos login fails
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public static UserGroupInformation getUGIFromTicketCache(
|
||||
String ticketCache, String user) throws IOException {
|
||||
if (!isSecurityEnabled()) {
|
||||
return getBestUGI(null, user);
|
||||
}
|
||||
try {
|
||||
Map<String,String> krbOptions = new HashMap<String,String>();
|
||||
krbOptions.put("doNotPrompt", "true");
|
||||
krbOptions.put("useTicketCache", "true");
|
||||
krbOptions.put("useKeyTab", "false");
|
||||
krbOptions.put("renewTGT", "false");
|
||||
krbOptions.put("ticketCache", ticketCache);
|
||||
krbOptions.putAll(HadoopConfiguration.BASIC_JAAS_OPTIONS);
|
||||
AppConfigurationEntry ace = new AppConfigurationEntry(
|
||||
KerberosUtil.getKrb5LoginModuleName(),
|
||||
LoginModuleControlFlag.REQUIRED,
|
||||
krbOptions);
|
||||
DynamicConfiguration dynConf =
|
||||
new DynamicConfiguration(new AppConfigurationEntry[]{ ace });
|
||||
LoginContext login = newLoginContext(
|
||||
HadoopConfiguration.USER_KERBEROS_CONFIG_NAME, null, dynConf);
|
||||
login.login();
|
||||
|
||||
Subject loginSubject = login.getSubject();
|
||||
Set<Principal> loginPrincipals = loginSubject.getPrincipals();
|
||||
if (loginPrincipals.isEmpty()) {
|
||||
throw new RuntimeException("No login principals found!");
|
||||
}
|
||||
if (loginPrincipals.size() != 1) {
|
||||
LOG.warn("found more than one principal in the ticket cache file " +
|
||||
ticketCache);
|
||||
}
|
||||
User ugiUser = new User(loginPrincipals.iterator().next().getName(),
|
||||
AuthenticationMethod.KERBEROS, login);
|
||||
loginSubject.getPrincipals().add(ugiUser);
|
||||
UserGroupInformation ugi = new UserGroupInformation(loginSubject);
|
||||
ugi.setLogin(login);
|
||||
ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
||||
return ugi;
|
||||
} catch (LoginException le) {
|
||||
throw new IOException("failure to login using ticket cache file " +
|
||||
ticketCache, le);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the currently logged in user.
|
||||
* @return the logged in user
|
||||
|
@ -530,10 +630,10 @@ public class UserGroupInformation {
|
|||
LoginContext login;
|
||||
if (isSecurityEnabled()) {
|
||||
login = newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
||||
subject);
|
||||
subject, HADOOP_LOGIN_CONFIG);
|
||||
} else {
|
||||
login = newLoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME,
|
||||
subject);
|
||||
subject, HADOOP_LOGIN_CONFIG);
|
||||
}
|
||||
login.login();
|
||||
loginUser = new UserGroupInformation(subject);
|
||||
|
@ -673,8 +773,8 @@ public class UserGroupInformation {
|
|||
LoginContext login;
|
||||
long start = 0;
|
||||
try {
|
||||
login =
|
||||
newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
|
||||
login = newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME,
|
||||
subject, HADOOP_LOGIN_CONFIG);
|
||||
start = System.currentTimeMillis();
|
||||
login.login();
|
||||
metrics.loginSuccess.add(System.currentTimeMillis() - start);
|
||||
|
@ -756,7 +856,8 @@ public class UserGroupInformation {
|
|||
// login and also update the subject field of this instance to
|
||||
// have the new credentials (pass it to the LoginContext constructor)
|
||||
login = newLoginContext(
|
||||
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject());
|
||||
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject(),
|
||||
HADOOP_LOGIN_CONFIG);
|
||||
LOG.info("Initiating re-login for " + keytabPrincipal);
|
||||
start = System.currentTimeMillis();
|
||||
login.login();
|
||||
|
@ -807,7 +908,7 @@ public class UserGroupInformation {
|
|||
//have the new credentials (pass it to the LoginContext constructor)
|
||||
login =
|
||||
newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
||||
getSubject());
|
||||
getSubject(), HADOOP_LOGIN_CONFIG);
|
||||
LOG.info("Initiating re-login for " + getUserName());
|
||||
login.login();
|
||||
setLogin(login);
|
||||
|
@ -842,8 +943,9 @@ public class UserGroupInformation {
|
|||
keytabPrincipal = user;
|
||||
Subject subject = new Subject();
|
||||
|
||||
LoginContext login =
|
||||
newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
|
||||
LoginContext login = newLoginContext(
|
||||
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject,
|
||||
HADOOP_LOGIN_CONFIG);
|
||||
|
||||
start = System.currentTimeMillis();
|
||||
login.login();
|
||||
|
|
|
@ -290,6 +290,8 @@ Branch-2 ( Unreleased changes )
|
|||
HDFS-3555. idle client socket triggers DN ERROR log
|
||||
(should be INFO or DEBUG). (Andy Isaacson via harsh)
|
||||
|
||||
HDFS-3568. fuse_dfs: add support for security. (Colin McCabe via atm)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2982. Startup performance suffers when there are many edit log
|
||||
|
|
|
@ -21,10 +21,20 @@
|
|||
#include "fuse_connect.h"
|
||||
#include "fuse_users.h"
|
||||
|
||||
#include <limits.h>
|
||||
#include <search.h>
|
||||
|
||||
#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
|
||||
|
||||
enum authConf {
|
||||
AUTH_CONF_UNKNOWN,
|
||||
AUTH_CONF_KERBEROS,
|
||||
AUTH_CONF_OTHER,
|
||||
};
|
||||
|
||||
#define MAX_ELEMENTS (16 * 1024)
|
||||
static struct hsearch_data *fsTable = NULL;
|
||||
static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN;
|
||||
static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
/*
|
||||
|
@ -75,13 +85,96 @@ static int insertFs(char *key, hdfsFS fs) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find out what type of authentication the system administrator
|
||||
* has configured.
|
||||
*
|
||||
* @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
|
||||
*/
|
||||
static enum authConf discoverAuthConf(void)
|
||||
{
|
||||
int ret;
|
||||
char *val = NULL;
|
||||
enum authConf authConf;
|
||||
|
||||
ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val);
|
||||
if (ret)
|
||||
authConf = AUTH_CONF_UNKNOWN;
|
||||
else if (!strcmp(val, "kerberos"))
|
||||
authConf = AUTH_CONF_KERBEROS;
|
||||
else
|
||||
authConf = AUTH_CONF_OTHER;
|
||||
free(val);
|
||||
return authConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the Kerberos ticket cache path.
|
||||
*
|
||||
* This function finds the Kerberos ticket cache path from the thread ID and
|
||||
* user ID of the process making the request.
|
||||
*
|
||||
* Normally, the ticket cache path is in a well-known location in /tmp.
|
||||
* However, it's possible that the calling process could set the KRB5CCNAME
|
||||
* environment variable, indicating that its Kerberos ticket cache is at a
|
||||
* non-default location. We try to handle this possibility by reading the
|
||||
* process' environment here. This will be allowed if we have root
|
||||
* capabilities, or if our UID is the same as the remote process' UID.
|
||||
*
|
||||
* Note that we don't check to see if the cache file actually exists or not.
|
||||
* We're just trying to find out where it would be if it did exist.
|
||||
*
|
||||
* @param path (out param) the path to the ticket cache file
|
||||
* @param pathLen length of the path buffer
|
||||
*/
|
||||
static void findKerbTicketCachePath(char *path, size_t pathLen)
|
||||
{
|
||||
struct fuse_context *ctx = fuse_get_context();
|
||||
FILE *fp = NULL;
|
||||
static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
|
||||
int c = '\0', pathIdx = 0, keyIdx = 0;
|
||||
size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
|
||||
|
||||
// /proc/<tid>/environ contains the remote process' environment. It is
|
||||
// exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
|
||||
snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
|
||||
fp = fopen(path, "r");
|
||||
if (!fp)
|
||||
goto done;
|
||||
while (1) {
|
||||
if (c == EOF)
|
||||
goto done;
|
||||
if (keyIdx == KRB5CCNAME_LEN) {
|
||||
if (pathIdx >= pathLen - 1)
|
||||
goto done;
|
||||
if (c == '\0')
|
||||
goto done;
|
||||
path[pathIdx++] = c;
|
||||
} else if (KRB5CCNAME[keyIdx++] != c) {
|
||||
keyIdx = 0;
|
||||
}
|
||||
c = fgetc(fp);
|
||||
}
|
||||
|
||||
done:
|
||||
if (fp)
|
||||
fclose(fp);
|
||||
if (pathIdx == 0) {
|
||||
snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
|
||||
} else {
|
||||
path[pathIdx] = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to the NN as the current user/group.
|
||||
* Returns a fs handle on success, or NULL on failure.
|
||||
*/
|
||||
hdfsFS doConnectAsUser(const char *hostname, int port) {
|
||||
struct hdfsBuilder *bld;
|
||||
uid_t uid = fuse_get_context()->uid;
|
||||
char *user = getUsername(uid);
|
||||
char kpath[PATH_MAX];
|
||||
int ret;
|
||||
hdfsFS fs = NULL;
|
||||
if (NULL == user) {
|
||||
|
@ -93,9 +186,31 @@ hdfsFS doConnectAsUser(const char *hostname, int port) {
|
|||
|
||||
fs = findFs(user);
|
||||
if (NULL == fs) {
|
||||
fs = hdfsConnectAsUserNewInstance(hostname, port, user);
|
||||
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
|
||||
hdfsAuthConf = discoverAuthConf();
|
||||
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
|
||||
ERROR("Unable to determine the configured value for %s.",
|
||||
HADOOP_SECURITY_AUTHENTICATION);
|
||||
goto done;
|
||||
}
|
||||
}
|
||||
bld = hdfsNewBuilder();
|
||||
if (!bld) {
|
||||
ERROR("Unable to create hdfs builder");
|
||||
goto done;
|
||||
}
|
||||
hdfsBuilderSetForceNewInstance(bld);
|
||||
hdfsBuilderSetNameNode(bld, hostname);
|
||||
hdfsBuilderSetNameNodePort(bld, port);
|
||||
hdfsBuilderSetUserName(bld, user);
|
||||
if (hdfsAuthConf == AUTH_CONF_KERBEROS) {
|
||||
findKerbTicketCachePath(kpath, sizeof(kpath));
|
||||
hdfsBuilderSetKerbTicketCachePath(bld, kpath);
|
||||
}
|
||||
fs = hdfsBuilderConnect(bld);
|
||||
if (NULL == fs) {
|
||||
ERROR("Unable to create fs for user %s", user);
|
||||
int err = errno;
|
||||
ERROR("Unable to create fs for user %s: error code %d", user, err);
|
||||
goto done;
|
||||
}
|
||||
if (-1 == insertFs(user, fs)) {
|
||||
|
@ -106,9 +221,7 @@ hdfsFS doConnectAsUser(const char *hostname, int port) {
|
|||
done:
|
||||
ret = pthread_mutex_unlock(&tableMutex);
|
||||
assert(0 == ret);
|
||||
if (user) {
|
||||
free(user);
|
||||
}
|
||||
free(user);
|
||||
return fs;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
#define JMETHOD2(X, Y, R) "(" X Y ")" R
|
||||
#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
|
||||
|
||||
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
|
||||
|
||||
/**
|
||||
* hdfsJniEnv: A wrapper struct to be used as 'value'
|
||||
|
@ -168,268 +169,378 @@ done:
|
|||
return errnum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a configuration value.
|
||||
*
|
||||
* @param env The JNI environment
|
||||
* @param jConfiguration The configuration object to modify
|
||||
* @param key The key to modify
|
||||
* @param value The value to set the key to
|
||||
*
|
||||
* @return 0 on success; error code otherwise
|
||||
*/
|
||||
static int hadoopConfSet(JNIEnv *env, jobject jConfiguration,
|
||||
const char *key, const char *value)
|
||||
{
|
||||
int ret;
|
||||
jthrowable jExc = NULL;
|
||||
jstring jkey = NULL, jvalue = NULL;
|
||||
char buf[1024];
|
||||
|
||||
jkey = (*env)->NewStringUTF(env, key);
|
||||
if (!jkey) {
|
||||
ret = ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
jvalue = (*env)->NewStringUTF(env, value);
|
||||
if (!jvalue) {
|
||||
ret = ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
ret = invokeMethod(env, NULL, &jExc, INSTANCE, jConfiguration,
|
||||
HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
|
||||
JPARAM(JAVA_STRING), JAVA_VOID),
|
||||
jkey, jvalue);
|
||||
if (ret) {
|
||||
snprintf(buf, sizeof(buf), "hadoopConfSet(%s, %s)", key, value);
|
||||
ret = errnoFromException(jExc, env, buf);
|
||||
goto done;
|
||||
}
|
||||
done:
|
||||
if (jkey)
|
||||
destroyLocalReference(env, jkey);
|
||||
if (jvalue)
|
||||
destroyLocalReference(env, jvalue);
|
||||
if (ret)
|
||||
errno = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a Java string into a C string.
|
||||
*
|
||||
* @param env The JNI environment
|
||||
* @param jStr The Java string to convert
|
||||
* @param cstr (out param) the C string.
|
||||
* This will be set to a dynamically allocated
|
||||
* UTF-8 C string on success.
|
||||
*
|
||||
* @return 0 on success; error code otherwise
|
||||
*/
|
||||
static int jStrToCstr(JNIEnv *env, jstring jstr, char **cstr)
|
||||
{
|
||||
char *tmp;
|
||||
|
||||
hdfsFS hdfsConnect(const char* host, tPort port) {
|
||||
// connect with NULL as user name
|
||||
return hdfsConnectAsUser(host, port, NULL);
|
||||
tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
|
||||
*cstr = strdup(tmp);
|
||||
(*env)->ReleaseStringUTFChars(env, jstr, tmp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
|
||||
const char *key, char **val)
|
||||
{
|
||||
int ret;
|
||||
jthrowable jExc = NULL;
|
||||
jvalue jVal;
|
||||
jstring jkey = NULL;
|
||||
char buf[1024];
|
||||
|
||||
jkey = (*env)->NewStringUTF(env, key);
|
||||
if (!jkey) {
|
||||
ret = ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration,
|
||||
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
|
||||
JPARAM(JAVA_STRING)), jkey);
|
||||
if (ret) {
|
||||
snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key);
|
||||
ret = errnoFromException(jExc, env, buf);
|
||||
goto done;
|
||||
}
|
||||
if (!jVal.l) {
|
||||
*val = NULL;
|
||||
goto done;
|
||||
}
|
||||
|
||||
ret = jStrToCstr(env, jVal.l, val);
|
||||
if (ret)
|
||||
goto done;
|
||||
done:
|
||||
if (jkey)
|
||||
destroyLocalReference(env, jkey);
|
||||
if (ret)
|
||||
errno = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int hdfsConfGet(const char *key, char **val)
|
||||
{
|
||||
JNIEnv *env;
|
||||
int ret;
|
||||
jobject jConfiguration = NULL;
|
||||
|
||||
env = getJNIEnv();
|
||||
if (env == NULL) {
|
||||
ret = EINTERNAL;
|
||||
goto done;
|
||||
}
|
||||
jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||
if (jConfiguration == NULL) {
|
||||
fprintf(stderr, "Can't construct instance of class "
|
||||
"org.apache.hadoop.conf.Configuration\n");
|
||||
ret = EINTERNAL;
|
||||
goto done;
|
||||
}
|
||||
ret = hadoopConfGet(env, jConfiguration, key, val);
|
||||
if (ret)
|
||||
goto done;
|
||||
ret = 0;
|
||||
done:
|
||||
if (jConfiguration)
|
||||
destroyLocalReference(env, jConfiguration);
|
||||
if (ret)
|
||||
errno = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void hdfsConfFree(char *val)
|
||||
{
|
||||
free(val);
|
||||
}
|
||||
|
||||
struct hdfsBuilder {
|
||||
int forceNewInstance;
|
||||
const char *nn;
|
||||
tPort port;
|
||||
const char *kerbTicketCachePath;
|
||||
const char *userName;
|
||||
};
|
||||
|
||||
struct hdfsBuilder *hdfsNewBuilder(void)
|
||||
{
|
||||
struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
|
||||
if (!bld) {
|
||||
errno = ENOMEM;
|
||||
return NULL;
|
||||
}
|
||||
return bld;
|
||||
}
|
||||
|
||||
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
||||
{
|
||||
free(bld);
|
||||
}
|
||||
|
||||
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
|
||||
{
|
||||
bld->forceNewInstance = 1;
|
||||
}
|
||||
|
||||
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
|
||||
{
|
||||
bld->nn = nn;
|
||||
}
|
||||
|
||||
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
||||
{
|
||||
bld->port = port;
|
||||
}
|
||||
|
||||
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
||||
{
|
||||
bld->userName = userName;
|
||||
}
|
||||
|
||||
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
||||
const char *kerbTicketCachePath)
|
||||
{
|
||||
bld->kerbTicketCachePath = kerbTicketCachePath;
|
||||
}
|
||||
|
||||
hdfsFS hdfsConnect(const char* host, tPort port)
|
||||
{
|
||||
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||
if (!bld)
|
||||
return NULL;
|
||||
hdfsBuilderSetNameNode(bld, host);
|
||||
hdfsBuilderSetNameNodePort(bld, port);
|
||||
return hdfsBuilderConnect(bld);
|
||||
}
|
||||
|
||||
/** Always return a new FileSystem handle */
|
||||
hdfsFS hdfsConnectNewInstance(const char* host, tPort port) {
|
||||
// connect with NULL as user name/groups
|
||||
return hdfsConnectAsUserNewInstance(host, port, NULL);
|
||||
hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
|
||||
{
|
||||
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||
if (!bld)
|
||||
return NULL;
|
||||
hdfsBuilderSetNameNode(bld, host);
|
||||
hdfsBuilderSetNameNodePort(bld, port);
|
||||
hdfsBuilderSetForceNewInstance(bld);
|
||||
return hdfsBuilderConnect(bld);
|
||||
}
|
||||
|
||||
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
|
||||
{
|
||||
// JAVA EQUIVALENT:
|
||||
// FileSystem fs = FileSystem.get(new Configuration());
|
||||
// return fs;
|
||||
|
||||
JNIEnv *env = 0;
|
||||
jobject jConfiguration = NULL;
|
||||
jobject jFS = NULL;
|
||||
jobject jURI = NULL;
|
||||
jstring jURIString = NULL;
|
||||
jvalue jVal;
|
||||
jthrowable jExc = NULL;
|
||||
char *cURI = 0;
|
||||
jobject gFsRef = NULL;
|
||||
jstring jUserString = NULL;
|
||||
|
||||
|
||||
//Get the JNIEnv* corresponding to current thread
|
||||
env = getJNIEnv();
|
||||
if (env == NULL) {
|
||||
errno = EINTERNAL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//Create the org.apache.hadoop.conf.Configuration object
|
||||
jConfiguration =
|
||||
constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||
|
||||
if (jConfiguration == NULL) {
|
||||
fprintf(stderr, "Can't construct instance of class "
|
||||
"org.apache.hadoop.conf.Configuration\n");
|
||||
errno = EINTERNAL;
|
||||
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||
if (!bld)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (user != NULL) {
|
||||
jUserString = (*env)->NewStringUTF(env, user);
|
||||
}
|
||||
//Check what type of FileSystem the caller wants...
|
||||
if (host == NULL) {
|
||||
// fs = FileSytem::getLocal(conf);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal",
|
||||
JMETHOD1(JPARAM(HADOOP_CONF),
|
||||
JPARAM(HADOOP_LOCALFS)),
|
||||
jConfiguration) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"FileSystem::getLocal");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
}
|
||||
//FileSystem.get(conf) -> FileSystem.get(FileSystem.getDefaultUri(conf),
|
||||
// conf, user)
|
||||
else if (!strcmp(host, "default") && port == 0) {
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||
"getDefaultUri",
|
||||
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
||||
jConfiguration) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
||||
"FileSystem::getDefaultUri");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
||||
JPARAM(HADOOP_FS)),
|
||||
jURI, jConfiguration, jUserString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"Filesystem::get(URI, Configuration)");
|
||||
goto done;
|
||||
}
|
||||
|
||||
jFS = jVal.l;
|
||||
}
|
||||
else {
|
||||
// fs = FileSystem::get(URI, conf, ugi);
|
||||
cURI = malloc(strlen(host)+16);
|
||||
sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
||||
if (cURI == NULL) {
|
||||
fprintf (stderr, "Couldn't allocate an object of size %d",
|
||||
(int)(strlen(host) + 16));
|
||||
errno = EINTERNAL;
|
||||
goto done;
|
||||
}
|
||||
|
||||
jURIString = (*env)->NewStringUTF(env, cURI);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
||||
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
||||
jURIString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "java.net.URI::create");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
||||
JPARAM(HADOOP_FS)),
|
||||
jURI, jConfiguration, jUserString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"Filesystem::get(URI, Configuration)");
|
||||
goto done;
|
||||
}
|
||||
|
||||
jFS = jVal.l;
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
// Release unnecessary local references
|
||||
destroyLocalReference(env, jConfiguration);
|
||||
destroyLocalReference(env, jURIString);
|
||||
destroyLocalReference(env, jURI);
|
||||
destroyLocalReference(env, jUserString);
|
||||
|
||||
if (cURI) free(cURI);
|
||||
|
||||
/* Create a global reference for this fs */
|
||||
if (jFS) {
|
||||
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
||||
destroyLocalReference(env, jFS);
|
||||
}
|
||||
|
||||
return gFsRef;
|
||||
hdfsBuilderSetNameNode(bld, host);
|
||||
hdfsBuilderSetNameNodePort(bld, port);
|
||||
hdfsBuilderSetUserName(bld, user);
|
||||
return hdfsBuilderConnect(bld);
|
||||
}
|
||||
|
||||
|
||||
/** Always return a new FileSystem handle */
|
||||
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user)
|
||||
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
|
||||
const char *user)
|
||||
{
|
||||
// JAVA EQUIVALENT:
|
||||
// FileSystem fs = FileSystem.get(new Configuration());
|
||||
// return fs;
|
||||
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||
if (!bld)
|
||||
return NULL;
|
||||
hdfsBuilderSetNameNode(bld, host);
|
||||
hdfsBuilderSetNameNodePort(bld, port);
|
||||
hdfsBuilderSetForceNewInstance(bld);
|
||||
hdfsBuilderSetUserName(bld, user);
|
||||
return hdfsBuilderConnect(bld);
|
||||
}
|
||||
|
||||
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
|
||||
{
|
||||
JNIEnv *env = 0;
|
||||
jobject jConfiguration = NULL;
|
||||
jobject jFS = NULL;
|
||||
jobject jURI = NULL;
|
||||
jstring jURIString = NULL;
|
||||
jobject gFsRef = NULL;
|
||||
jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
|
||||
jstring jURIString = NULL, jUserString = NULL;
|
||||
jvalue jVal;
|
||||
jthrowable jExc = NULL;
|
||||
char *cURI = 0;
|
||||
jobject gFsRef = NULL;
|
||||
jstring jUserString = NULL;
|
||||
size_t cURILen;
|
||||
char *cURI = 0;
|
||||
int ret = 0;
|
||||
|
||||
//Get the JNIEnv* corresponding to current thread
|
||||
env = getJNIEnv();
|
||||
if (env == NULL) {
|
||||
errno = EINTERNAL;
|
||||
return NULL;
|
||||
ret = EINTERNAL;
|
||||
goto done;
|
||||
}
|
||||
|
||||
//Create the org.apache.hadoop.conf.Configuration object
|
||||
jConfiguration =
|
||||
constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||
|
||||
// jConfiguration = new Configuration();
|
||||
jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||
if (jConfiguration == NULL) {
|
||||
fprintf(stderr, "Can't construct instance of class "
|
||||
"org.apache.hadoop.conf.Configuration\n");
|
||||
errno = EINTERNAL;
|
||||
return NULL;
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (user != NULL) {
|
||||
jUserString = (*env)->NewStringUTF(env, user);
|
||||
}
|
||||
//Check what type of FileSystem the caller wants...
|
||||
if (host == NULL) {
|
||||
// fs = FileSytem::newInstanceLocal(conf);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstanceLocal",
|
||||
JMETHOD1(JPARAM(HADOOP_CONF),
|
||||
JPARAM(HADOOP_LOCALFS)),
|
||||
jConfiguration) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"FileSystem::newInstanceLocal");
|
||||
goto done;
|
||||
if (bld->nn == NULL) {
|
||||
// Get a local filesystem.
|
||||
if (bld->forceNewInstance) {
|
||||
// fs = FileSytem::newInstanceLocal(conf);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||
"newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
|
||||
JPARAM(HADOOP_LOCALFS)), jConfiguration)) {
|
||||
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"FileSystem::newInstanceLocal");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
} else {
|
||||
// fs = FileSytem::getLocal(conf);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal",
|
||||
JMETHOD1(JPARAM(HADOOP_CONF),
|
||||
JPARAM(HADOOP_LOCALFS)),
|
||||
jConfiguration) != 0) {
|
||||
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"FileSystem::getLocal");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
}
|
||||
else if (!strcmp(host, "default") && port == 0) {
|
||||
//fs = FileSystem::get(conf);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||
"getDefaultUri",
|
||||
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
||||
jConfiguration) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
||||
"FileSystem::getDefaultUri");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
||||
HADOOP_FS, "newInstance",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
||||
JPARAM(HADOOP_CONF),
|
||||
JPARAM(JAVA_STRING),
|
||||
JPARAM(HADOOP_FS)),
|
||||
jURI, jConfiguration, jUserString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"FileSystem::newInstance");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
}
|
||||
else {
|
||||
// fs = FileSystem::newInstance(URI, conf);
|
||||
cURI = malloc(strlen(host)+16);
|
||||
sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
||||
|
||||
jURIString = (*env)->NewStringUTF(env, cURI);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
||||
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
||||
jURIString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "java.net.URI::create");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
||||
JPARAM(HADOOP_FS)),
|
||||
jURI, jConfiguration, jUserString) != 0) {
|
||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"Filesystem::newInstance(URI, Configuration)");
|
||||
goto done;
|
||||
} else {
|
||||
if (!strcmp(bld->nn, "default")) {
|
||||
// jURI = FileSystem.getDefaultUri(conf)
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||
"getDefaultUri",
|
||||
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
||||
jConfiguration) != 0) {
|
||||
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
||||
"FileSystem::getDefaultUri");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
} else {
|
||||
// fs = FileSystem::get(URI, conf, ugi);
|
||||
cURILen = strlen(bld->nn) + 16;
|
||||
cURI = malloc(cURILen);
|
||||
if (!cURI) {
|
||||
fprintf(stderr, "failed to allocate memory for HDFS URI\n");
|
||||
ret = ENOMEM;
|
||||
goto done;
|
||||
}
|
||||
snprintf(cURI, cURILen, "hdfs://%s:%d", bld->nn, (int)(bld->port));
|
||||
jURIString = (*env)->NewStringUTF(env, cURI);
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
||||
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
||||
jURIString) != 0) {
|
||||
ret = errnoFromException(jExc, env, "java.net.URI::create");
|
||||
goto done;
|
||||
}
|
||||
jURI = jVal.l;
|
||||
}
|
||||
|
||||
jFS = jVal.l;
|
||||
if (bld->kerbTicketCachePath) {
|
||||
ret = hadoopConfSet(env, jConfiguration,
|
||||
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
|
||||
if (ret)
|
||||
goto done;
|
||||
}
|
||||
if (bld->userName) {
|
||||
jUserString = (*env)->NewStringUTF(env, bld->userName);
|
||||
}
|
||||
if (bld->forceNewInstance) {
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
||||
HADOOP_FS, "newInstance",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
|
||||
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI,
|
||||
jConfiguration, jUserString)) {
|
||||
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"Filesystem::newInstance(URI, Configuration)");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
} else {
|
||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
||||
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
|
||||
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
|
||||
jURI, jConfiguration, jUserString)) {
|
||||
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||
"Filesystem::get(URI, Configuration, String)");
|
||||
goto done;
|
||||
}
|
||||
jFS = jVal.l;
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
if (jFS) {
|
||||
/* Create a global reference for this fs */
|
||||
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
// Release unnecessary local references
|
||||
destroyLocalReference(env, jConfiguration);
|
||||
destroyLocalReference(env, jURIString);
|
||||
destroyLocalReference(env, jFS);
|
||||
destroyLocalReference(env, jURI);
|
||||
destroyLocalReference(env, jCachePath);
|
||||
destroyLocalReference(env, jURIString);
|
||||
destroyLocalReference(env, jUserString);
|
||||
free(cURI);
|
||||
free(bld);
|
||||
|
||||
if (cURI) free(cURI);
|
||||
|
||||
/* Create a global reference for this fs */
|
||||
if (jFS) {
|
||||
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
||||
destroyLocalReference(env, jFS);
|
||||
}
|
||||
|
||||
if (ret)
|
||||
errno = ret;
|
||||
return gFsRef;
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ extern "C" {
|
|||
/**
|
||||
* Some utility decls used in libhdfs.
|
||||
*/
|
||||
|
||||
struct hdfsBuilder;
|
||||
typedef int32_t tSize; /// size of data for read/write io ops
|
||||
typedef time_t tTime; /// time type in seconds
|
||||
typedef int64_t tOffset;/// offset within the file
|
||||
|
@ -97,39 +97,148 @@ extern "C" {
|
|||
/**
|
||||
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
|
||||
* Connect to the hdfs.
|
||||
* @param host A string containing either a host name, or an ip address
|
||||
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
|
||||
* you want to connect to local filesystem. 'host' should be passed as
|
||||
* 'default' (and port as 0) to used the 'configured' filesystem
|
||||
* (core-site/core-default.xml).
|
||||
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||
* @param port The port on which the server is listening.
|
||||
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
|
||||
* @return Returns a handle to the filesystem or NULL on error.
|
||||
* @deprecated Use hdfsBuilderConnect instead.
|
||||
*/
|
||||
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user);
|
||||
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user);
|
||||
|
||||
|
||||
/**
|
||||
* hdfsConnect - Connect to a hdfs file system.
|
||||
* Connect to the hdfs.
|
||||
* @param host A string containing either a host name, or an ip address
|
||||
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
|
||||
* you want to connect to local filesystem. 'host' should be passed as
|
||||
* 'default' (and port as 0) to used the 'configured' filesystem
|
||||
* (core-site/core-default.xml).
|
||||
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||
* @param port The port on which the server is listening.
|
||||
* @return Returns a handle to the filesystem or NULL on error.
|
||||
* @deprecated Use hdfsBuilderConnect instead.
|
||||
*/
|
||||
hdfsFS hdfsConnect(const char* host, tPort port);
|
||||
hdfsFS hdfsConnect(const char* nn, tPort port);
|
||||
|
||||
/**
|
||||
* hdfsConnect - Connect to an hdfs file system.
|
||||
*
|
||||
* Forces a new instance to be created
|
||||
*
|
||||
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||
* @param port The port on which the server is listening.
|
||||
* @param user The user name to use when connecting
|
||||
* @return Returns a handle to the filesystem or NULL on error.
|
||||
* @deprecated Use hdfsBuilderConnect instead.
|
||||
*/
|
||||
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user );
|
||||
|
||||
/**
|
||||
* hdfsConnect - Connect to an hdfs file system.
|
||||
*
|
||||
* Forces a new instance to be created
|
||||
*
|
||||
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||
* @param port The port on which the server is listening.
|
||||
* @return Returns a handle to the filesystem or NULL on error.
|
||||
* @deprecated Use hdfsBuilderConnect instead.
|
||||
*/
|
||||
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
|
||||
|
||||
/**
|
||||
* Connect to HDFS using the parameters defined by the builder.
|
||||
*
|
||||
* The HDFS builder will be freed, whether or not the connection was
|
||||
* successful.
|
||||
*
|
||||
* Every successful call to hdfsBuilderConnect should be matched with a call
|
||||
* to hdfsDisconnect, when the hdfsFS is no longer needed.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
* @return Returns a handle to the filesystem, or NULL on error.
|
||||
*/
|
||||
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld);
|
||||
|
||||
/**
|
||||
* This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
|
||||
* Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
|
||||
* Create an HDFS builder.
|
||||
*
|
||||
* @return The HDFS builder, or NULL on error.
|
||||
*/
|
||||
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user );
|
||||
hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
|
||||
hdfsFS hdfsConnectPath(const char* uri);
|
||||
struct hdfsBuilder *hdfsNewBuilder(void);
|
||||
|
||||
/**
|
||||
* Force the builder to always create a new instance of the FileSystem,
|
||||
* rather than possibly finding one in the cache.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
*/
|
||||
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld);
|
||||
|
||||
/**
|
||||
* Set the HDFS NameNode to connect to.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
* @param nn The NameNode to use.
|
||||
* If the string given is 'default', the default NameNode
|
||||
* configuration will be used (from the XML configuration files)
|
||||
* If NULL is given, a LocalFileSystem will be created.
|
||||
* Otherwise, the string will be interpreted as a hostname or IP
|
||||
* address.
|
||||
*/
|
||||
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn);
|
||||
|
||||
/**
|
||||
* Set the port of the HDFS NameNode to connect to.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
* @param port The port.
|
||||
*/
|
||||
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port);
|
||||
|
||||
/**
|
||||
* Set the username to use when connecting to the HDFS cluster.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
* @param userName The user name. The string will be shallow-copied.
|
||||
*/
|
||||
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName);
|
||||
|
||||
/**
|
||||
* Set the path to the Kerberos ticket cache to use when connecting to
|
||||
* the HDFS cluster.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
* @param kerbTicketCachePath The Kerberos ticket cache path. The string
|
||||
* will be shallow-copied.
|
||||
*/
|
||||
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
||||
const char *kerbTicketCachePath);
|
||||
|
||||
/**
|
||||
* Free an HDFS builder.
|
||||
*
|
||||
* It is normally not necessary to call this function since
|
||||
* hdfsBuilderConnect frees the builder.
|
||||
*
|
||||
* @param bld The HDFS builder
|
||||
*/
|
||||
void hdfsFreeBuilder(struct hdfsBuilder *bld);
|
||||
|
||||
/**
|
||||
* Get a configuration string.
|
||||
*
|
||||
* @param key The key to find
|
||||
* @param val (out param) The value. This will be NULL if the
|
||||
* key isn't found. You must free this string with
|
||||
* hdfsConfFree.
|
||||
*
|
||||
* @return 0 on success; nonzero error code otherwise.
|
||||
* Failure to find the key is not an error.
|
||||
*/
|
||||
int hdfsConfGet(const char *key, char **val);
|
||||
|
||||
/**
|
||||
* Free a configuration string found with hdfsConfGet.
|
||||
*
|
||||
* @param val A configuration string obtained from hdfsConfGet
|
||||
*/
|
||||
void hdfsConfFree(char *val);
|
||||
|
||||
/**
|
||||
* hdfsDisconnect - Disconnect from the hdfs file system.
|
||||
|
|
|
@ -124,7 +124,7 @@ public class TestNameNodeRecovery {
|
|||
}
|
||||
} catch (Throwable e) {
|
||||
fail("caught IOException while trying to skip over bad " +
|
||||
"transaction. message was " + e.getMessage() +
|
||||
"transaction. message was " + e.getMessage() +
|
||||
"\nstack trace\n" + StringUtils.stringifyException(e));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue