diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 044ff44ddf2..5589f10d21c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -477,6 +477,9 @@ Release 2.3.0 - UNRELEASED HDFS-5495. Remove further JUnit3 usages from HDFS. (Jarek Jarcec Cecho via wang) + HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem + to the TokenAspect class. (Haohui Mai via jing9) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java index b3336a5202d..696f8326357 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java @@ -31,7 +31,6 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Collection; import java.util.TimeZone; import org.apache.hadoop.classification.InterfaceAudience; @@ -50,10 +49,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; -import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; @@ -62,8 +59,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenRenewer; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ServletUtil; import org.xml.sax.Attributes; @@ -83,7 +78,9 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class HftpFileSystem extends FileSystem - implements DelegationTokenRenewer.Renewable { + implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator { + public static final String SCHEME = "hftp"; + static { HttpURLConnection.setFollowRedirects(true); } @@ -100,19 +97,13 @@ public class HftpFileSystem extends FileSystem public static final String HFTP_TIMEZONE = "UTC"; public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private TokenAspect tokenAspect = new TokenAspect(this, TOKEN_KIND); private Token delegationToken; private Token renewToken; - private static final HftpDelegationTokenSelector hftpTokenSelector = - new HftpDelegationTokenSelector(); - private DelegationTokenRenewer dtRenewer = null; - - private synchronized void addRenewAction(final HftpFileSystem hftpFs) { - if (dtRenewer == null) { - dtRenewer = DelegationTokenRenewer.getInstance(); - } - - dtRenewer.addRenewAction(hftpFs); + @Override + public URI getCanonicalUri() { + return super.getCanonicalUri(); } public static final SimpleDateFormat getDateFormat() { @@ -177,7 +168,7 @@ protected URI canonicalizeUri(URI uri) { */ @Override public String getScheme() { - return "hftp"; + return SCHEME; } @Override @@ -195,39 +186,10 @@ public void initialize(final URI name, final Configuration conf) } if (UserGroupInformation.isSecurityEnabled()) { - initDelegationToken(); + tokenAspect.initDelegationToken(ugi); } } - protected void initDelegationToken() throws IOException { - // look for hftp token, then try hdfs - Token token = selectDelegationToken(ugi); - - // if we don't already have a token, go get one over https - boolean createdToken = false; - if (token == null) { - token = getDelegationToken(null); - createdToken = (token != null); - } - - // we already had a token or getDelegationToken() didn't fail. - if (token != null) { - setDelegationToken(token); - if (createdToken) { - addRenewAction(this); - LOG.debug("Created new DT for " + token.getService()); - } else { - LOG.debug("Found existing DT for " + token.getService()); - } - } - } - - protected Token selectDelegationToken( - UserGroupInformation ugi) { - return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf()); - } - - @Override public Token getRenewToken() { return renewToken; @@ -242,16 +204,19 @@ protected String getUnderlyingProtocol() { @Override public synchronized void setDelegationToken(Token token) { + /** + * XXX The kind of the token has been changed by DelegationTokenFetcher. We + * use the token for renewal, since the reflection utilities needs the value + * of the kind field to correctly renew the token. + * + * For other operations, however, the client has to send a + * HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop + * 0.20.3 clusters. Later releases fix this problem. See HDFS-5440 for more + * details. + */ renewToken = token; - // emulate the 203 usage of the tokens - // by setting the kind and service as if they were hdfs tokens delegationToken = new Token(token); - // NOTE: the remote nn must be configured to use hdfs delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); - // no need to change service because we aren't exactly sure what it - // should be. we can guess, but it might be wrong if the local conf - // value is incorrect. the service is a client side field, so the remote - // end does not care about the value } @Override @@ -350,6 +315,7 @@ protected String addDelegationTokenParam(String query) throws IOException { String tokenString = null; if (UserGroupInformation.isSecurityEnabled()) { synchronized (this) { + tokenAspect.ensureTokenInitialized(); if (delegationToken != null) { tokenString = delegationToken.encodeToUrlString(); return (query + JspHelper.getDelegationTokenUrlParam(tokenString)); @@ -419,9 +385,7 @@ public FSDataInputStream open(Path f, int buffersize) throws IOException { @Override public void close() throws IOException { super.close(); - if (dtRenewer != null) { - dtRenewer.removeRenewAction(this); // blocks - } + tokenAspect.removeRenewAction(); } /** Class to parse and store a listing reply from the server. */ @@ -696,67 +660,26 @@ public ContentSummary getContentSummary(Path f) throws IOException { return cs != null? cs: super.getContentSummary(f); } - @InterfaceAudience.Private - public static class TokenManager extends TokenRenewer { - - @Override - public boolean handleKind(Text kind) { - return kind.equals(TOKEN_KIND); - } - - @Override - public boolean isManaged(Token token) throws IOException { - return true; - } - - protected String getUnderlyingProtocol() { - return "http"; - } - - @SuppressWarnings("unchecked") - @Override - public long renew(Token token, - Configuration conf) throws IOException { - // update the kerberos credentials, if they are coming from a keytab - UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); - return - DelegationTokenFetcher.renewDelegationToken - (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), - (Token) token); - } - - @SuppressWarnings("unchecked") - @Override - public void cancel(Token token, - Configuration conf) throws IOException { - // update the kerberos credentials, if they are coming from a keytab - UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); - InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); - DelegationTokenFetcher.cancelDelegationToken - (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), - (Token) token); - } + @SuppressWarnings("unchecked") + @Override + public long renewDelegationToken(Token token) throws IOException { + // update the kerberos credentials, if they are coming from a keytab + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); + return + DelegationTokenFetcher.renewDelegationToken + (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), + (Token) token); } - private static class HftpDelegationTokenSelector - extends AbstractDelegationTokenSelector { - private static final DelegationTokenSelector hdfsTokenSelector = - new DelegationTokenSelector(); - - public HftpDelegationTokenSelector() { - super(TOKEN_KIND); - } - - Token selectToken(URI nnUri, - Collection> tokens, Configuration conf) { - Token token = - selectToken(SecurityUtil.buildTokenService(nnUri), tokens); - if (token == null) { - // try to get a HDFS token - token = hdfsTokenSelector.selectToken(nnUri, tokens, conf); - } - return token; - } + @SuppressWarnings("unchecked") + @Override + public void cancelDelegationToken(Token token) throws IOException { + // update the kerberos credentials, if they are coming from a keytab + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); + DelegationTokenFetcher.cancelDelegationToken + (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(), + (Token) token); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java new file mode 100644 index 00000000000..5464ba0c753 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegationTokenRenewer; +import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class implements the aspects that relate to delegation tokens for all + * HTTP-based file system. + */ +final class TokenAspect { + @InterfaceAudience.Private + public static class TokenManager extends TokenRenewer { + + @Override + public void cancel(Token token, Configuration conf) throws IOException { + getInstance(token, conf).cancelDelegationToken(token); + } + + @Override + public boolean handleKind(Text kind) { + return kind.equals(HftpFileSystem.TOKEN_KIND) + || kind.equals(WebHdfsFileSystem.TOKEN_KIND); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token token, Configuration conf) throws IOException { + return getInstance(token, conf).renewDelegationToken(token); + } + + private TokenManagementDelegator getInstance(Token token, + Configuration conf) throws IOException { + final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(token); + Text kind = token.getKind(); + final URI uri; + if (kind.equals(HftpFileSystem.TOKEN_KIND)) { + uri = DFSUtil.createUri(HftpFileSystem.SCHEME, address); + } else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) { + uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, address); + } else { + throw new IllegalArgumentException("Unsupported scheme"); + } + return (TokenManagementDelegator) FileSystem.get(uri, conf); + } + } + + static class DTSelecorByKind extends + AbstractDelegationTokenSelector { + private static final DelegationTokenSelector selector = new DelegationTokenSelector(); + + public DTSelecorByKind(final Text kind) { + super(kind); + } + + Token selectToken(URI nnUri, + Collection> tokens, Configuration conf) { + Token token = selectToken( + SecurityUtil.buildTokenService(nnUri), tokens); + if (token == null) { + token = selector.selectToken(nnUri, tokens, conf); + } + return token; + } + } + + /** + * Callbacks for token management + */ + interface TokenManagementDelegator { + void cancelDelegationToken(final Token token) throws IOException; + + URI getCanonicalUri(); + + long renewDelegationToken(final Token token) throws IOException; + } + + private DelegationTokenRenewer.RenewAction action; + private DelegationTokenRenewer dtRenewer = null; + private final DTSelecorByKind dtSelector; + private final T fs; + private boolean hasInitedToken; + private final Log LOG; + + TokenAspect(T fs, final Text kind) { + this.LOG = LogFactory.getLog(fs.getClass()); + this.fs = fs; + this.dtSelector = new DTSelecorByKind(kind); + } + + synchronized void ensureTokenInitialized() throws IOException { + // we haven't inited yet, or we used to have a token but it expired + if (!hasInitedToken || (action != null && !action.isValid())) { + //since we don't already have a token, go get one + Token token = fs.getDelegationToken(null); + // security might be disabled + if (token != null) { + fs.setDelegationToken(token); + addRenewAction(fs); + LOG.debug("Created new DT for " + token.getService()); + } + hasInitedToken = true; + } + } + + synchronized void initDelegationToken(UserGroupInformation ugi) { + Token token = selectDelegationToken(ugi); + if (token != null) { + LOG.debug("Found existing DT for " + token.getService()); + fs.setDelegationToken(token); + hasInitedToken = true; + } + } + + synchronized void removeRenewAction() throws IOException { + if (dtRenewer != null) { + dtRenewer.removeRenewAction(fs); + } + } + + @VisibleForTesting + Token selectDelegationToken( + UserGroupInformation ugi) { + return dtSelector.selectToken( + ((TokenManagementDelegator)fs).getCanonicalUri(), ugi.getTokens(), + fs.getConf()); + } + + private synchronized void addRenewAction(final T webhdfs) { + if (dtRenewer == null) { + dtRenewer = DelegationTokenRenewer.getInstance(); + } + + action = dtRenewer.addRenewAction(webhdfs); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 9b623c056f1..a7c5c61558e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -30,7 +30,6 @@ import java.net.URISyntaxException; import java.net.URL; import java.security.PrivilegedExceptionAction; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.StringTokenizer; @@ -56,8 +55,8 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.web.TokenAspect.DTSelecorByKind; import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; @@ -96,8 +95,6 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenRenewer; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.util.Progressable; import org.mortbay.util.ajax.JSON; @@ -107,7 +104,7 @@ /** A FileSystem for HDFS over the web. */ public class WebHdfsFileSystem extends FileSystem - implements DelegationTokenRenewer.Renewable { + implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator { public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class); /** File System URI: {SCHEME}://namenode:port/path/to/file */ public static final String SCHEME = "webhdfs"; @@ -122,13 +119,18 @@ public class WebHdfsFileSystem extends FileSystem /** Delegation token kind */ public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); /** Token selector */ - public static final WebHdfsDelegationTokenSelector DT_SELECTOR - = new WebHdfsDelegationTokenSelector(); + public static final DTSelecorByKind DT_SELECTOR + = new DTSelecorByKind(TOKEN_KIND); private DelegationTokenRenewer dtRenewer = null; @VisibleForTesting DelegationTokenRenewer.RenewAction action; + @Override + public URI getCanonicalUri() { + return super.getCanonicalUri(); + } + @VisibleForTesting protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) { if (dtRenewer == null) { @@ -142,7 +144,6 @@ protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) { public static boolean isEnabled(final Configuration conf, final Log log) { final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT); - log.info(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY + " = " + b); return b; } @@ -986,7 +987,8 @@ public void setDelegationToken( } } - private synchronized long renewDelegationToken(final Token token + @Override + public synchronized long renewDelegationToken(final Token token ) throws IOException { final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN; TokenArgumentParam dtargParam = new TokenArgumentParam( @@ -995,7 +997,8 @@ private synchronized long renewDelegationToken(final Token token return (Long) m.get("long"); } - private synchronized void cancelDelegationToken(final Token token + @Override + public synchronized void cancelDelegationToken(final Token token ) throws IOException { final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN; TokenArgumentParam dtargParam = new TokenArgumentParam( @@ -1041,57 +1044,4 @@ public MD5MD5CRC32FileChecksum getFileChecksum(final Path p final Map m = run(op, p); return JsonUtil.toMD5MD5CRC32FileChecksum(m); } - - /** Delegation token renewer. */ - public static class DtRenewer extends TokenRenewer { - @Override - public boolean handleKind(Text kind) { - return kind.equals(TOKEN_KIND); - } - - @Override - public boolean isManaged(Token token) throws IOException { - return true; - } - - private static WebHdfsFileSystem getWebHdfs( - final Token token, final Configuration conf) throws IOException { - - final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token); - final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr); - return (WebHdfsFileSystem)FileSystem.get(uri, conf); - } - - @Override - public long renew(final Token token, final Configuration conf - ) throws IOException, InterruptedException { - return getWebHdfs(token, conf).renewDelegationToken(token); - } - - @Override - public void cancel(final Token token, final Configuration conf - ) throws IOException, InterruptedException { - getWebHdfs(token, conf).cancelDelegationToken(token); - } - } - - private static class WebHdfsDelegationTokenSelector - extends AbstractDelegationTokenSelector { - private static final DelegationTokenSelector hdfsTokenSelector = - new DelegationTokenSelector(); - - public WebHdfsDelegationTokenSelector() { - super(TOKEN_KIND); - } - - Token selectToken(URI nnUri, - Collection> tokens, Configuration conf) { - Token token = - selectToken(SecurityUtil.buildTokenService(nnUri), tokens); - if (token == null) { - token = hdfsTokenSelector.selectToken(nnUri, tokens, conf); - } - return token; - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer index dee43fcd08c..7efd684d696 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -13,5 +13,4 @@ # org.apache.hadoop.hdfs.DFSClient$Renewer org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer -org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager -org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer +org.apache.hadoop.hdfs.web.TokenAspect$TokenManager diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java index 9e0a6c6063b..acb1faff55c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java @@ -22,7 +22,6 @@ import static org.junit.Assert.*; import java.io.IOException; -import java.lang.reflect.Field; import java.net.ServerSocket; import java.net.Socket; import java.net.URI; @@ -40,6 +39,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; public class TestHftpDelegationToken { @@ -71,9 +71,8 @@ public FileSystem run() throws Exception { }); assertSame("wrong kind of file system", HftpFileSystem.class, fs.getClass()); - Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken"); - renewToken.setAccessible(true); - assertSame("wrong token", token, renewToken.get(fs)); + assertSame("wrong token", token, + Whitebox.getInternalState(fs, "renewToken")); } @Test @@ -81,7 +80,7 @@ public void testSelectHftpDelegationToken() throws Exception { SecurityUtilTestHelper.setTokenServiceUseIp(true); Configuration conf = new Configuration(); - conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class); + conf.setClass("fs.hftp.impl", HftpFileSystem.class, FileSystem.class); int httpPort = 80; int httpsPort = 443; @@ -90,21 +89,21 @@ public void testSelectHftpDelegationToken() throws Exception { // test with implicit default port URI fsUri = URI.create("hftp://localhost"); - MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); + HftpFileSystem fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpPort, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpPort, conf); // test with explicit default port // Make sure it uses the port from the hftp URI. fsUri = URI.create("hftp://localhost:"+httpPort); - fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); + fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpPort, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpPort, conf); // test with non-default port // Make sure it uses the port from the hftp URI. fsUri = URI.create("hftp://localhost:"+(httpPort+1)); - fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); + fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpPort+1, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpPort + 1, conf); @@ -116,7 +115,7 @@ public void testSelectHsftpDelegationToken() throws Exception { SecurityUtilTestHelper.setTokenServiceUseIp(true); Configuration conf = new Configuration(); - conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class); + conf.setClass("fs.hsftp.impl", HsftpFileSystem.class, FileSystem.class); int httpPort = 80; int httpsPort = 443; @@ -125,19 +124,19 @@ public void testSelectHsftpDelegationToken() throws Exception { // test with implicit default port URI fsUri = URI.create("hsftp://localhost"); - MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf); + HsftpFileSystem fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpsPort, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpsPort, conf); // test with explicit default port fsUri = URI.create("hsftp://localhost:"+httpsPort); - fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf); + fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpsPort, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpsPort, conf); // test with non-default port fsUri = URI.create("hsftp://localhost:"+(httpsPort+1)); - fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf); + fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf); assertEquals(httpsPort+1, fs.getCanonicalUri().getPort()); checkTokenSelection(fs, httpsPort+1, conf); @@ -197,6 +196,9 @@ private void checkTokenSelection(HftpFileSystem fs, UserGroupInformation ugi = UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{}); + @SuppressWarnings("unchecked") + TokenAspect aspect = (TokenAspect) Whitebox.getInternalState(fs, "tokenAspect"); + // use ip-based tokens SecurityUtilTestHelper.setTokenServiceUseIp(true); @@ -208,7 +210,7 @@ private void checkTokenSelection(HftpFileSystem fs, ugi.addToken(hdfsToken); // test fallback to hdfs token - Token token = fs.selectDelegationToken(ugi); + Token token = aspect.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hdfsToken, token); @@ -217,13 +219,13 @@ private void checkTokenSelection(HftpFileSystem fs, new byte[0], new byte[0], HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port)); ugi.addToken(hftpToken); - token = fs.selectDelegationToken(ugi); + token = aspect.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hftpToken, token); // switch to using host-based tokens, no token should match SecurityUtilTestHelper.setTokenServiceUseIp(false); - token = fs.selectDelegationToken(ugi); + token = aspect.selectDelegationToken(ugi); assertNull(token); // test fallback to hdfs token @@ -232,7 +234,7 @@ private void checkTokenSelection(HftpFileSystem fs, DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text("localhost:8020")); ugi.addToken(hdfsToken); - token = fs.selectDelegationToken(ugi); + token = aspect.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hdfsToken, token); @@ -241,36 +243,8 @@ private void checkTokenSelection(HftpFileSystem fs, new byte[0], new byte[0], HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port)); ugi.addToken(hftpToken); - token = fs.selectDelegationToken(ugi); + token = aspect.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hftpToken, token); } - - static class MyHftpFileSystem extends HftpFileSystem { - @Override - public URI getCanonicalUri() { - return super.getCanonicalUri(); - } - @Override - public int getDefaultPort() { - return super.getDefaultPort(); - } - // don't automatically get a token - @Override - protected void initDelegationToken() throws IOException {} - } - - static class MyHsftpFileSystem extends HsftpFileSystem { - @Override - public URI getCanonicalUri() { - return super.getCanonicalUri(); - } - @Override - public int getDefaultPort() { - return super.getDefaultPort(); - } - // don't automatically get a token - @Override - protected void initDelegationToken() throws IOException {} - } }