svn merge -c 1454460. Merging from trunk to branch-2 to fix HDFS-4567.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1454468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3401855c1a
commit
5a12614da7
|
@ -2138,6 +2138,9 @@ Release 0.23.7 - UNRELEASED
|
|||
|
||||
HDFS-4566. Webdhfs token cancelation should use authentication (daryn)
|
||||
|
||||
HDFS-4567. Webhdfs does not need a token for token operations (daryn via
|
||||
kihwal)
|
||||
|
||||
Release 0.23.6 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.hadoop.util.Progressable;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
|
@ -108,7 +109,8 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
|
||||
private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
@VisibleForTesting
|
||||
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
if (dtRenewer == null) {
|
||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
@ -127,6 +129,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
private UserGroupInformation ugi;
|
||||
private InetSocketAddress nnAddr;
|
||||
private URI uri;
|
||||
private boolean hasInitedToken;
|
||||
private Token<?> delegationToken;
|
||||
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
||||
private RetryPolicy retryPolicy = null;
|
||||
|
@ -173,24 +176,26 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
protected void initDelegationToken() throws IOException {
|
||||
// look for webhdfs token, then try hdfs
|
||||
Token<?> token = selectDelegationToken(ugi);
|
||||
|
||||
//since we don't already have a token, go get one
|
||||
boolean createdToken = false;
|
||||
if (token == null) {
|
||||
token = getDelegationToken(null);
|
||||
createdToken = (token != null);
|
||||
}
|
||||
|
||||
// security might be disabled
|
||||
if (token != null) {
|
||||
LOG.debug("Found existing DT for " + token.getService());
|
||||
setDelegationToken(token);
|
||||
if (createdToken) {
|
||||
hasInitedToken = true;
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized Token<?> getDelegationToken() throws IOException {
|
||||
if (!hasInitedToken) {
|
||||
//since we don't already have a token, go get one
|
||||
Token<?> token = getDelegationToken(null);
|
||||
// security might be disabled
|
||||
if (token != null) {
|
||||
setDelegationToken(token);
|
||||
addRenewAction(this);
|
||||
LOG.debug("Created new DT for " + token.getService());
|
||||
} else {
|
||||
LOG.debug("Found existing DT for " + token.getService());
|
||||
}
|
||||
hasInitedToken = true;
|
||||
}
|
||||
return delegationToken;
|
||||
}
|
||||
|
||||
protected Token<DelegationTokenIdentifier> selectDelegationToken(
|
||||
|
@ -338,20 +343,16 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
List<Param<?,?>> authParams = Lists.newArrayList();
|
||||
// Skip adding delegation token for token operations because these
|
||||
// operations require authentication.
|
||||
boolean hasToken = false;
|
||||
Token<?> token = null;
|
||||
if (UserGroupInformation.isSecurityEnabled() &&
|
||||
op != GetOpParam.Op.GETDELEGATIONTOKEN &&
|
||||
op != PutOpParam.Op.RENEWDELEGATIONTOKEN &&
|
||||
op != PutOpParam.Op.CANCELDELEGATIONTOKEN) {
|
||||
synchronized (this) {
|
||||
hasToken = (delegationToken != null);
|
||||
if (hasToken) {
|
||||
final String encoded = delegationToken.encodeToUrlString();
|
||||
authParams.add(new DelegationParam(encoded));
|
||||
} // else we are talking to an insecure cluster
|
||||
}
|
||||
token = getDelegationToken();
|
||||
}
|
||||
if (!hasToken) {
|
||||
if (token != null) {
|
||||
authParams.add(new DelegationParam(token.encodeToUrlString()));
|
||||
} else {
|
||||
UserGroupInformation userUgi = ugi;
|
||||
UserGroupInformation realUgi = userUgi.getRealUser();
|
||||
if (realUgi != null) { // proxy user
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestWebHdfsTokens {
|
||||
static Configuration conf;
|
||||
static UserGroupInformation ugi;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException {
|
||||
conf = new Configuration();
|
||||
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout=1000)
|
||||
public void testInitWithNoToken() throws IOException {
|
||||
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||
doReturn(null).when(fs).getDelegationToken(anyString());
|
||||
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
|
||||
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
|
||||
// when not in ugi, don't get one
|
||||
verify(fs).initDelegationToken();
|
||||
verify(fs).selectDelegationToken(ugi);
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout=1000)
|
||||
public void testInitWithUGIToken() throws IOException {
|
||||
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||
Token<DelegationTokenIdentifier> token = mock(Token.class);
|
||||
doReturn(token).when(fs).selectDelegationToken(ugi);
|
||||
doReturn(null).when(fs).getDelegationToken(anyString());
|
||||
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
|
||||
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
|
||||
// when in the ugi, store it but don't renew it
|
||||
verify(fs).initDelegationToken();
|
||||
verify(fs).selectDelegationToken(ugi);
|
||||
verify(fs).setDelegationToken(token);
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).addRenewAction(fs);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout=1000)
|
||||
public void testInternalGetDelegationToken() throws IOException {
|
||||
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||
Token<DelegationTokenIdentifier> token = mock(Token.class);
|
||||
doReturn(null).when(fs).selectDelegationToken(ugi);
|
||||
doReturn(token).when(fs).getDelegationToken(anyString());
|
||||
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
|
||||
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
|
||||
// get token, store it, and renew it
|
||||
Token<?> token2 = fs.getDelegationToken();
|
||||
assertEquals(token2, token);
|
||||
verify(fs).getDelegationToken(null);
|
||||
verify(fs).setDelegationToken(token);
|
||||
verify(fs).addRenewAction(fs);
|
||||
reset(fs);
|
||||
|
||||
// just return token, don't get/set/renew
|
||||
token2 = fs.getDelegationToken();
|
||||
assertEquals(token2, token);
|
||||
verify(fs, never()).getDelegationToken(null);
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
verify(fs, never()).addRenewAction(fs);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout=1000)
|
||||
public void testTokenForNonTokenOp() throws IOException {
|
||||
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||
Token<DelegationTokenIdentifier> token = mock(Token.class);
|
||||
doReturn(null).when(fs).selectDelegationToken(ugi);
|
||||
doReturn(token).when(fs).getDelegationToken(null);
|
||||
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
|
||||
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
|
||||
// should get/set/renew token
|
||||
fs.toUrl(GetOpParam.Op.OPEN, null);
|
||||
verify(fs).getDelegationToken();
|
||||
verify(fs).getDelegationToken(null);
|
||||
verify(fs).setDelegationToken(token);
|
||||
verify(fs).addRenewAction(fs);
|
||||
reset(fs);
|
||||
|
||||
// should return prior token
|
||||
fs.toUrl(GetOpParam.Op.OPEN, null);
|
||||
verify(fs).getDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(null);
|
||||
verify(fs, never()).setDelegationToken(token);
|
||||
verify(fs, never()).addRenewAction(fs);
|
||||
}
|
||||
|
||||
@Test(timeout=1000)
|
||||
public void testNoTokenForGetToken() throws IOException {
|
||||
checkNoTokenForOperation(GetOpParam.Op.GETDELEGATIONTOKEN);
|
||||
}
|
||||
|
||||
@Test(timeout=1000)
|
||||
public void testNoTokenForCanclToken() throws IOException {
|
||||
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
|
||||
}
|
||||
|
||||
@Test(timeout=1000)
|
||||
public void testNoTokenForCancelToken() throws IOException {
|
||||
checkNoTokenForOperation(PutOpParam.Op.CANCELDELEGATIONTOKEN);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void checkNoTokenForOperation(HttpOpParam.Op op) throws IOException {
|
||||
WebHdfsFileSystem fs = spy(new WebHdfsFileSystem());
|
||||
doReturn(null).when(fs).selectDelegationToken(ugi);
|
||||
doReturn(null).when(fs).getDelegationToken(null);
|
||||
doNothing().when(fs).addRenewAction(any(WebHdfsFileSystem.class));
|
||||
fs.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
|
||||
// do not get a token!
|
||||
fs.toUrl(op, null);
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(null);
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
verify(fs, never()).addRenewAction(fs);
|
||||
}
|
||||
}
|
|
@ -112,7 +112,7 @@ public class TestWebHdfsUrl {
|
|||
|
||||
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
|
||||
Path fsPath = new Path("/");
|
||||
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
|
||||
String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
|
||||
|
||||
// send user
|
||||
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
|
||||
|
@ -193,7 +193,7 @@ public class TestWebHdfsUrl {
|
|||
|
||||
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
|
||||
Path fsPath = new Path("/");
|
||||
String tokenString = webhdfs.getRenewToken().encodeToUrlString();
|
||||
String tokenString = webhdfs.getDelegationToken().encodeToUrlString();
|
||||
|
||||
// send real+effective
|
||||
URL getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN, fsPath);
|
||||
|
@ -379,8 +379,5 @@ public class TestWebHdfsUrl {
|
|||
public int getDefaultPort() {
|
||||
return super.getDefaultPort();
|
||||
}
|
||||
// don't automatically get a token
|
||||
@Override
|
||||
protected void initDelegationToken() throws IOException {}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue