svn merge -c 1452978 Merging from trunk to branch-2 to fix HDFS-4542.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1452980 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-03-05 20:20:16 +00:00
parent 8f300dabc6
commit 3c241b2494
3 changed files with 281 additions and 105 deletions

View File

@ -2104,6 +2104,9 @@ Release 0.23.7 - UNRELEASED
HDFS-4128. 2NN gets stuck in inconsistent state if edit log replay fails
in the middle (kihwal via daryn)
HDFS-4542. Webhdfs doesn't support secure proxy users (Daryn Sharp via
kihwal)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -62,33 +62,8 @@
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam;
import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.hdfs.web.resources.PermissionParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.RecursiveParam;
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
@ -110,6 +85,7 @@
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/** A FileSystem for HDFS over the web. */
public class WebHdfsFileSystem extends FileSystem
@ -148,7 +124,7 @@ public static boolean isEnabled(final Configuration conf, final Log log) {
return b;
}
private final UserGroupInformation ugi;
private UserGroupInformation ugi;
private InetSocketAddress nnAddr;
private URI uri;
private Token<?> delegationToken;
@ -156,14 +132,6 @@ public static boolean isEnabled(final Configuration conf, final Log log) {
private RetryPolicy retryPolicy = null;
private Path workingDir;
{
try {
ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Return the protocol scheme for the FileSystem.
* <p/>
@ -180,6 +148,7 @@ public synchronized void initialize(URI uri, Configuration conf
) throws IOException {
super.initialize(uri, conf);
setConf(conf);
ugi = UserGroupInformation.getCurrentUser();
try {
this.uri = new URI(uri.getScheme(), uri.getAuthority(), null, null, null);
} catch (URISyntaxException e) {
@ -365,16 +334,32 @@ private URL getNamenodeURL(String path, String query) throws IOException {
return url;
}
private String addDt2Query(String query) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
List<Param<?,?>> authParams = Lists.newArrayList();
// Skip adding delegation token for token operations because these
// operations require authentication.
boolean hasToken = false;
if (UserGroupInformation.isSecurityEnabled() &&
op != GetOpParam.Op.GETDELEGATIONTOKEN &&
op != PutOpParam.Op.RENEWDELEGATIONTOKEN) {
synchronized (this) {
if (delegationToken != null) {
hasToken = (delegationToken != null);
if (hasToken) {
final String encoded = delegationToken.encodeToUrlString();
return query + JspHelper.getDelegationTokenUrlParam(encoded);
authParams.add(new DelegationParam(encoded));
} // else we are talking to an insecure cluster
}
}
return query;
UserGroupInformation userUgi = ugi;
if (!hasToken) {
UserGroupInformation realUgi = userUgi.getRealUser();
if (realUgi != null) { // proxy user
authParams.add(new DoAsParam(userUgi.getShortUserName()));
userUgi = realUgi;
}
}
authParams.add(new UserParam(userUgi.getShortUserName()));
return authParams.toArray(new Param<?,?>[0]);
}
URL toUrl(final HttpOpParam.Op op, final Path fspath,
@ -383,17 +368,9 @@ URL toUrl(final HttpOpParam.Op op, final Path fspath,
final String path = PATH_PREFIX
+ (fspath == null? "/": makeQualified(fspath).toUri().getPath());
final String query = op.toQueryString()
+ '&' + new UserParam(ugi)
+ Param.toSortedString("&", getAuthParameters(op))
+ Param.toSortedString("&", parameters);
final URL url;
if (op == PutOpParam.Op.RENEWDELEGATIONTOKEN
|| op == GetOpParam.Op.GETDELEGATIONTOKEN) {
// Skip adding delegation token for getting or renewing delegation token,
// because these operations require kerberos authentication.
url = getNamenodeURL(path, query);
} else {
url = getNamenodeURL(path, addDt2Query(query));
}
final URL url = getNamenodeURL(path, query);
if (LOG.isTraceEnabled()) {
LOG.trace("url=" + url);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -26,78 +27,273 @@
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.junit.Assert;
import org.junit.Test;
import org.junit.*;
public class TestWebHdfsUrl {
// NOTE: port is never used
final URI uri = URI.create(WebHdfsFileSystem.SCHEME + "://" + "127.0.0.1:0");
@Test
public void testDelegationTokenInUrl() throws IOException {
Configuration conf = new Configuration();
final String uri = WebHdfsFileSystem.SCHEME + "://" + "127.0.0.1:9071";
// Turn on security
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
ugi.getUserName()), null, null);
FSNamesystem namesystem = mock(FSNamesystem.class);
DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
86400000, 86400000, 86400000, 86400000, namesystem);
dtSecretManager.startThreads();
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
dtId, dtSecretManager);
token.setService(new Text("127.0.0.1:9071"));
token.setKind(WebHdfsFileSystem.TOKEN_KIND);
ugi.addToken(token);
final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) FileSystem.get(
URI.create(uri), conf);
String tokenString = token.encodeToUrlString();
Path fsPath = new Path("/");
URL renewTokenUrl = webhdfs.toUrl(PutOpParam.Op.RENEWDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
URL cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
Assert.assertEquals(
generateUrlQueryPrefix(PutOpParam.Op.RENEWDELEGATIONTOKEN,
ugi.getUserName())
+ "&token=" + tokenString, renewTokenUrl.getQuery());
Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(
token);
delegationToken.setKind(WebHdfsFileSystem.TOKEN_KIND);
Assert.assertEquals(
generateUrlQueryPrefix(PutOpParam.Op.CANCELDELEGATIONTOKEN,
ugi.getUserName())
+ "&token="
+ tokenString
+ "&"
+ DelegationParam.NAME
+ "="
+ delegationToken.encodeToUrlString(), cancelTokenUrl.getQuery());
}
private String generateUrlQueryPrefix(HttpOpParam.Op op, String username) {
return "op=" + op.toString() + "&user.name=" + username;
@Before
public void resetUGI() {
UserGroupInformation.setConfiguration(new Configuration());
}
@Test
@Test(timeout=4000)
public void testSimpleAuthParamsInUrl() throws IOException {
Configuration conf = new Configuration();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
UserGroupInformation.setLoginUser(ugi);
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
// send user+token
URL fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getShortUserName()).toString()
},
fileStatusUrl);
}
@Test(timeout=4000)
public void testSimpleProxyAuthParamsInUrl() throws IOException {
Configuration conf = new Configuration();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
UserGroupInformation.setLoginUser(ugi);
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
// send real+effective
URL fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString()
},
fileStatusUrl);
}
@Test(timeout=4000)
public void testSecureAuthParamsInUrl() throws IOException {
Configuration conf = new Configuration();
// fake turning on security so api thinks it should use tokens
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
ugi.setAuthenticationMethod(KERBEROS);
UserGroupInformation.setLoginUser(ugi);
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
// send user
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString()
},
getTokenUrl);
// send user
URL renewTokenUrl = webhdfs.toUrl(PutOpParam.Op.RENEWDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.RENEWDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
},
renewTokenUrl);
// send user+token
URL cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.CANCELDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
new DelegationParam(tokenString).toString()
},
cancelTokenUrl);
// send user+token
URL fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new DelegationParam(tokenString).toString()
},
fileStatusUrl);
// wipe out internal token to simulate auth always required
webhdfs.setDelegationToken(null);
// send user
cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.CANCELDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
},
cancelTokenUrl);
// send user
fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getShortUserName()).toString()
},
fileStatusUrl);
}
@Test(timeout=4000)
public void testSecureProxyAuthParamsInUrl() throws IOException {
Configuration conf = new Configuration();
// fake turning on security so api thinks it should use tokens
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
ugi.setAuthenticationMethod(KERBEROS);
ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
UserGroupInformation.setLoginUser(ugi);
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
// send real+effective
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString()
},
getTokenUrl);
// send real+effective
URL renewTokenUrl = webhdfs.toUrl(PutOpParam.Op.RENEWDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.RENEWDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
},
renewTokenUrl);
// send effective+token
URL cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.CANCELDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
new DelegationParam(tokenString).toString()
},
cancelTokenUrl);
// send effective+token
URL fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new DelegationParam(tokenString).toString()
},
fileStatusUrl);
// wipe out internal token to simulate auth always required
webhdfs.setDelegationToken(null);
// send real+effective
cancelTokenUrl = webhdfs.toUrl(PutOpParam.Op.CANCELDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.CANCELDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString()
},
cancelTokenUrl);
// send real+effective
fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString()
},
fileStatusUrl);
}
private void checkQueryParams(String[] expected, URL url) {
Arrays.sort(expected);
String[] query = url.getQuery().split("&");
Arrays.sort(query);
assertEquals(Arrays.toString(expected), Arrays.toString(query));
}
private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
Configuration conf) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
ugi.getUserName()), null, null);
FSNamesystem namesystem = mock(FSNamesystem.class);
DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
86400000, 86400000, 86400000, 86400000, namesystem);
dtSecretManager.startThreads();
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
dtId, dtSecretManager);
SecurityUtil.setTokenService(
token, NetUtils.createSocketAddr(uri.getAuthority()));
token.setKind(WebHdfsFileSystem.TOKEN_KIND);
ugi.addToken(token);
}
return (WebHdfsFileSystem) FileSystem.get(uri, conf);
}
@Test(timeout=4000)
public void testSelectHdfsDelegationToken() throws Exception {
SecurityUtilTestHelper.setTokenServiceUseIp(true);