From 0c5128969522cf754010c32cdcbfcfa5ebe5b3b0 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Fri, 20 Jun 2014 23:58:23 +0000 Subject: [PATCH] HDFS-6222. Remove background token renewer from webhdfs. Contributed by Rushabh Shah and Daryn Sharp. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1604300 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../delegation/DelegationTokenIdentifier.java | 23 ++ .../hadoop/hdfs/web/SWebHdfsFileSystem.java | 4 +- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 140 ++++++-- ...ache.hadoop.security.token.TokenIdentifier | 2 + .../hadoop/hdfs/web/TestWebHdfsTokens.java | 309 +++++++++++++++++- 6 files changed, 439 insertions(+), 42 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 44020ead059..5cbea0dd7cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -684,6 +684,9 @@ Release 2.5.0 - UNRELEASED HDFS-6535. HDFS quota update is wrong when file is appended. (George Wong via jing9) + HDFS-6222. Remove background token renewer from webhdfs. + (Rushabh Shah and Daryn Sharp via cnauroth) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java index a5e14cba945..07052ddf760 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; @@ -75,4 +77,25 @@ public static String stringifyToken(final Token token) throws IOException { return ident.toString(); } } + + public static class WebHdfsDelegationTokenIdentifier + extends DelegationTokenIdentifier { + public WebHdfsDelegationTokenIdentifier() { + super(); + } + @Override + public Text getKind() { + return WebHdfsFileSystem.TOKEN_KIND; + } + } + + public static class SWebHdfsDelegationTokenIdentifier extends WebHdfsDelegationTokenIdentifier { + public SWebHdfsDelegationTokenIdentifier() { + super(); + } + @Override + public Text getKind() { + return SWebHdfsFileSystem.TOKEN_KIND; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java index 3c65ad83e28..fa89ec3551b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java @@ -36,8 +36,8 @@ protected String getTransportScheme() { } @Override - protected synchronized void initializeTokenAspect() { - tokenAspect = new TokenAspect(this, tokenServiceName, TOKEN_KIND); + protected Text getTokenKind() { + return TOKEN_KIND; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index f8dcc398bd2..94c666a3a11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -69,11 +69,14 @@ import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.util.Progressable; import org.mortbay.util.ajax.JSON; @@ -98,7 +101,7 @@ public class WebHdfsFileSystem extends FileSystem /** Delegation token kind */ public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); - protected TokenAspect tokenAspect; + private boolean canRefreshDelegationToken; private UserGroupInformation ugi; private URI uri; @@ -127,13 +130,8 @@ protected String getTransportScheme() { return "http"; } - /** - * Initialize tokenAspect. This function is intended to - * be overridden by SWebHdfsFileSystem. - */ - protected synchronized void initializeTokenAspect() { - tokenAspect = new TokenAspect(this, tokenServiceName, - TOKEN_KIND); + protected Text getTokenKind() { + return TOKEN_KIND; } @Override @@ -162,7 +160,6 @@ public synchronized void initialize(URI uri, Configuration conf this.tokenServiceName = isLogicalUri ? HAUtil.buildTokenServiceForLogicalUri(uri) : SecurityUtil.buildTokenService(getCanonicalUri()); - initializeTokenAspect(); if (!isHA) { this.retryPolicy = @@ -195,10 +192,8 @@ public synchronized void initialize(URI uri, Configuration conf } this.workingDir = getHomeDirectory(); - - if (UserGroupInformation.isSecurityEnabled()) { - tokenAspect.initDelegationToken(ugi); - } + this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled(); + this.delegationToken = null; } @Override @@ -213,11 +208,46 @@ public static boolean isEnabled(final Configuration conf, final Log log) { return b; } + TokenSelector tokenSelector = + new AbstractDelegationTokenSelector(getTokenKind()){}; + + // the first getAuthParams() for a non-token op will either get the + // internal token from the ugi or lazy fetch one protected synchronized Token getDelegationToken() throws IOException { - tokenAspect.ensureTokenInitialized(); + if (canRefreshDelegationToken && delegationToken == null) { + Token token = tokenSelector.selectToken( + new Text(getCanonicalServiceName()), ugi.getTokens()); + // ugi tokens are usually indicative of a task which can't + // refetch tokens. even if ugi has credentials, don't attempt + // to get another token to match hdfs/rpc behavior + if (token != null) { + LOG.debug("Using UGI token: " + token); + canRefreshDelegationToken = false; + } else { + token = getDelegationToken(null); + if (token != null) { + LOG.debug("Fetched new token: " + token); + } else { // security is disabled + canRefreshDelegationToken = false; + } + } + setDelegationToken(token); + } return delegationToken; } + @VisibleForTesting + synchronized boolean replaceExpiredDelegationToken() throws IOException { + boolean replaced = false; + if (canRefreshDelegationToken) { + Token token = getDelegationToken(null); + LOG.debug("Replaced expired token: " + token); + setDelegationToken(token); + replaced = (token != null); + } + return replaced; + } + @Override protected int getDefaultPort() { return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT; @@ -288,8 +318,8 @@ private Path makeAbsolute(Path f) { final int code = conn.getResponseCode(); // server is demanding an authentication we don't support if (code == HttpURLConnection.HTTP_UNAUTHORIZED) { - throw new IOException( - new AuthenticationException(conn.getResponseMessage())); + // match hdfs/rpc exception + throw new AccessControlException(conn.getResponseMessage()); } if (code != op.getExpectedHttpResponseCode()) { final Map m; @@ -309,7 +339,15 @@ private Path makeAbsolute(Path f) { return m; } - final RemoteException re = JsonUtil.toRemoteException(m); + IOException re = JsonUtil.toRemoteException(m); + // extract UGI-related exceptions and unwrap InvalidToken + // the NN mangles these exceptions but the DN does not and may need + // to re-fetch a token if either report the token is expired + if (re.getMessage().startsWith("Failed to obtain user group information:")) { + String[] parts = re.getMessage().split(":\\s+", 3); + re = new RemoteException(parts[1], parts[2]); + re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class); + } throw unwrapException? toIOException(re): re; } return null; @@ -369,7 +407,7 @@ Param[] getAuthParameters(final HttpOpParam.Op op) throws IOException { // Skip adding delegation token for token operations because these // operations require authentication. Token token = null; - if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) { + if (!op.getRequireAuth()) { token = getDelegationToken(); } if (token != null) { @@ -540,11 +578,17 @@ private T runWithRetry() throws IOException { validateResponse(op, conn, false); } return getResponse(conn); - } catch (IOException ioe) { - Throwable cause = ioe.getCause(); - if (cause != null && cause instanceof AuthenticationException) { - throw ioe; // no retries for auth failures + } catch (AccessControlException ace) { + // no retries for auth failures + throw ace; + } catch (InvalidToken it) { + // try to replace the expired token with a new one. the attempt + // to acquire a new token must be outside this operation's retry + // so if it fails after its own retries, this operation fails too. + if (op.getRequireAuth() || !replaceExpiredDelegationToken()) { + throw it; } + } catch (IOException ioe) { shouldRetry(ioe, retry); } } @@ -712,6 +756,17 @@ public void close() throws IOException { }; } } + + class FsPathConnectionRunner extends AbstractFsPathRunner { + FsPathConnectionRunner(Op op, Path fspath, Param... parameters) { + super(op, fspath, parameters); + } + @Override + HttpURLConnection getResponse(final HttpURLConnection conn) + throws IOException { + return conn; + } + } /** * Used by open() which tracks the resolved url itself @@ -1077,16 +1132,41 @@ public FSDataInputStream open(final Path f, final int buffersize ) throws IOException { statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.OPEN; - final URL url = toUrl(op, f, new BufferSizeParam(buffersize)); + // use a runner so the open can recover from an invalid token + FsPathConnectionRunner runner = + new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize)); return new FSDataInputStream(new OffsetUrlInputStream( - new OffsetUrlOpener(url), new OffsetUrlOpener(null))); + new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null))); } @Override - public void close() throws IOException { - super.close(); - synchronized (this) { - tokenAspect.removeRenewAction(); + public synchronized void close() throws IOException { + try { + if (canRefreshDelegationToken && delegationToken != null) { + cancelDelegationToken(delegationToken); + } + } catch (IOException ioe) { + LOG.debug("Token cancel failed: "+ioe); + } finally { + super.close(); + } + } + + // use FsPathConnectionRunner to ensure retries for InvalidTokens + class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener { + private final FsPathConnectionRunner runner; + UnresolvedUrlOpener(FsPathConnectionRunner runner) { + super(null); + this.runner = runner; + } + + @Override + protected HttpURLConnection connect(long offset, boolean resolved) + throws IOException { + assert offset == 0; + HttpURLConnection conn = runner.run(); + setURL(conn.getURL()); + return conn; } } @@ -1139,7 +1219,7 @@ static URL removeOffsetParam(final URL url) throws MalformedURLException { } static class OffsetUrlInputStream extends ByteRangeInputStream { - OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) + OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r) throws IOException { super(o, r); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier index 59603a96eb1..b6b6171f944 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -13,3 +13,5 @@ # org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$WebHdfsDelegationTokenIdentifier +org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$SWebHdfsDelegationTokenIdentifier diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java index 2b3685e9d81..44e4961bb9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java @@ -19,47 +19,63 @@ package org.apache.hadoop.hdfs.web; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; +import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; import java.net.URI; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.PostOpParam; import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.internal.util.reflection.Whitebox; public class TestWebHdfsTokens { private static Configuration conf; + URI uri = null; @BeforeClass public static void setUp() { conf = new Configuration(); SecurityUtil.setAuthenticationMethod(KERBEROS, conf); UserGroupInformation.setConfiguration(conf); + UserGroupInformation.setLoginUser( + UserGroupInformation.createUserForTesting( + "LoginUser", new String[]{"supergroup"})); } private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException { WebHdfsFileSystem fsOrig = new WebHdfsFileSystem(); fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf); WebHdfsFileSystem fs = spy(fsOrig); - Whitebox.setInternalState(fsOrig.tokenAspect, "fs", fs); return fs; } @@ -89,7 +105,7 @@ public void testNoTokenForGetToken() throws IOException { } @Test(timeout = 5000) - public void testNoTokenForCanclToken() throws IOException { + public void testNoTokenForRenewToken() throws IOException { checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN); } @@ -139,4 +155,277 @@ public void testDeleteOpRequireAuth() { assertFalse(op.getRequireAuth()); } } + + @SuppressWarnings("unchecked") // for any(Token.class) + @Test + public void testLazyTokenFetchForWebhdfs() throws Exception { + MiniDFSCluster cluster = null; + WebHdfsFileSystem fs = null; + try { + final Configuration clusterConf = new HdfsConfiguration(conf); + SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf); + clusterConf.setBoolean(DFSConfigKeys + .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); + + // trick the NN into thinking security is enabled w/o it trying + // to login from a keytab + UserGroupInformation.setConfiguration(clusterConf); + cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build(); + cluster.waitActive(); + SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf); + UserGroupInformation.setConfiguration(clusterConf); + + uri = DFSUtil.createUri( + "webhdfs", cluster.getNameNode().getHttpAddress()); + validateLazyTokenFetch(clusterConf); + } finally { + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @SuppressWarnings("unchecked") // for any(Token.class) + @Test + public void testLazyTokenFetchForSWebhdfs() throws Exception { + MiniDFSCluster cluster = null; + SWebHdfsFileSystem fs = null; + try { + final Configuration clusterConf = new HdfsConfiguration(conf); + SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf); + clusterConf.setBoolean(DFSConfigKeys + .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); + String BASEDIR = System.getProperty("test.build.dir", + "target/test-dir") + "/" + TestWebHdfsTokens.class.getSimpleName(); + String keystoresDir; + String sslConfDir; + + clusterConf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); + clusterConf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0"); + clusterConf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + + File base = new File(BASEDIR); + FileUtil.fullyDelete(base); + base.mkdirs(); + keystoresDir = new File(BASEDIR).getAbsolutePath(); + sslConfDir = KeyStoreTestUtil.getClasspathDir(TestWebHdfsTokens.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, clusterConf, false); + + // trick the NN into thinking security is enabled w/o it trying + // to login from a keytab + UserGroupInformation.setConfiguration(clusterConf); + cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build(); + cluster.waitActive(); + InetSocketAddress addr = cluster.getNameNode().getHttpsAddress(); + String nnAddr = NetUtils.getHostPortString(addr); + clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr); + SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf); + UserGroupInformation.setConfiguration(clusterConf); + + uri = DFSUtil.createUri( + "swebhdfs", cluster.getNameNode().getHttpsAddress()); + validateLazyTokenFetch(clusterConf); + } finally { + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @SuppressWarnings("unchecked") + private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{ + final String testUser = "DummyUser"; + UserGroupInformation ugi = UserGroupInformation.createUserForTesting( + testUser, new String[]{"supergroup"}); + WebHdfsFileSystem fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public WebHdfsFileSystem run() throws IOException { + return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf)); + } + }); + // verify token ops don't get a token + Assert.assertNull(fs.getRenewToken()); + Token token = fs.getDelegationToken(null); + fs.renewDelegationToken(token); + fs.cancelDelegationToken(token); + verify(fs, never()).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).setDelegationToken(any(Token.class)); + Assert.assertNull(fs.getRenewToken()); + reset(fs); + + // verify first non-token op gets a token + final Path p = new Path("/f"); + fs.create(p, (short)1).close(); + verify(fs, times(1)).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, times(1)).getDelegationToken(anyString()); + verify(fs, times(1)).setDelegationToken(any(Token.class)); + token = fs.getRenewToken(); + Assert.assertNotNull(token); + Assert.assertEquals(testUser, getTokenOwner(token)); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + reset(fs); + + // verify prior token is reused + fs.getFileStatus(p); + verify(fs, times(1)).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + Token token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + + // verify renew of expired token fails w/o getting a new token + token = fs.getRenewToken(); + fs.cancelDelegationToken(token); + try { + fs.renewDelegationToken(token); + Assert.fail("should have failed"); + } catch (InvalidToken it) { + } catch (Exception ex) { + Assert.fail("wrong exception:"+ex); + } + verify(fs, never()).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + + // verify cancel of expired token fails w/o getting a new token + try { + fs.cancelDelegationToken(token); + Assert.fail("should have failed"); + } catch (InvalidToken it) { + } catch (Exception ex) { + Assert.fail("wrong exception:"+ex); + } + verify(fs, never()).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + + // verify an expired token is replaced with a new token + fs.open(p).close(); + verify(fs, times(2)).getDelegationToken(); // first bad, then good + verify(fs, times(1)).replaceExpiredDelegationToken(); + verify(fs, times(1)).getDelegationToken(null); + verify(fs, times(1)).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertNotSame(token, token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertEquals(testUser, getTokenOwner(token2)); + reset(fs); + + // verify with open because it's a little different in how it + // opens connections + fs.cancelDelegationToken(fs.getRenewToken()); + InputStream is = fs.open(p); + is.read(); + is.close(); + verify(fs, times(2)).getDelegationToken(); // first bad, then good + verify(fs, times(1)).replaceExpiredDelegationToken(); + verify(fs, times(1)).getDelegationToken(null); + verify(fs, times(1)).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertNotSame(token, token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertEquals(testUser, getTokenOwner(token2)); + reset(fs); + + // verify fs close cancels the token + fs.close(); + verify(fs, never()).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + verify(fs, times(1)).cancelDelegationToken(eq(token2)); + + // add a token to ugi for a new fs, verify it uses that token + token = fs.getDelegationToken(null); + ugi.addToken(token); + fs = ugi.doAs(new PrivilegedExceptionAction() { + @Override + public WebHdfsFileSystem run() throws IOException { + return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf)); + } + }); + Assert.assertNull(fs.getRenewToken()); + fs.getFileStatus(new Path("/")); + verify(fs, times(1)).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, times(1)).setDelegationToken(eq(token)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + + // verify it reuses the prior ugi token + fs.getFileStatus(new Path("/")); + verify(fs, times(1)).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + + // verify an expired ugi token is NOT replaced with a new token + fs.cancelDelegationToken(token); + for (int i=0; i<2; i++) { + try { + fs.getFileStatus(new Path("/")); + Assert.fail("didn't fail"); + } catch (InvalidToken it) { + } catch (Exception ex) { + Assert.fail("wrong exception:"+ex); + } + verify(fs, times(1)).getDelegationToken(); + verify(fs, times(1)).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + token2 = fs.getRenewToken(); + Assert.assertNotNull(token2); + Assert.assertEquals(fs.getTokenKind(), token.getKind()); + Assert.assertSame(token, token2); + reset(fs); + } + + // verify fs close does NOT cancel the ugi token + fs.close(); + verify(fs, never()).getDelegationToken(); + verify(fs, never()).replaceExpiredDelegationToken(); + verify(fs, never()).getDelegationToken(anyString()); + verify(fs, never()).setDelegationToken(any(Token.class)); + verify(fs, never()).cancelDelegationToken(any(Token.class)); + } + + private String getTokenOwner(Token token) throws IOException { + // webhdfs doesn't register properly with the class loader + @SuppressWarnings({ "rawtypes", "unchecked" }) + Token clone = new Token(token); + clone.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + return clone.decodeIdentifier().getUser().getUserName(); + } }