HDFS-3568. fuse_dfs: add support for security. Contributed by Colin McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1359826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e4ec5ae32
commit
1ed7052a82
|
@ -166,5 +166,12 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||||
"hadoop.http.staticuser.user";
|
"hadoop.http.staticuser.user";
|
||||||
public static final String DEFAULT_HADOOP_HTTP_STATIC_USER =
|
public static final String DEFAULT_HADOOP_HTTP_STATIC_USER =
|
||||||
"dr.who";
|
"dr.who";
|
||||||
|
|
||||||
|
/* Path to the Kerberos ticket cache. Setting this will force
|
||||||
|
* UserGroupInformation to use only this ticket cache file when creating a
|
||||||
|
* FileSystem instance.
|
||||||
|
*/
|
||||||
|
public static final String KERBEROS_TICKET_CACHE_PATH =
|
||||||
|
"hadoop.security.kerberos.ticket.cache.path";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,12 +137,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
*/
|
*/
|
||||||
public static FileSystem get(final URI uri, final Configuration conf,
|
public static FileSystem get(final URI uri, final Configuration conf,
|
||||||
final String user) throws IOException, InterruptedException {
|
final String user) throws IOException, InterruptedException {
|
||||||
UserGroupInformation ugi;
|
String ticketCachePath =
|
||||||
if (user == null) {
|
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
|
||||||
ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi =
|
||||||
} else {
|
UserGroupInformation.getBestUGI(ticketCachePath, user);
|
||||||
ugi = UserGroupInformation.createRemoteUser(user);
|
|
||||||
}
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||||
public FileSystem run() throws IOException {
|
public FileSystem run() throws IOException {
|
||||||
return get(uri, conf);
|
return get(uri, conf);
|
||||||
|
@ -314,12 +312,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
*/
|
*/
|
||||||
public static FileSystem newInstance(final URI uri, final Configuration conf,
|
public static FileSystem newInstance(final URI uri, final Configuration conf,
|
||||||
final String user) throws IOException, InterruptedException {
|
final String user) throws IOException, InterruptedException {
|
||||||
UserGroupInformation ugi;
|
String ticketCachePath =
|
||||||
if (user == null) {
|
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
|
||||||
ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi =
|
||||||
} else {
|
UserGroupInformation.getBestUGI(ticketCachePath, user);
|
||||||
ugi = UserGroupInformation.createRemoteUser(user);
|
|
||||||
}
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
|
||||||
public FileSystem run() throws IOException {
|
public FileSystem run() throws IOException {
|
||||||
return newInstance(uri,conf);
|
return newInstance(uri,conf);
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.security;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.security.AccessControlContext;
|
import java.security.AccessControlContext;
|
||||||
|
@ -32,6 +33,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -454,8 +456,30 @@ public class UserGroupInformation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final HadoopConfiguration HADOOP_LOGIN_CONFIG =
|
||||||
|
new HadoopConfiguration();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a javax.security configuration that is created at runtime.
|
||||||
|
*/
|
||||||
|
private static class DynamicConfiguration
|
||||||
|
extends javax.security.auth.login.Configuration {
|
||||||
|
private AppConfigurationEntry[] ace;
|
||||||
|
|
||||||
|
DynamicConfiguration(AppConfigurationEntry[] ace) {
|
||||||
|
this.ace = ace;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
|
||||||
|
return ace;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static LoginContext
|
private static LoginContext
|
||||||
newLoginContext(String appName, Subject subject) throws LoginException {
|
newLoginContext(String appName, Subject subject,
|
||||||
|
javax.security.auth.login.Configuration loginConf)
|
||||||
|
throws LoginException {
|
||||||
// Temporarily switch the thread's ContextClassLoader to match this
|
// Temporarily switch the thread's ContextClassLoader to match this
|
||||||
// class's classloader, so that we can properly load HadoopLoginModule
|
// class's classloader, so that we can properly load HadoopLoginModule
|
||||||
// from the JAAS libraries.
|
// from the JAAS libraries.
|
||||||
|
@ -463,7 +487,7 @@ public class UserGroupInformation {
|
||||||
ClassLoader oldCCL = t.getContextClassLoader();
|
ClassLoader oldCCL = t.getContextClassLoader();
|
||||||
t.setContextClassLoader(HadoopLoginModule.class.getClassLoader());
|
t.setContextClassLoader(HadoopLoginModule.class.getClassLoader());
|
||||||
try {
|
try {
|
||||||
return new LoginContext(appName, subject, null, new HadoopConfiguration());
|
return new LoginContext(appName, subject, null, loginConf);
|
||||||
} finally {
|
} finally {
|
||||||
t.setContextClassLoader(oldCCL);
|
t.setContextClassLoader(oldCCL);
|
||||||
}
|
}
|
||||||
|
@ -515,6 +539,82 @@ public class UserGroupInformation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the most appropriate UserGroupInformation to use
|
||||||
|
*
|
||||||
|
* @param ticketCachePath The Kerberos ticket cache path, or NULL
|
||||||
|
* if none is specfied
|
||||||
|
* @param user The user name, or NULL if none is specified.
|
||||||
|
*
|
||||||
|
* @return The most appropriate UserGroupInformation
|
||||||
|
*/
|
||||||
|
public static UserGroupInformation getBestUGI(
|
||||||
|
String ticketCachePath, String user) throws IOException {
|
||||||
|
if (ticketCachePath != null) {
|
||||||
|
return getUGIFromTicketCache(ticketCachePath, user);
|
||||||
|
} else if (user == null) {
|
||||||
|
return getCurrentUser();
|
||||||
|
} else {
|
||||||
|
return createRemoteUser(user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a UserGroupInformation from a Kerberos ticket cache.
|
||||||
|
*
|
||||||
|
* @param user The principal name to load from the ticket
|
||||||
|
* cache
|
||||||
|
* @param ticketCachePath the path to the ticket cache file
|
||||||
|
*
|
||||||
|
* @throws IOException if the kerberos login fails
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public static UserGroupInformation getUGIFromTicketCache(
|
||||||
|
String ticketCache, String user) throws IOException {
|
||||||
|
if (!isSecurityEnabled()) {
|
||||||
|
return getBestUGI(null, user);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Map<String,String> krbOptions = new HashMap<String,String>();
|
||||||
|
krbOptions.put("doNotPrompt", "true");
|
||||||
|
krbOptions.put("useTicketCache", "true");
|
||||||
|
krbOptions.put("useKeyTab", "false");
|
||||||
|
krbOptions.put("renewTGT", "false");
|
||||||
|
krbOptions.put("ticketCache", ticketCache);
|
||||||
|
krbOptions.putAll(HadoopConfiguration.BASIC_JAAS_OPTIONS);
|
||||||
|
AppConfigurationEntry ace = new AppConfigurationEntry(
|
||||||
|
KerberosUtil.getKrb5LoginModuleName(),
|
||||||
|
LoginModuleControlFlag.REQUIRED,
|
||||||
|
krbOptions);
|
||||||
|
DynamicConfiguration dynConf =
|
||||||
|
new DynamicConfiguration(new AppConfigurationEntry[]{ ace });
|
||||||
|
LoginContext login = newLoginContext(
|
||||||
|
HadoopConfiguration.USER_KERBEROS_CONFIG_NAME, null, dynConf);
|
||||||
|
login.login();
|
||||||
|
|
||||||
|
Subject loginSubject = login.getSubject();
|
||||||
|
Set<Principal> loginPrincipals = loginSubject.getPrincipals();
|
||||||
|
if (loginPrincipals.isEmpty()) {
|
||||||
|
throw new RuntimeException("No login principals found!");
|
||||||
|
}
|
||||||
|
if (loginPrincipals.size() != 1) {
|
||||||
|
LOG.warn("found more than one principal in the ticket cache file " +
|
||||||
|
ticketCache);
|
||||||
|
}
|
||||||
|
User ugiUser = new User(loginPrincipals.iterator().next().getName(),
|
||||||
|
AuthenticationMethod.KERBEROS, login);
|
||||||
|
loginSubject.getPrincipals().add(ugiUser);
|
||||||
|
UserGroupInformation ugi = new UserGroupInformation(loginSubject);
|
||||||
|
ugi.setLogin(login);
|
||||||
|
ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
|
||||||
|
return ugi;
|
||||||
|
} catch (LoginException le) {
|
||||||
|
throw new IOException("failure to login using ticket cache file " +
|
||||||
|
ticketCache, le);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the currently logged in user.
|
* Get the currently logged in user.
|
||||||
* @return the logged in user
|
* @return the logged in user
|
||||||
|
@ -530,10 +630,10 @@ public class UserGroupInformation {
|
||||||
LoginContext login;
|
LoginContext login;
|
||||||
if (isSecurityEnabled()) {
|
if (isSecurityEnabled()) {
|
||||||
login = newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
login = newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
||||||
subject);
|
subject, HADOOP_LOGIN_CONFIG);
|
||||||
} else {
|
} else {
|
||||||
login = newLoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME,
|
login = newLoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME,
|
||||||
subject);
|
subject, HADOOP_LOGIN_CONFIG);
|
||||||
}
|
}
|
||||||
login.login();
|
login.login();
|
||||||
loginUser = new UserGroupInformation(subject);
|
loginUser = new UserGroupInformation(subject);
|
||||||
|
@ -673,8 +773,8 @@ public class UserGroupInformation {
|
||||||
LoginContext login;
|
LoginContext login;
|
||||||
long start = 0;
|
long start = 0;
|
||||||
try {
|
try {
|
||||||
login =
|
login = newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME,
|
||||||
newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
|
subject, HADOOP_LOGIN_CONFIG);
|
||||||
start = System.currentTimeMillis();
|
start = System.currentTimeMillis();
|
||||||
login.login();
|
login.login();
|
||||||
metrics.loginSuccess.add(System.currentTimeMillis() - start);
|
metrics.loginSuccess.add(System.currentTimeMillis() - start);
|
||||||
|
@ -756,7 +856,8 @@ public class UserGroupInformation {
|
||||||
// login and also update the subject field of this instance to
|
// login and also update the subject field of this instance to
|
||||||
// have the new credentials (pass it to the LoginContext constructor)
|
// have the new credentials (pass it to the LoginContext constructor)
|
||||||
login = newLoginContext(
|
login = newLoginContext(
|
||||||
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject());
|
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, getSubject(),
|
||||||
|
HADOOP_LOGIN_CONFIG);
|
||||||
LOG.info("Initiating re-login for " + keytabPrincipal);
|
LOG.info("Initiating re-login for " + keytabPrincipal);
|
||||||
start = System.currentTimeMillis();
|
start = System.currentTimeMillis();
|
||||||
login.login();
|
login.login();
|
||||||
|
@ -807,7 +908,7 @@ public class UserGroupInformation {
|
||||||
//have the new credentials (pass it to the LoginContext constructor)
|
//have the new credentials (pass it to the LoginContext constructor)
|
||||||
login =
|
login =
|
||||||
newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
|
||||||
getSubject());
|
getSubject(), HADOOP_LOGIN_CONFIG);
|
||||||
LOG.info("Initiating re-login for " + getUserName());
|
LOG.info("Initiating re-login for " + getUserName());
|
||||||
login.login();
|
login.login();
|
||||||
setLogin(login);
|
setLogin(login);
|
||||||
|
@ -842,8 +943,9 @@ public class UserGroupInformation {
|
||||||
keytabPrincipal = user;
|
keytabPrincipal = user;
|
||||||
Subject subject = new Subject();
|
Subject subject = new Subject();
|
||||||
|
|
||||||
LoginContext login =
|
LoginContext login = newLoginContext(
|
||||||
newLoginContext(HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject);
|
HadoopConfiguration.KEYTAB_KERBEROS_CONFIG_NAME, subject,
|
||||||
|
HADOOP_LOGIN_CONFIG);
|
||||||
|
|
||||||
start = System.currentTimeMillis();
|
start = System.currentTimeMillis();
|
||||||
login.login();
|
login.login();
|
||||||
|
|
|
@ -113,6 +113,8 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HDFS-3555. idle client socket triggers DN ERROR log
|
HDFS-3555. idle client socket triggers DN ERROR log
|
||||||
(should be INFO or DEBUG). (Andy Isaacson via harsh)
|
(should be INFO or DEBUG). (Andy Isaacson via harsh)
|
||||||
|
|
||||||
|
HDFS-3568. fuse_dfs: add support for security. (Colin McCabe via atm)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2982. Startup performance suffers when there are many edit log
|
HDFS-2982. Startup performance suffers when there are many edit log
|
||||||
|
|
|
@ -21,10 +21,20 @@
|
||||||
#include "fuse_connect.h"
|
#include "fuse_connect.h"
|
||||||
#include "fuse_users.h"
|
#include "fuse_users.h"
|
||||||
|
|
||||||
|
#include <limits.h>
|
||||||
#include <search.h>
|
#include <search.h>
|
||||||
|
|
||||||
|
#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
|
||||||
|
|
||||||
|
enum authConf {
|
||||||
|
AUTH_CONF_UNKNOWN,
|
||||||
|
AUTH_CONF_KERBEROS,
|
||||||
|
AUTH_CONF_OTHER,
|
||||||
|
};
|
||||||
|
|
||||||
#define MAX_ELEMENTS (16 * 1024)
|
#define MAX_ELEMENTS (16 * 1024)
|
||||||
static struct hsearch_data *fsTable = NULL;
|
static struct hsearch_data *fsTable = NULL;
|
||||||
|
static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN;
|
||||||
static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -75,13 +85,96 @@ static int insertFs(char *key, hdfsFS fs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find out what type of authentication the system administrator
|
||||||
|
* has configured.
|
||||||
|
*
|
||||||
|
* @return the type of authentication, or AUTH_CONF_UNKNOWN on error.
|
||||||
|
*/
|
||||||
|
static enum authConf discoverAuthConf(void)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
char *val = NULL;
|
||||||
|
enum authConf authConf;
|
||||||
|
|
||||||
|
ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val);
|
||||||
|
if (ret)
|
||||||
|
authConf = AUTH_CONF_UNKNOWN;
|
||||||
|
else if (!strcmp(val, "kerberos"))
|
||||||
|
authConf = AUTH_CONF_KERBEROS;
|
||||||
|
else
|
||||||
|
authConf = AUTH_CONF_OTHER;
|
||||||
|
free(val);
|
||||||
|
return authConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find the Kerberos ticket cache path.
|
||||||
|
*
|
||||||
|
* This function finds the Kerberos ticket cache path from the thread ID and
|
||||||
|
* user ID of the process making the request.
|
||||||
|
*
|
||||||
|
* Normally, the ticket cache path is in a well-known location in /tmp.
|
||||||
|
* However, it's possible that the calling process could set the KRB5CCNAME
|
||||||
|
* environment variable, indicating that its Kerberos ticket cache is at a
|
||||||
|
* non-default location. We try to handle this possibility by reading the
|
||||||
|
* process' environment here. This will be allowed if we have root
|
||||||
|
* capabilities, or if our UID is the same as the remote process' UID.
|
||||||
|
*
|
||||||
|
* Note that we don't check to see if the cache file actually exists or not.
|
||||||
|
* We're just trying to find out where it would be if it did exist.
|
||||||
|
*
|
||||||
|
* @param path (out param) the path to the ticket cache file
|
||||||
|
* @param pathLen length of the path buffer
|
||||||
|
*/
|
||||||
|
static void findKerbTicketCachePath(char *path, size_t pathLen)
|
||||||
|
{
|
||||||
|
struct fuse_context *ctx = fuse_get_context();
|
||||||
|
FILE *fp = NULL;
|
||||||
|
static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
|
||||||
|
int c = '\0', pathIdx = 0, keyIdx = 0;
|
||||||
|
size_t KRB5CCNAME_LEN = strlen(KRB5CCNAME + 1) + 1;
|
||||||
|
|
||||||
|
// /proc/<tid>/environ contains the remote process' environment. It is
|
||||||
|
// exposed to us as a series of KEY=VALUE pairs, separated by NULL bytes.
|
||||||
|
snprintf(path, pathLen, "/proc/%d/environ", ctx->pid);
|
||||||
|
fp = fopen(path, "r");
|
||||||
|
if (!fp)
|
||||||
|
goto done;
|
||||||
|
while (1) {
|
||||||
|
if (c == EOF)
|
||||||
|
goto done;
|
||||||
|
if (keyIdx == KRB5CCNAME_LEN) {
|
||||||
|
if (pathIdx >= pathLen - 1)
|
||||||
|
goto done;
|
||||||
|
if (c == '\0')
|
||||||
|
goto done;
|
||||||
|
path[pathIdx++] = c;
|
||||||
|
} else if (KRB5CCNAME[keyIdx++] != c) {
|
||||||
|
keyIdx = 0;
|
||||||
|
}
|
||||||
|
c = fgetc(fp);
|
||||||
|
}
|
||||||
|
|
||||||
|
done:
|
||||||
|
if (fp)
|
||||||
|
fclose(fp);
|
||||||
|
if (pathIdx == 0) {
|
||||||
|
snprintf(path, pathLen, "/tmp/krb5cc_%d", ctx->uid);
|
||||||
|
} else {
|
||||||
|
path[pathIdx] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Connect to the NN as the current user/group.
|
* Connect to the NN as the current user/group.
|
||||||
* Returns a fs handle on success, or NULL on failure.
|
* Returns a fs handle on success, or NULL on failure.
|
||||||
*/
|
*/
|
||||||
hdfsFS doConnectAsUser(const char *hostname, int port) {
|
hdfsFS doConnectAsUser(const char *hostname, int port) {
|
||||||
|
struct hdfsBuilder *bld;
|
||||||
uid_t uid = fuse_get_context()->uid;
|
uid_t uid = fuse_get_context()->uid;
|
||||||
char *user = getUsername(uid);
|
char *user = getUsername(uid);
|
||||||
|
char kpath[PATH_MAX];
|
||||||
int ret;
|
int ret;
|
||||||
hdfsFS fs = NULL;
|
hdfsFS fs = NULL;
|
||||||
if (NULL == user) {
|
if (NULL == user) {
|
||||||
|
@ -93,9 +186,31 @@ hdfsFS doConnectAsUser(const char *hostname, int port) {
|
||||||
|
|
||||||
fs = findFs(user);
|
fs = findFs(user);
|
||||||
if (NULL == fs) {
|
if (NULL == fs) {
|
||||||
fs = hdfsConnectAsUserNewInstance(hostname, port, user);
|
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
|
||||||
|
hdfsAuthConf = discoverAuthConf();
|
||||||
|
if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
|
||||||
|
ERROR("Unable to determine the configured value for %s.",
|
||||||
|
HADOOP_SECURITY_AUTHENTICATION);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bld = hdfsNewBuilder();
|
||||||
|
if (!bld) {
|
||||||
|
ERROR("Unable to create hdfs builder");
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
hdfsBuilderSetForceNewInstance(bld);
|
||||||
|
hdfsBuilderSetNameNode(bld, hostname);
|
||||||
|
hdfsBuilderSetNameNodePort(bld, port);
|
||||||
|
hdfsBuilderSetUserName(bld, user);
|
||||||
|
if (hdfsAuthConf == AUTH_CONF_KERBEROS) {
|
||||||
|
findKerbTicketCachePath(kpath, sizeof(kpath));
|
||||||
|
hdfsBuilderSetKerbTicketCachePath(bld, kpath);
|
||||||
|
}
|
||||||
|
fs = hdfsBuilderConnect(bld);
|
||||||
if (NULL == fs) {
|
if (NULL == fs) {
|
||||||
ERROR("Unable to create fs for user %s", user);
|
int err = errno;
|
||||||
|
ERROR("Unable to create fs for user %s: error code %d", user, err);
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
if (-1 == insertFs(user, fs)) {
|
if (-1 == insertFs(user, fs)) {
|
||||||
|
@ -106,9 +221,7 @@ hdfsFS doConnectAsUser(const char *hostname, int port) {
|
||||||
done:
|
done:
|
||||||
ret = pthread_mutex_unlock(&tableMutex);
|
ret = pthread_mutex_unlock(&tableMutex);
|
||||||
assert(0 == ret);
|
assert(0 == ret);
|
||||||
if (user) {
|
|
||||||
free(user);
|
free(user);
|
||||||
}
|
|
||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@
|
||||||
#define JMETHOD2(X, Y, R) "(" X Y ")" R
|
#define JMETHOD2(X, Y, R) "(" X Y ")" R
|
||||||
#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
|
#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
|
||||||
|
|
||||||
|
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hdfsJniEnv: A wrapper struct to be used as 'value'
|
* hdfsJniEnv: A wrapper struct to be used as 'value'
|
||||||
|
@ -168,268 +169,378 @@ done:
|
||||||
return errnum;
|
return errnum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a configuration value.
|
||||||
|
*
|
||||||
|
* @param env The JNI environment
|
||||||
|
* @param jConfiguration The configuration object to modify
|
||||||
|
* @param key The key to modify
|
||||||
|
* @param value The value to set the key to
|
||||||
|
*
|
||||||
|
* @return 0 on success; error code otherwise
|
||||||
|
*/
|
||||||
|
static int hadoopConfSet(JNIEnv *env, jobject jConfiguration,
|
||||||
|
const char *key, const char *value)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
jthrowable jExc = NULL;
|
||||||
|
jstring jkey = NULL, jvalue = NULL;
|
||||||
|
char buf[1024];
|
||||||
|
|
||||||
|
jkey = (*env)->NewStringUTF(env, key);
|
||||||
|
if (!jkey) {
|
||||||
|
ret = ENOMEM;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jvalue = (*env)->NewStringUTF(env, value);
|
||||||
|
if (!jvalue) {
|
||||||
|
ret = ENOMEM;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
ret = invokeMethod(env, NULL, &jExc, INSTANCE, jConfiguration,
|
||||||
|
HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
|
||||||
|
JPARAM(JAVA_STRING), JAVA_VOID),
|
||||||
|
jkey, jvalue);
|
||||||
|
if (ret) {
|
||||||
|
snprintf(buf, sizeof(buf), "hadoopConfSet(%s, %s)", key, value);
|
||||||
|
ret = errnoFromException(jExc, env, buf);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
done:
|
||||||
|
if (jkey)
|
||||||
|
destroyLocalReference(env, jkey);
|
||||||
|
if (jvalue)
|
||||||
|
destroyLocalReference(env, jvalue);
|
||||||
|
if (ret)
|
||||||
|
errno = ret;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a Java string into a C string.
|
||||||
|
*
|
||||||
|
* @param env The JNI environment
|
||||||
|
* @param jStr The Java string to convert
|
||||||
|
* @param cstr (out param) the C string.
|
||||||
|
* This will be set to a dynamically allocated
|
||||||
|
* UTF-8 C string on success.
|
||||||
|
*
|
||||||
|
* @return 0 on success; error code otherwise
|
||||||
|
*/
|
||||||
|
static int jStrToCstr(JNIEnv *env, jstring jstr, char **cstr)
|
||||||
|
{
|
||||||
|
char *tmp;
|
||||||
|
|
||||||
hdfsFS hdfsConnect(const char* host, tPort port) {
|
tmp = (*env)->GetStringUTFChars(env, jstr, NULL);
|
||||||
// connect with NULL as user name
|
*cstr = strdup(tmp);
|
||||||
return hdfsConnectAsUser(host, port, NULL);
|
(*env)->ReleaseStringUTFChars(env, jstr, tmp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int hadoopConfGet(JNIEnv *env, jobject jConfiguration,
|
||||||
|
const char *key, char **val)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
jthrowable jExc = NULL;
|
||||||
|
jvalue jVal;
|
||||||
|
jstring jkey = NULL;
|
||||||
|
char buf[1024];
|
||||||
|
|
||||||
|
jkey = (*env)->NewStringUTF(env, key);
|
||||||
|
if (!jkey) {
|
||||||
|
ret = ENOMEM;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
ret = invokeMethod(env, &jVal, &jExc, INSTANCE, jConfiguration,
|
||||||
|
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
|
||||||
|
JPARAM(JAVA_STRING)), jkey);
|
||||||
|
if (ret) {
|
||||||
|
snprintf(buf, sizeof(buf), "hadoopConfGet(%s)", key);
|
||||||
|
ret = errnoFromException(jExc, env, buf);
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
if (!jVal.l) {
|
||||||
|
*val = NULL;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = jStrToCstr(env, jVal.l, val);
|
||||||
|
if (ret)
|
||||||
|
goto done;
|
||||||
|
done:
|
||||||
|
if (jkey)
|
||||||
|
destroyLocalReference(env, jkey);
|
||||||
|
if (ret)
|
||||||
|
errno = ret;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int hdfsConfGet(const char *key, char **val)
|
||||||
|
{
|
||||||
|
JNIEnv *env;
|
||||||
|
int ret;
|
||||||
|
jobject jConfiguration = NULL;
|
||||||
|
|
||||||
|
env = getJNIEnv();
|
||||||
|
if (env == NULL) {
|
||||||
|
ret = EINTERNAL;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||||
|
if (jConfiguration == NULL) {
|
||||||
|
fprintf(stderr, "Can't construct instance of class "
|
||||||
|
"org.apache.hadoop.conf.Configuration\n");
|
||||||
|
ret = EINTERNAL;
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
ret = hadoopConfGet(env, jConfiguration, key, val);
|
||||||
|
if (ret)
|
||||||
|
goto done;
|
||||||
|
ret = 0;
|
||||||
|
done:
|
||||||
|
if (jConfiguration)
|
||||||
|
destroyLocalReference(env, jConfiguration);
|
||||||
|
if (ret)
|
||||||
|
errno = ret;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsConfFree(char *val)
|
||||||
|
{
|
||||||
|
free(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct hdfsBuilder {
|
||||||
|
int forceNewInstance;
|
||||||
|
const char *nn;
|
||||||
|
tPort port;
|
||||||
|
const char *kerbTicketCachePath;
|
||||||
|
const char *userName;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct hdfsBuilder *hdfsNewBuilder(void)
|
||||||
|
{
|
||||||
|
struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
|
||||||
|
if (!bld) {
|
||||||
|
errno = ENOMEM;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return bld;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
||||||
|
{
|
||||||
|
free(bld);
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
|
||||||
|
{
|
||||||
|
bld->forceNewInstance = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
|
||||||
|
{
|
||||||
|
bld->nn = nn;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
||||||
|
{
|
||||||
|
bld->port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
||||||
|
{
|
||||||
|
bld->userName = userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
||||||
|
const char *kerbTicketCachePath)
|
||||||
|
{
|
||||||
|
bld->kerbTicketCachePath = kerbTicketCachePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
hdfsFS hdfsConnect(const char* host, tPort port)
|
||||||
|
{
|
||||||
|
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||||
|
if (!bld)
|
||||||
|
return NULL;
|
||||||
|
hdfsBuilderSetNameNode(bld, host);
|
||||||
|
hdfsBuilderSetNameNodePort(bld, port);
|
||||||
|
return hdfsBuilderConnect(bld);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Always return a new FileSystem handle */
|
/** Always return a new FileSystem handle */
|
||||||
hdfsFS hdfsConnectNewInstance(const char* host, tPort port) {
|
hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
|
||||||
// connect with NULL as user name/groups
|
{
|
||||||
return hdfsConnectAsUserNewInstance(host, port, NULL);
|
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||||
|
if (!bld)
|
||||||
|
return NULL;
|
||||||
|
hdfsBuilderSetNameNode(bld, host);
|
||||||
|
hdfsBuilderSetNameNodePort(bld, port);
|
||||||
|
hdfsBuilderSetForceNewInstance(bld);
|
||||||
|
return hdfsBuilderConnect(bld);
|
||||||
}
|
}
|
||||||
|
|
||||||
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
|
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
|
||||||
{
|
{
|
||||||
// JAVA EQUIVALENT:
|
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||||
// FileSystem fs = FileSystem.get(new Configuration());
|
if (!bld)
|
||||||
// return fs;
|
return NULL;
|
||||||
|
hdfsBuilderSetNameNode(bld, host);
|
||||||
|
hdfsBuilderSetNameNodePort(bld, port);
|
||||||
|
hdfsBuilderSetUserName(bld, user);
|
||||||
|
return hdfsBuilderConnect(bld);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Always return a new FileSystem handle */
|
||||||
|
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
|
||||||
|
const char *user)
|
||||||
|
{
|
||||||
|
struct hdfsBuilder *bld = hdfsNewBuilder();
|
||||||
|
if (!bld)
|
||||||
|
return NULL;
|
||||||
|
hdfsBuilderSetNameNode(bld, host);
|
||||||
|
hdfsBuilderSetNameNodePort(bld, port);
|
||||||
|
hdfsBuilderSetForceNewInstance(bld);
|
||||||
|
hdfsBuilderSetUserName(bld, user);
|
||||||
|
return hdfsBuilderConnect(bld);
|
||||||
|
}
|
||||||
|
|
||||||
|
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
|
||||||
|
{
|
||||||
JNIEnv *env = 0;
|
JNIEnv *env = 0;
|
||||||
jobject jConfiguration = NULL;
|
jobject gFsRef = NULL;
|
||||||
jobject jFS = NULL;
|
jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
|
||||||
jobject jURI = NULL;
|
jstring jURIString = NULL, jUserString = NULL;
|
||||||
jstring jURIString = NULL;
|
|
||||||
jvalue jVal;
|
jvalue jVal;
|
||||||
jthrowable jExc = NULL;
|
jthrowable jExc = NULL;
|
||||||
|
size_t cURILen;
|
||||||
char *cURI = 0;
|
char *cURI = 0;
|
||||||
jobject gFsRef = NULL;
|
int ret = 0;
|
||||||
jstring jUserString = NULL;
|
|
||||||
|
|
||||||
|
|
||||||
//Get the JNIEnv* corresponding to current thread
|
//Get the JNIEnv* corresponding to current thread
|
||||||
env = getJNIEnv();
|
env = getJNIEnv();
|
||||||
if (env == NULL) {
|
if (env == NULL) {
|
||||||
errno = EINTERNAL;
|
ret = EINTERNAL;
|
||||||
return NULL;
|
goto done;
|
||||||
}
|
}
|
||||||
|
|
||||||
//Create the org.apache.hadoop.conf.Configuration object
|
// jConfiguration = new Configuration();
|
||||||
jConfiguration =
|
jConfiguration = constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
||||||
constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
|
||||||
|
|
||||||
if (jConfiguration == NULL) {
|
if (jConfiguration == NULL) {
|
||||||
fprintf(stderr, "Can't construct instance of class "
|
fprintf(stderr, "Can't construct instance of class "
|
||||||
"org.apache.hadoop.conf.Configuration\n");
|
"org.apache.hadoop.conf.Configuration\n");
|
||||||
errno = EINTERNAL;
|
goto done;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (user != NULL) {
|
|
||||||
jUserString = (*env)->NewStringUTF(env, user);
|
|
||||||
}
|
|
||||||
//Check what type of FileSystem the caller wants...
|
//Check what type of FileSystem the caller wants...
|
||||||
if (host == NULL) {
|
if (bld->nn == NULL) {
|
||||||
|
// Get a local filesystem.
|
||||||
|
if (bld->forceNewInstance) {
|
||||||
|
// fs = FileSytem::newInstanceLocal(conf);
|
||||||
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||||
|
"newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
|
||||||
|
JPARAM(HADOOP_LOCALFS)), jConfiguration)) {
|
||||||
|
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||||
|
"FileSystem::newInstanceLocal");
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jFS = jVal.l;
|
||||||
|
} else {
|
||||||
// fs = FileSytem::getLocal(conf);
|
// fs = FileSytem::getLocal(conf);
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal",
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "getLocal",
|
||||||
JMETHOD1(JPARAM(HADOOP_CONF),
|
JMETHOD1(JPARAM(HADOOP_CONF),
|
||||||
JPARAM(HADOOP_LOCALFS)),
|
JPARAM(HADOOP_LOCALFS)),
|
||||||
jConfiguration) != 0) {
|
jConfiguration) != 0) {
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||||
"FileSystem::getLocal");
|
"FileSystem::getLocal");
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
jFS = jVal.l;
|
jFS = jVal.l;
|
||||||
}
|
}
|
||||||
//FileSystem.get(conf) -> FileSystem.get(FileSystem.getDefaultUri(conf),
|
} else {
|
||||||
// conf, user)
|
if (!strcmp(bld->nn, "default")) {
|
||||||
else if (!strcmp(host, "default") && port == 0) {
|
// jURI = FileSystem.getDefaultUri(conf)
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
||||||
"getDefaultUri",
|
"getDefaultUri",
|
||||||
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
||||||
jConfiguration) != 0) {
|
jConfiguration) != 0) {
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
||||||
"FileSystem::getDefaultUri");
|
"FileSystem::getDefaultUri");
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
jURI = jVal.l;
|
jURI = jVal.l;
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
} else {
|
||||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
||||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
|
||||||
JPARAM(HADOOP_FS)),
|
|
||||||
jURI, jConfiguration, jUserString) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
||||||
"Filesystem::get(URI, Configuration)");
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
|
|
||||||
jFS = jVal.l;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// fs = FileSystem::get(URI, conf, ugi);
|
// fs = FileSystem::get(URI, conf, ugi);
|
||||||
cURI = malloc(strlen(host)+16);
|
cURILen = strlen(bld->nn) + 16;
|
||||||
sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
cURI = malloc(cURILen);
|
||||||
if (cURI == NULL) {
|
if (!cURI) {
|
||||||
fprintf (stderr, "Couldn't allocate an object of size %d",
|
fprintf(stderr, "failed to allocate memory for HDFS URI\n");
|
||||||
(int)(strlen(host) + 16));
|
ret = ENOMEM;
|
||||||
errno = EINTERNAL;
|
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
snprintf(cURI, cURILen, "hdfs://%s:%d", bld->nn, (int)(bld->port));
|
||||||
jURIString = (*env)->NewStringUTF(env, cURI);
|
jURIString = (*env)->NewStringUTF(env, cURI);
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
||||||
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
||||||
jURIString) != 0) {
|
jURIString) != 0) {
|
||||||
errno = errnoFromException(jExc, env, "java.net.URI::create");
|
ret = errnoFromException(jExc, env, "java.net.URI::create");
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
jURI = jVal.l;
|
jURI = jVal.l;
|
||||||
|
}
|
||||||
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
if (bld->kerbTicketCachePath) {
|
||||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
ret = hadoopConfSet(env, jConfiguration,
|
||||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
|
||||||
JPARAM(HADOOP_FS)),
|
if (ret)
|
||||||
jURI, jConfiguration, jUserString) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
||||||
"Filesystem::get(URI, Configuration)");
|
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
if (bld->userName) {
|
||||||
jFS = jVal.l;
|
jUserString = (*env)->NewStringUTF(env, bld->userName);
|
||||||
}
|
}
|
||||||
|
if (bld->forceNewInstance) {
|
||||||
done:
|
|
||||||
|
|
||||||
// Release unnecessary local references
|
|
||||||
destroyLocalReference(env, jConfiguration);
|
|
||||||
destroyLocalReference(env, jURIString);
|
|
||||||
destroyLocalReference(env, jURI);
|
|
||||||
destroyLocalReference(env, jUserString);
|
|
||||||
|
|
||||||
if (cURI) free(cURI);
|
|
||||||
|
|
||||||
/* Create a global reference for this fs */
|
|
||||||
if (jFS) {
|
|
||||||
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
|
||||||
destroyLocalReference(env, jFS);
|
|
||||||
}
|
|
||||||
|
|
||||||
return gFsRef;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/** Always return a new FileSystem handle */
|
|
||||||
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user)
|
|
||||||
{
|
|
||||||
// JAVA EQUIVALENT:
|
|
||||||
// FileSystem fs = FileSystem.get(new Configuration());
|
|
||||||
// return fs;
|
|
||||||
|
|
||||||
JNIEnv *env = 0;
|
|
||||||
jobject jConfiguration = NULL;
|
|
||||||
jobject jFS = NULL;
|
|
||||||
jobject jURI = NULL;
|
|
||||||
jstring jURIString = NULL;
|
|
||||||
jvalue jVal;
|
|
||||||
jthrowable jExc = NULL;
|
|
||||||
char *cURI = 0;
|
|
||||||
jobject gFsRef = NULL;
|
|
||||||
jstring jUserString = NULL;
|
|
||||||
|
|
||||||
//Get the JNIEnv* corresponding to current thread
|
|
||||||
env = getJNIEnv();
|
|
||||||
if (env == NULL) {
|
|
||||||
errno = EINTERNAL;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
//Create the org.apache.hadoop.conf.Configuration object
|
|
||||||
jConfiguration =
|
|
||||||
constructNewObjectOfClass(env, NULL, HADOOP_CONF, "()V");
|
|
||||||
|
|
||||||
if (jConfiguration == NULL) {
|
|
||||||
fprintf(stderr, "Can't construct instance of class "
|
|
||||||
"org.apache.hadoop.conf.Configuration\n");
|
|
||||||
errno = EINTERNAL;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (user != NULL) {
|
|
||||||
jUserString = (*env)->NewStringUTF(env, user);
|
|
||||||
}
|
|
||||||
//Check what type of FileSystem the caller wants...
|
|
||||||
if (host == NULL) {
|
|
||||||
// fs = FileSytem::newInstanceLocal(conf);
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstanceLocal",
|
|
||||||
JMETHOD1(JPARAM(HADOOP_CONF),
|
|
||||||
JPARAM(HADOOP_LOCALFS)),
|
|
||||||
jConfiguration) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
||||||
"FileSystem::newInstanceLocal");
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
jFS = jVal.l;
|
|
||||||
}
|
|
||||||
else if (!strcmp(host, "default") && port == 0) {
|
|
||||||
//fs = FileSystem::get(conf);
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS,
|
|
||||||
"getDefaultUri",
|
|
||||||
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
|
|
||||||
jConfiguration) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs.",
|
|
||||||
"FileSystem::getDefaultUri");
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
jURI = jVal.l;
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL,
|
||||||
HADOOP_FS, "newInstance",
|
HADOOP_FS, "newInstance",
|
||||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
|
||||||
JPARAM(HADOOP_CONF),
|
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)), jURI,
|
||||||
JPARAM(JAVA_STRING),
|
jConfiguration, jUserString)) {
|
||||||
JPARAM(HADOOP_FS)),
|
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||||
jURI, jConfiguration, jUserString) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
||||||
"FileSystem::newInstance");
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
jFS = jVal.l;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// fs = FileSystem::newInstance(URI, conf);
|
|
||||||
cURI = malloc(strlen(host)+16);
|
|
||||||
sprintf(cURI, "hdfs://%s:%d", host, (int)(port));
|
|
||||||
|
|
||||||
jURIString = (*env)->NewStringUTF(env, cURI);
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, JAVA_NET_URI,
|
|
||||||
"create", "(Ljava/lang/String;)Ljava/net/URI;",
|
|
||||||
jURIString) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "java.net.URI::create");
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
jURI = jVal.l;
|
|
||||||
|
|
||||||
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "newInstance",
|
|
||||||
JMETHOD3(JPARAM(JAVA_NET_URI),
|
|
||||||
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
|
|
||||||
JPARAM(HADOOP_FS)),
|
|
||||||
jURI, jConfiguration, jUserString) != 0) {
|
|
||||||
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
|
||||||
"Filesystem::newInstance(URI, Configuration)");
|
"Filesystem::newInstance(URI, Configuration)");
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
|
||||||
jFS = jVal.l;
|
jFS = jVal.l;
|
||||||
|
} else {
|
||||||
|
if (invokeMethod(env, &jVal, &jExc, STATIC, NULL, HADOOP_FS, "get",
|
||||||
|
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
|
||||||
|
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
|
||||||
|
jURI, jConfiguration, jUserString)) {
|
||||||
|
ret = errnoFromException(jExc, env, "org.apache.hadoop.fs."
|
||||||
|
"Filesystem::get(URI, Configuration, String)");
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
jFS = jVal.l;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
if (jFS) {
|
||||||
|
/* Create a global reference for this fs */
|
||||||
|
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
||||||
|
}
|
||||||
|
|
||||||
// Release unnecessary local references
|
// Release unnecessary local references
|
||||||
destroyLocalReference(env, jConfiguration);
|
destroyLocalReference(env, jConfiguration);
|
||||||
destroyLocalReference(env, jURIString);
|
|
||||||
destroyLocalReference(env, jURI);
|
|
||||||
destroyLocalReference(env, jUserString);
|
|
||||||
|
|
||||||
if (cURI) free(cURI);
|
|
||||||
|
|
||||||
/* Create a global reference for this fs */
|
|
||||||
if (jFS) {
|
|
||||||
gFsRef = (*env)->NewGlobalRef(env, jFS);
|
|
||||||
destroyLocalReference(env, jFS);
|
destroyLocalReference(env, jFS);
|
||||||
}
|
destroyLocalReference(env, jURI);
|
||||||
|
destroyLocalReference(env, jCachePath);
|
||||||
|
destroyLocalReference(env, jURIString);
|
||||||
|
destroyLocalReference(env, jUserString);
|
||||||
|
free(cURI);
|
||||||
|
free(bld);
|
||||||
|
|
||||||
|
if (ret)
|
||||||
|
errno = ret;
|
||||||
return gFsRef;
|
return gFsRef;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ extern "C" {
|
||||||
/**
|
/**
|
||||||
* Some utility decls used in libhdfs.
|
* Some utility decls used in libhdfs.
|
||||||
*/
|
*/
|
||||||
|
struct hdfsBuilder;
|
||||||
typedef int32_t tSize; /// size of data for read/write io ops
|
typedef int32_t tSize; /// size of data for read/write io ops
|
||||||
typedef time_t tTime; /// time type in seconds
|
typedef time_t tTime; /// time type in seconds
|
||||||
typedef int64_t tOffset;/// offset within the file
|
typedef int64_t tOffset;/// offset within the file
|
||||||
|
@ -97,39 +97,148 @@ extern "C" {
|
||||||
/**
|
/**
|
||||||
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
|
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
|
||||||
* Connect to the hdfs.
|
* Connect to the hdfs.
|
||||||
* @param host A string containing either a host name, or an ip address
|
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||||
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
|
|
||||||
* you want to connect to local filesystem. 'host' should be passed as
|
|
||||||
* 'default' (and port as 0) to used the 'configured' filesystem
|
|
||||||
* (core-site/core-default.xml).
|
|
||||||
* @param port The port on which the server is listening.
|
* @param port The port on which the server is listening.
|
||||||
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
|
* @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port)
|
||||||
* @return Returns a handle to the filesystem or NULL on error.
|
* @return Returns a handle to the filesystem or NULL on error.
|
||||||
|
* @deprecated Use hdfsBuilderConnect instead.
|
||||||
*/
|
*/
|
||||||
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user);
|
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hdfsConnect - Connect to a hdfs file system.
|
* hdfsConnect - Connect to a hdfs file system.
|
||||||
* Connect to the hdfs.
|
* Connect to the hdfs.
|
||||||
* @param host A string containing either a host name, or an ip address
|
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||||
* of the namenode of a hdfs cluster. 'host' should be passed as NULL if
|
|
||||||
* you want to connect to local filesystem. 'host' should be passed as
|
|
||||||
* 'default' (and port as 0) to used the 'configured' filesystem
|
|
||||||
* (core-site/core-default.xml).
|
|
||||||
* @param port The port on which the server is listening.
|
* @param port The port on which the server is listening.
|
||||||
* @return Returns a handle to the filesystem or NULL on error.
|
* @return Returns a handle to the filesystem or NULL on error.
|
||||||
|
* @deprecated Use hdfsBuilderConnect instead.
|
||||||
*/
|
*/
|
||||||
hdfsFS hdfsConnect(const char* host, tPort port);
|
hdfsFS hdfsConnect(const char* nn, tPort port);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This are the same as hdfsConnectAsUser except that every invocation returns a new FileSystem handle.
|
* hdfsConnect - Connect to an hdfs file system.
|
||||||
* Applications should call a hdfsDisconnect for every call to hdfsConnectAsUserNewInstance.
|
*
|
||||||
|
* Forces a new instance to be created
|
||||||
|
*
|
||||||
|
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||||
|
* @param port The port on which the server is listening.
|
||||||
|
* @param user The user name to use when connecting
|
||||||
|
* @return Returns a handle to the filesystem or NULL on error.
|
||||||
|
* @deprecated Use hdfsBuilderConnect instead.
|
||||||
*/
|
*/
|
||||||
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port, const char *user );
|
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user );
|
||||||
hdfsFS hdfsConnectNewInstance(const char* host, tPort port);
|
|
||||||
hdfsFS hdfsConnectPath(const char* uri);
|
/**
|
||||||
|
* hdfsConnect - Connect to an hdfs file system.
|
||||||
|
*
|
||||||
|
* Forces a new instance to be created
|
||||||
|
*
|
||||||
|
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
|
||||||
|
* @param port The port on which the server is listening.
|
||||||
|
* @return Returns a handle to the filesystem or NULL on error.
|
||||||
|
* @deprecated Use hdfsBuilderConnect instead.
|
||||||
|
*/
|
||||||
|
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to HDFS using the parameters defined by the builder.
|
||||||
|
*
|
||||||
|
* The HDFS builder will be freed, whether or not the connection was
|
||||||
|
* successful.
|
||||||
|
*
|
||||||
|
* Every successful call to hdfsBuilderConnect should be matched with a call
|
||||||
|
* to hdfsDisconnect, when the hdfsFS is no longer needed.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
* @return Returns a handle to the filesystem, or NULL on error.
|
||||||
|
*/
|
||||||
|
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an HDFS builder.
|
||||||
|
*
|
||||||
|
* @return The HDFS builder, or NULL on error.
|
||||||
|
*/
|
||||||
|
struct hdfsBuilder *hdfsNewBuilder(void);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force the builder to always create a new instance of the FileSystem,
|
||||||
|
* rather than possibly finding one in the cache.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
*/
|
||||||
|
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the HDFS NameNode to connect to.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
* @param nn The NameNode to use.
|
||||||
|
* If the string given is 'default', the default NameNode
|
||||||
|
* configuration will be used (from the XML configuration files)
|
||||||
|
* If NULL is given, a LocalFileSystem will be created.
|
||||||
|
* Otherwise, the string will be interpreted as a hostname or IP
|
||||||
|
* address.
|
||||||
|
*/
|
||||||
|
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the port of the HDFS NameNode to connect to.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
* @param port The port.
|
||||||
|
*/
|
||||||
|
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the username to use when connecting to the HDFS cluster.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
* @param userName The user name. The string will be shallow-copied.
|
||||||
|
*/
|
||||||
|
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the path to the Kerberos ticket cache to use when connecting to
|
||||||
|
* the HDFS cluster.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
* @param kerbTicketCachePath The Kerberos ticket cache path. The string
|
||||||
|
* will be shallow-copied.
|
||||||
|
*/
|
||||||
|
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
|
||||||
|
const char *kerbTicketCachePath);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free an HDFS builder.
|
||||||
|
*
|
||||||
|
* It is normally not necessary to call this function since
|
||||||
|
* hdfsBuilderConnect frees the builder.
|
||||||
|
*
|
||||||
|
* @param bld The HDFS builder
|
||||||
|
*/
|
||||||
|
void hdfsFreeBuilder(struct hdfsBuilder *bld);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a configuration string.
|
||||||
|
*
|
||||||
|
* @param key The key to find
|
||||||
|
* @param val (out param) The value. This will be NULL if the
|
||||||
|
* key isn't found. You must free this string with
|
||||||
|
* hdfsConfFree.
|
||||||
|
*
|
||||||
|
* @return 0 on success; nonzero error code otherwise.
|
||||||
|
* Failure to find the key is not an error.
|
||||||
|
*/
|
||||||
|
int hdfsConfGet(const char *key, char **val);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free a configuration string found with hdfsConfGet.
|
||||||
|
*
|
||||||
|
* @param val A configuration string obtained from hdfsConfGet
|
||||||
|
*/
|
||||||
|
void hdfsConfFree(char *val);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hdfsDisconnect - Disconnect from the hdfs file system.
|
* hdfsDisconnect - Disconnect from the hdfs file system.
|
||||||
|
|
Loading…
Reference in New Issue