From 84cbd72afda6344e220526fac5c560f00f84e374 Mon Sep 17 00:00:00 2001
From: Jitendra Pandey
Date: Tue, 13 Oct 2015 09:12:04 -0700
Subject: [PATCH] HDFS-8855. Webhdfs client leaks active NameNode connections.
 Contributed by Xiaobing Zhou.

---
 .../apache/hadoop/security/token/Token.java   |  11 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../web/webhdfs/DataNodeUGIProvider.java      | 108 ++++++--
 .../datanode/web/webhdfs/WebHdfsHandler.java  |   2 +-
 .../src/main/resources/hdfs-default.xml       |   8 +
 .../web/webhdfs/TestDataNodeUGIProvider.java  | 231 ++++++++++++++++++
 7 files changed, 347 insertions(+), 20 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index 2420155424e..f189a966cd7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.security.token;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,9 +31,11 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.UUID;
 
 /**
  * The client-side form of the token.
@@ -337,7 +341,12 @@ public class Token<T extends TokenIdentifier> implements Writable {
     identifierToString(buffer);
     return buffer.toString();
   }
-  
+
+  public String buildCacheKey() {
+    return UUID.nameUUIDFromBytes(
+        Bytes.concat(kind.getBytes(), identifier, password)).toString();
+  }
+
   private static ServiceLoader<TokenRenewer> renewers =
       ServiceLoader.load(TokenRenewer.class);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d7b6bd3ea82..9751a3a0eb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2033,6 +2033,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9160. [OIV-Doc] : Missing details of 'delimited' for processor options
     (nijel via vinayakumarb)
 
+    HDFS-8855. Webhdfs client leaks active NameNode connections.
+    (Xiaobing Zhou via jitendra)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8510c9a2d79..cb05fa9f83f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -70,6 +70,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
       "dfs.webhdfs.netty.high.watermark";
   public static final int  DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
+  public static final String DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY =
+      "dfs.webhdfs.ugi.expire.after.access";
+  public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
+      10*60*1000; //10 minutes
 
   // HA related configuration
   public static final String  DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY =
       "dfs.datanode.restart.replica.expiration";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
index ea1c29f5fb0..064def8701c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
@@ -13,14 +13,26 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Create UGI from the request for the WebHDFS requests for the DNs. Note that
@@ -29,34 +41,75 @@ import java.io.IOException;
  */
 class DataNodeUGIProvider {
   private final ParameterParser params;
+  private static Cache<String, UserGroupInformation> ugiCache;
+  public static final Log LOG = LogFactory.getLog(Client.class);
 
-  DataNodeUGIProvider(ParameterParser params) {
+  DataNodeUGIProvider(ParameterParser params, Configuration conf) {
     this.params = params;
+    if (ugiCache == null) {
+      synchronized (DataNodeUGIProvider.class) {
+        if (ugiCache == null) {
+          ugiCache = CacheBuilder
+              .newBuilder()
+              .expireAfterAccess(
+                  conf.getInt(
+                      DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+                      DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT),
+                  TimeUnit.MILLISECONDS).build();
+        }
+      }
+    }
   }
 
   UserGroupInformation ugi() throws IOException {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return tokenUGI();
+    UserGroupInformation ugi;
+
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        final Token<DelegationTokenIdentifier> token = params.delegationToken();
+
+        ugi = ugiCache.get(buildTokenCacheKey(token),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return tokenUGI(token);
+              }
+            });
+      } else {
+        final String usernameFromQuery = params.userName();
+        final String doAsUserFromQuery = params.doAsUser();
+        final String remoteUser = usernameFromQuery == null ? JspHelper
+            .getDefaultWebUserName(params.conf()) // not specified in request
+            : usernameFromQuery;
+
+        ugi = ugiCache.get(
+            buildNonTokenCacheKey(doAsUserFromQuery, remoteUser),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return nonTokenUGI(usernameFromQuery, doAsUserFromQuery,
+                    remoteUser);
+              }
+            });
+      }
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
     }
-    final String usernameFromQuery = params.userName();
-    final String doAsUserFromQuery = params.doAsUser();
-    final String remoteUser = usernameFromQuery == null
-        ? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
-        // request
-        : usernameFromQuery;
-
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
-    if (doAsUserFromQuery != null) {
-      // create and attempt to authorize a proxy user
-      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
-    }
     return ugi;
   }
 
-  private UserGroupInformation tokenUGI() throws IOException {
-    Token<DelegationTokenIdentifier> token = params.delegationToken();
+  private String buildTokenCacheKey(Token<DelegationTokenIdentifier> token) {
+    return token.buildCacheKey();
+  }
+
+  private UserGroupInformation tokenUGI(Token<DelegationTokenIdentifier> token)
+      throws IOException {
     ByteArrayInputStream buf =
       new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -67,4 +120,23 @@ class DataNodeUGIProvider {
     return ugi;
   }
 
+  private String buildNonTokenCacheKey(String doAsUserFromQuery,
+      String remoteUser) throws IOException {
+    String key = doAsUserFromQuery == null ? String.format("{%s}", remoteUser)
+        : String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
+    return key;
+  }
+
+  private UserGroupInformation nonTokenUGI(String usernameFromQuery,
+      String doAsUserFromQuery, String remoteUser) throws IOException {
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(remoteUser);
+    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
+    if (doAsUserFromQuery != null) {
+      // create and attempt to authorize a proxy user
+      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+    }
+    return ugi;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
index dffe34d234f..f8ddce14e50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
@@ -107,7 +107,7 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
     Preconditions.checkArgument(req.uri().startsWith(WEBHDFS_PREFIX));
     QueryStringDecoder queryString = new QueryStringDecoder(req.uri());
     params = new ParameterParser(queryString, conf);
-    DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params);
+    DataNodeUGIProvider ugiProvider = new DataNodeUGIProvider(params, conf);
     ugi = ugiProvider.ugi();
     path = params.path();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index cc4f146b033..cd88e52c12f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2405,6 +2405,14 @@
 </property>
 
+<property>
+  <name>dfs.webhdfs.ugi.expire.after.access</name>
+  <value>600000</value>
+  <description>How long in milliseconds after the last access
+    the cached UGI will expire. With 0, never expire.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
   <value>10000</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
new file mode 100644
index 00000000000..b87371ee2b1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
+
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.mockito.Mockito.mock;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestDataNodeUGIProvider {
+  private final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://"
+      + "127.0.0.1:0");
+  private final String PATH = "/foo";
+  private final int OFFSET = 42;
+  private final int LENGTH = 512;
+  private final int EXPIRE_AFTER_ACCESS = 5*1000;
+
+  @Test
+  public void testUGICacheSecure() throws Exception {
+
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+        EXPIRE_AFTER_ACCESS);
+
+    // fake turning on security so api thinks it should use tokens
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("test-user");
+    ugi.setAuthenticationMethod(KERBEROS);
+    ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
+    UserGroupInformation.setLoginUser(ugi);
+
+    List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList();
+    getWebHdfsFileSystem(ugi, conf, tokens);
+
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(0).encodeToUrlString()));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(1).encodeToUrlString()));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf),
+        conf);
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        ugi11, ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf),
+        conf);
+    UserGroupInformation url21 = ugiProvider2.ugi();
+    UserGroupInformation url22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, url22);
+
+    Thread.sleep(EXPIRE_AFTER_ACCESS);
+    ugi12 = ugiProvider1.ugi();
+    url22 = ugiProvider2.ugi();
+
+    Assert
+        .assertNotEquals(
+            "With cache eviction, two UGIs returned by the same token should not be same",
+            ugi11, ugi12);
+
+    Assert
+        .assertNotEquals(
+            "With cache eviction, two UGIs returned by the same token should not be same",
+            url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, url22);
+  }
+
+  @Test
+  public void testUGICacheInSecure() throws Exception {
+
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+        EXPIRE_AFTER_ACCESS);
+
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("root"));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("hdfs"));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf),
+        conf);
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", ugi11,
+        ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf),
+        conf);
+    UserGroupInformation url21 = ugiProvider2.ugi();
+    UserGroupInformation url22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", url21,
+        url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, url22);
+
+    Thread.sleep(EXPIRE_AFTER_ACCESS);
+    ugi12 = ugiProvider1.ugi();
+    url22 = ugiProvider2.ugi();
+
+    Assert
+        .assertNotEquals(
+            "With cache eviction, two UGIs returned by the same user should not be same",
+            ugi11, ugi12);
+
+    Assert
+        .assertNotEquals(
+            "With cache eviction, two UGIs returned by the same user should not be same",
+            url21, url22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, url22);
+  }
+
+  private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
+      Configuration conf, List<Token<DelegationTokenIdentifier>> tokens)
+      throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
+          ugi.getUserName()), null, null);
+      FSNamesystem namesystem = mock(FSNamesystem.class);
+      DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
+          86400000, 86400000, 86400000, 86400000, namesystem);
+      dtSecretManager.startThreads();
+      Token<DelegationTokenIdentifier> token1 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      Token<DelegationTokenIdentifier> token2 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      SecurityUtil.setTokenService(token1,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      SecurityUtil.setTokenService(token2,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      token1.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+      token2.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+
+      tokens.add(token1);
+      tokens.add(token2);
+
+      ugi.addToken(token1);
+      ugi.addToken(token2);
+    }
+    return (WebHdfsFileSystem) FileSystem.get(uri, conf);
+  }
+}
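
The patch above makes the DataNode reuse one UserGroupInformation per delegation token (or per remote/doAs user) by keying it into a Guava cache whose entries drop out after dfs.webhdfs.ugi.expire.after.access milliseconds of inactivity, instead of constructing a fresh UGI for every WebHDFS request. The following standalone sketch illustrates only that Guava expireAfterAccess/Callable idiom; the class and key names are invented for the example, it assumes Guava on the classpath, and it is not part of the patch itself.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Illustrative only -- mirrors the caching pattern used by DataNodeUGIProvider.
public class UgiCacheSketch {
  // Entries expire 10 minutes after the last read or write, matching the
  // default of dfs.webhdfs.ugi.expire.after.access (600000 ms).
  private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
      .expireAfterAccess(10 * 60 * 1000, TimeUnit.MILLISECONDS)
      .build();

  static String lookup(final String cacheKey) throws ExecutionException {
    // get(key, loader) returns the cached value if present; otherwise it runs
    // the Callable once, caches the result, and returns it -- the same shape
    // as ugi() caching the result of tokenUGI()/nonTokenUGI().
    return CACHE.get(cacheKey, new Callable<String>() {
      @Override
      public String call() {
        return "expensive-value-for-" + cacheKey; // stands in for creating a UGI
      }
    });
  }

  public static void main(String[] args) throws ExecutionException {
    System.out.println(lookup("{hdfs}")); // first call loads and caches
    System.out.println(lookup("{hdfs}")); // second call is served from the cache
  }
}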