From fe5624b85d71720ae9da90a01cad9a3d1ea41160 Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Tue, 24 Nov 2015 12:41:08 -0800
Subject: [PATCH] HDFS-8855. Webhdfs client leaks active NameNode connections.
 Contributed by Xiaobing Zhou.

---
 .../apache/hadoop/security/token/Token.java   |  10 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../datanode/web/DatanodeHttpServer.java      |   4 +-
 .../web/webhdfs/DataNodeUGIProvider.java      | 108 ++++++--
 .../src/main/resources/hdfs-default.xml       |   8 +
 .../web/webhdfs/TestDataNodeUGIProvider.java  | 235 ++++++++++++++++++
 7 files changed, 351 insertions(+), 21 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index 2420155424e..f8b7355b245 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.security.token;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import java.io.*;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.UUID;
 
 /**
  * The client-side form of the token.
@@ -337,7 +340,12 @@ public class Token<T extends TokenIdentifier> implements Writable {
     identifierToString(buffer);
     return buffer.toString();
   }
-
+
+  public String buildCacheKey() {
+    return UUID.nameUUIDFromBytes(
+        Bytes.concat(kind.getBytes(), identifier, password)).toString();
+  }
+
   private static ServiceLoader<TokenRenewer> renewers =
       ServiceLoader.load(TokenRenewer.class);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 95dfbcf0a72..7d9df2e678a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2373,6 +2373,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
     (Wei-Chiu Chuang via cnauroth)
 
+    HDFS-8855. Webhdfs client leaks active NameNode connections.
+    (Xiaobing Zhou via xyao)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
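Editor's note: the new Token#buildCacheKey() folds the token's kind, identifier, and password bytes into a name-based UUID, so identical token material always maps to the same cache key. The standalone sketch below (not part of the patch; the byte arrays are made-up placeholders) illustrates that determinism using the same JDK and Guava calls:

import java.util.UUID;

import com.google.common.primitives.Bytes;

public class TokenCacheKeySketch {
  public static void main(String[] args) {
    // Hypothetical token material; a real Token supplies its own kind,
    // identifier and password bytes.
    byte[] kind = "WEBHDFS delegation".getBytes();
    byte[] identifier = {0x01, 0x02, 0x03};
    byte[] password = {0x0a, 0x0b};

    // UUID.nameUUIDFromBytes is deterministic, so equal inputs always
    // produce the same key and repeated requests hit the same cache entry.
    String key1 = UUID.nameUUIDFromBytes(
        Bytes.concat(kind, identifier, password)).toString();
    String key2 = UUID.nameUUIDFromBytes(
        Bytes.concat(kind, identifier, password)).toString();
    System.out.println(key1 + " equals " + key2 + ": " + key1.equals(key2));
  }
}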
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 54e0d10659d..6986896b1d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -70,6 +70,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
       "dfs.webhdfs.netty.high.watermark";
   public static final int DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
+  public static final String DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY =
+      "dfs.webhdfs.ugi.expire.after.access";
+  public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
+      10*60*1000; //10 minutes
   // HA related configuration
   public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY =
       "dfs.datanode.restart.replica.expiration";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
index 441d5200579..fc24fae2e6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.net.NetUtils;
@@ -74,7 +75,6 @@ public class DatanodeHttpServer implements Closeable {
   private final Configuration confForCreate;
   private InetSocketAddress httpAddress;
   private InetSocketAddress httpsAddress;
-
   static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
 
   public DatanodeHttpServer(final Configuration conf,
@@ -99,7 +99,7 @@ public class DatanodeHttpServer implements Closeable {
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport",
         BlockScanner.Servlet.class);
-
+    DataNodeUGIProvider.init(conf);
     this.infoServer.start();
 
     final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
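Editor's note: DatanodeHttpServer now calls DataNodeUGIProvider.init(conf) before the info server starts, and the cache lifetime comes from dfs.webhdfs.ugi.expire.after.access. A rough usage sketch (assuming only classes touched by this patch) of overriding the expiry programmatically before the provider is initialized:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;

public class UgiCacheConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Drop cached UGIs five minutes after the last access instead of the
    // ten-minute DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT.
    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
        5 * 60 * 1000);
    // The value is captured here; init() only builds the cache the first
    // time it runs, so later calls with a different conf are no-ops.
    DataNodeUGIProvider.init(conf);
  }
}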
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
index ea1c29f5fb0..233ba69e4be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
@@ -13,50 +13,103 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Create UGI from the request for the WebHDFS requests for the DNs. Note that
  * the DN does not authenticate the UGI -- the NN will authenticate them in
  * subsequent operations.
  */
-class DataNodeUGIProvider {
+public class DataNodeUGIProvider {
   private final ParameterParser params;
+  @VisibleForTesting
+  static Cache<String, UserGroupInformation> ugiCache;
+  public static final Log LOG = LogFactory.getLog(Client.class);
 
   DataNodeUGIProvider(ParameterParser params) {
     this.params = params;
   }
 
+  public static synchronized void init(Configuration conf) {
+    if (ugiCache == null) {
+      ugiCache = CacheBuilder
+          .newBuilder()
+          .expireAfterAccess(
+              conf.getInt(
+                  DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+                  DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT),
+              TimeUnit.MILLISECONDS).build();
+    }
+  }
+
   UserGroupInformation ugi() throws IOException {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return tokenUGI();
+    UserGroupInformation ugi;
+
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        final Token<DelegationTokenIdentifier> token = params.delegationToken();
+
+        ugi = ugiCache.get(buildTokenCacheKey(token),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return tokenUGI(token);
+              }
+            });
+      } else {
+        final String usernameFromQuery = params.userName();
+        final String doAsUserFromQuery = params.doAsUser();
+        final String remoteUser = usernameFromQuery == null ? JspHelper
+            .getDefaultWebUserName(params.conf()) // not specified in request
+            : usernameFromQuery;
+
+        ugi = ugiCache.get(
+            buildNonTokenCacheKey(doAsUserFromQuery, remoteUser),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return nonTokenUGI(usernameFromQuery, doAsUserFromQuery,
+                    remoteUser);
+              }
+            });
+      }
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
     }
-    final String usernameFromQuery = params.userName();
-    final String doAsUserFromQuery = params.doAsUser();
-    final String remoteUser = usernameFromQuery == null
-        ? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
-                                                         // request
-        : usernameFromQuery;
-
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
-    if (doAsUserFromQuery != null) {
-      // create and attempt to authorize a proxy user
-      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
-    }
     return ugi;
   }
 
-  private UserGroupInformation tokenUGI() throws IOException {
-    Token<DelegationTokenIdentifier> token = params.delegationToken();
+  private String buildTokenCacheKey(Token<DelegationTokenIdentifier> token) {
+    return token.buildCacheKey();
+  }
+
+  private UserGroupInformation tokenUGI(Token<DelegationTokenIdentifier> token)
+      throws IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -67,4 +120,23 @@ class DataNodeUGIProvider {
     return ugi;
   }
 
+  private String buildNonTokenCacheKey(String doAsUserFromQuery,
+      String remoteUser) throws IOException {
+    String key = doAsUserFromQuery == null ? String.format("{%s}", remoteUser)
+        : String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
+    return key;
+  }
+
+  private UserGroupInformation nonTokenUGI(String usernameFromQuery,
+      String doAsUserFromQuery, String remoteUser) throws IOException {
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(remoteUser);
+    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
+    if (doAsUserFromQuery != null) {
+      // create and attempt to authorize a proxy user
+      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+    }
+    return ugi;
+  }
 }
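Editor's note: the core idiom above is Guava's Cache.get(key, valueLoader). The Callable runs only on a cache miss, so requests presenting the same delegation token (or the same remote user / doAs pair) reuse one UserGroupInformation instead of creating a fresh one per request, which is what leaked NameNode connections. A minimal, self-contained sketch of that pattern, with invented names and Strings standing in for UGIs:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class MemoizingCacheSketch {
  // Entries are dropped ten minutes after the last read or write, mirroring
  // the default dfs.webhdfs.ugi.expire.after.access of 600000 ms.
  private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES)
      .build();

  static String lookup(final String key) throws ExecutionException {
    // The Callable is invoked only when 'key' is absent; otherwise the
    // cached value is returned and its access timestamp is refreshed.
    return CACHE.get(key, new Callable<String>() {
      @Override
      public String call() {
        return "expensive-result-for-" + key; // stands in for UGI creation
      }
    });
  }

  public static void main(String[] args) throws ExecutionException {
    System.out.println(lookup("{root}"));         // computes the value
    System.out.println(lookup("{root}"));         // served from the cache
    System.out.println(lookup("{root}:{alice}")); // different key, computes
  }
}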
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 70dc56a66c9..24371df81cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2481,6 +2481,14 @@
 </property>
 
+<property>
+  <name>dfs.webhdfs.ugi.expire.after.access</name>
+  <value>600000</value>
+  <description>How long in milliseconds after the last access
+    the cached UGI will expire. With 0, never expire.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
   <value>10000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
new file mode 100644
index 00000000000..bce54227d04
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
+
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.mockito.Mockito.mock;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+public class TestDataNodeUGIProvider {
+  private final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://"
+      + "127.0.0.1:0");
+  private final String PATH = "/foo";
+  private final int OFFSET = 42;
+  private final int LENGTH = 512;
+  private final static int EXPIRE_AFTER_ACCESS = 5*1000;
+  private Configuration conf;
+  @Before
+  public void setUp(){
+    conf = WebHdfsTestUtil.createConf();
+    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+        EXPIRE_AFTER_ACCESS);
+    DataNodeUGIProvider.init(conf);
+  }
+
+  @Test
+  public void testUGICacheSecure() throws Exception {
+    // fake turning on security so api thinks it should use tokens
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("test-user");
+    ugi.setAuthenticationMethod(KERBEROS);
+    ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
+    UserGroupInformation.setLoginUser(ugi);
+
+    List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList();
+    getWebHdfsFileSystem(ugi, conf, tokens);
+
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(0).encodeToUrlString()));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(1).encodeToUrlString()));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        ugi11, ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
+    UserGroupInformation url21 = ugiProvider2.ugi();
+    UserGroupInformation url22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, url22);
+
+    awaitCacheEmptyDueToExpiration();
+    ugi12 = ugiProvider1.ugi();
+    url22 = ugiProvider2.ugi();
+
+    String msg = "With cache eviction, two UGIs returned"
+        + " by the same token should not be same";
+    Assert.assertNotEquals(msg, ugi11, ugi12);
+    Assert.assertNotEquals(msg, url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, url22);
+  }
+
+  @Test
+  public void testUGICacheInSecure() throws Exception {
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("root"));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("hdfs"));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", ugi11,
+        ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
+    UserGroupInformation url21 = ugiProvider2.ugi();
+    UserGroupInformation url22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", url21,
+        url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, url22);
+
+    awaitCacheEmptyDueToExpiration();
+    ugi12 = ugiProvider1.ugi();
+    url22 = ugiProvider2.ugi();
+
+    String msg = "With cache eviction, two UGIs returned by"
+        + " the same user should not be same";
+    Assert.assertNotEquals(msg, ugi11, ugi12);
+    Assert.assertNotEquals(msg, url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, url22);
+  }
+
+  /**
+   * Wait for expiration of entries from the UGI cache. We need to be careful
+   * not to touch the entries in the cache while we're waiting for expiration.
+   * If we did, then that would reset the clock on expiration for those
+   * entries. Instead, we trigger internal clean-up of the cache and check
+   * for size 0.
+   *
+   * @throws Exception if there is any error
+   */
+  private void awaitCacheEmptyDueToExpiration() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DataNodeUGIProvider.ugiCache.cleanUp();
+        return DataNodeUGIProvider.ugiCache.size() == 0;
+      }
+    }, EXPIRE_AFTER_ACCESS, 10 * EXPIRE_AFTER_ACCESS);
+  }
+
+  private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
+      Configuration conf, List<Token<DelegationTokenIdentifier>> tokens)
+      throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
+          ugi.getUserName()), null, null);
+      FSNamesystem namesystem = mock(FSNamesystem.class);
+      DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
+          86400000, 86400000, 86400000, 86400000, namesystem);
+      dtSecretManager.startThreads();
+      Token<DelegationTokenIdentifier> token1 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      Token<DelegationTokenIdentifier> token2 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      SecurityUtil.setTokenService(token1,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      SecurityUtil.setTokenService(token2,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      token1.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+      token2.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+
+      tokens.add(token1);
+      tokens.add(token2);
+
+      ugi.addToken(token1);
+      ugi.addToken(token2);
+    }
+    return (WebHdfsFileSystem) FileSystem.get(uri, conf);
+  }
+}