HDFS-4567. Webhdfs does not need a token for token operations. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1454460 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-03-08 16:56:34 +00:00
parent 5f2c518c95
commit 94a22114b9
4 changed files with 197 additions and 27 deletions

View File

@ -2378,6 +2378,9 @@ Release 0.23.7 - UNRELEASED
HDFS-4566. Webdhfs token cancelation should use authentication (daryn)
HDFS-4567. Webhdfs does not need a token for token operations (daryn via
kihwal)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -84,6 +84,7 @@
import org.apache.hadoop.util.StringUtils;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
@ -108,7 +109,8 @@ public class WebHdfsFileSystem extends FileSystem
private DelegationTokenRenewer dtRenewer = null;
private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
@VisibleForTesting
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
if (dtRenewer == null) {
dtRenewer = DelegationTokenRenewer.getInstance();
}
@ -127,6 +129,7 @@ public static boolean isEnabled(final Configuration conf, final Log log) {
private UserGroupInformation ugi;
private InetSocketAddress nnAddr;
private URI uri;
private boolean hasInitedToken;
private Token<?> delegationToken;
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
private RetryPolicy retryPolicy = null;
@ -173,24 +176,26 @@ public synchronized void initialize(URI uri, Configuration conf
protected void initDelegationToken() throws IOException {
// look for webhdfs token, then try hdfs
Token<?> token = selectDelegationToken(ugi);
//since we don't already have a token, go get one
boolean createdToken = false;
if (token == null) {
token = getDelegationToken(null);
createdToken = (token != null);
}
// security might be disabled
if (token != null) {
LOG.debug("Found existing DT for " + token.getService());
setDelegationToken(token);
if (createdToken) {
hasInitedToken = true;
}
}
protected synchronized Token<?> getDelegationToken() throws IOException {
if (!hasInitedToken) {
//since we don't already have a token, go get one
Token<?> token = getDelegationToken(null);
// security might be disabled
if (token != null) {
setDelegationToken(token);
addRenewAction(this);
LOG.debug("Created new DT for " + token.getService());
} else {
LOG.debug("Found existing DT for " + token.getService());
}
hasInitedToken = true;
}
return delegationToken;
}
protected Token<DelegationTokenIdentifier> selectDelegationToken(
@ -338,20 +343,16 @@ Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
List<Param<?,?>> authParams = Lists.newArrayList();
// Skip adding delegation token for token operations because these
// operations require authentication.
boolean hasToken = false;
Token<?> token = null;
if (UserGroupInformation.isSecurityEnabled() &&
op != GetOpParam.Op.GETDELEGATIONTOKEN &&
op != PutOpParam.Op.RENEWDELEGATIONTOKEN &&
op != PutOpParam.Op.CANCELDELEGATIONTOKEN) {
synchronized (this) {
hasToken = (delegationToken != null);
if (hasToken) {
final String encoded = delegationToken.encodeToUrlString();
authParams.add(new DelegationParam(encoded));
} // else we are talking to an insecure cluster
}
token = getDelegationToken();
}
if (!hasToken) {
if (token != null) {
authParams.add(new DelegationParam(token.encodeToUrlString()));
} else {
UserGroupInformation userUgi = ugi;
UserGroupInformation realUgi = userUgi.getRealUser();
if (realUgi != null) { // proxy user

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestWebHdfsTokens {
static Configuration conf;
static UserGroupInformation ugi;
@BeforeClass
public static void setup() throws IOException {
conf = new Configuration();
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
UserGroupInformation.setConfiguration(conf);
ugi = UserGroupInformation.getCurrentUser();
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInitWithNoToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when not in ugi, don't get one
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInitWithUGIToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(token).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// when in the ugi, store it but don't renew it
verify(fs).initDelegationToken();
verify(fs).selectDelegationToken(ugi);
verify(fs).setDelegationToken(token);
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(anyString());
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testInternalGetDelegationToken() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(anyString());
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// get token, store it, and renew it
Token<?> token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs);
// just return token, don't get/set/renew
token2 = fs.getDelegationToken();
assertEquals(token2, token);
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
}
@SuppressWarnings("unchecked")
@Test(timeout=1000)
public void testTokenForNonTokenOp() throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
Token<DelegationTokenIdentifier> token = mock(Token.class);
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(token).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// should get/set/renew token
fs.toUrl(GetOpParam.Op.OPEN, null);
verify(fs).getDelegationToken();
verify(fs).getDelegationToken(null);
verify(fs).setDelegationToken(token);
verify(fs).addRenewAction(fs);
reset(fs);
// should return prior token
fs.toUrl(GetOpParam.Op.OPEN, null);
verify(fs).getDelegationToken();
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(token);
verify(fs, never()).addRenewAction(fs);
}
@Test(timeout=1000)
public void testNoTokenForGetToken() throws IOException {
checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
}
@Test(timeout=1000)
public void testNoTokenForCanclToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
}
@Test(timeout=1000)
public void testNoTokenForCancelToken() throws IOException {
checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
}
@SuppressWarnings("unchecked")
private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
doReturn(null).when(fs).selectDelegationToken(ugi);
doReturn(null).when(fs).getDelegationToken(null);
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
// do not get a token!
fs.toUrl(op, null);
verify(fs, never()).getDelegationToken();
verify(fs, never()).getDelegationToken(null);
verify(fs, never()).setDelegationToken(any(Token.class));
verify(fs, never()).addRenewAction(fs);
}
}

View File

@ -112,7 +112,7 @@ public void testSecureAuthParamsInUrl() throws IOException {
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
// send user
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@ -193,7 +193,7 @@ public void testSecureProxyAuthParamsInUrl() throws IOException {
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/");
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
// send real+effective
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
@ -379,8 +379,5 @@ public URI getCanonicalUri() {
public int getDefaultPort() {
return super.getDefaultPort();
}
// don't automatically get a token
@Override
protected void initDelegationToken() throws IOException {}
}
}