HDFS-8855. Webhdfs client leaks active NameNode connections. Contributed by Xiaobing Zhou.

(cherry picked from commit fe5624b85d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
Author: Xiaoyu Yao, 2015-11-24 12:41:08 -08:00 (committed by cnauroth)
parent e226dbbc8a
commit ad4fcd1f9f
7 changed files with 351 additions and 21 deletions
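In short, the leak worked like this: the DataNode's WebHDFS handler created a brand-new UserGroupInformation (UGI) for every request, and Hadoop's IPC client caches connections keyed in part by the UGI instance, so each request left behind another live connection to the active NameNode. The fix caches UGIs in DataNodeUGIProvider, keyed by the delegation token in secure clusters or by the remote/doAs user otherwise, and expires entries after a configurable idle period (dfs.webhdfs.ugi.expire.after.access, default 10 minutes), so repeated requests reuse one UGI and its pooled connection.

A minimal, self-contained sketch of the Guava caching pattern the patch relies on (class and method names here are invented for illustration; the real code is in DataNodeUGIProvider below):

// Illustrative sketch only: a Guava cache with expire-after-access, the same
// pattern DataNodeUGIProvider uses for UGIs.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class UgiCacheSketch {
  private static final Cache<String, Object> CACHE = CacheBuilder
      .newBuilder()
      .expireAfterAccess(10 * 60 * 1000, TimeUnit.MILLISECONDS) // 10 minutes
      .build();

  static Object getOrCreate(String key) throws ExecutionException {
    // get(key, loader) runs the loader only on a miss; later calls with the
    // same key return the cached instance until it expires.
    return CACHE.get(key, new Callable<Object>() {
      @Override
      public Object call() {
        return new Object(); // stands in for the expensive UGI creation
      }
    });
  }

  public static void main(String[] args) throws ExecutionException {
    System.out.println(getOrCreate("k") == getOrCreate("k")); // true
  }
}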

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.security.token;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import java.io.*;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.UUID;
 
 /**
  * The client-side form of the token.
@@ -338,6 +341,11 @@ public class Token<T extends TokenIdentifier> implements Writable {
     return buffer.toString();
   }
 
+  public String buildCacheKey() {
+    return UUID.nameUUIDFromBytes(
+        Bytes.concat(kind.getBytes(), identifier, password)).toString();
+  }
+
   private static ServiceLoader<TokenRenewer> renewers =
       ServiceLoader.load(TokenRenewer.class);
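Why this works as a cache key: UUID.nameUUIDFromBytes is deterministic (a name-based, MD5-derived UUID), so two tokens with the same kind, identifier, and password always produce the same key string. A hedged, standalone illustration, with raw byte arrays standing in for the token's fields:

// Sketch of the buildCacheKey logic above; the harness class is invented
// here for illustration.
import com.google.common.primitives.Bytes;
import java.util.UUID;

public class CacheKeySketch {
  static String buildCacheKey(byte[] kind, byte[] identifier, byte[] password) {
    return UUID.nameUUIDFromBytes(
        Bytes.concat(kind, identifier, password)).toString();
  }

  public static void main(String[] args) {
    byte[] kind = "WEBHDFS delegation".getBytes();
    byte[] id = {1, 2, 3};
    byte[] pw = {4, 5, 6};
    // Same inputs, same key: this is what lets the DataNode's cache hit.
    System.out.println(buildCacheKey(kind, id, pw)
        .equals(buildCacheKey(kind, id, pw))); // true
  }
}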

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1716,6 +1716,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
     (Wei-Chiu Chuang via cnauroth)
 
+    HDFS-8855. Webhdfs client leaks active NameNode connections.
+    (Xiaobing Zhou via xyao)
+
     HDFS-8335. FSNamesystem should construct FSPermissionChecker only if
     permission is enabled. (Gabor Liptak via wheat9)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -69,6 +69,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
       "dfs.webhdfs.netty.high.watermark";
   public static final int DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
+  public static final String DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY =
+      "dfs.webhdfs.ugi.expire.after.access";
+  public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
+      10*60*1000; //10 minutes
 
   // HA related configuration
   public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.net.NetUtils;
@@ -76,7 +77,6 @@ public class DatanodeHttpServer implements Closeable {
   private final Configuration confForCreate;
   private InetSocketAddress httpAddress;
   private InetSocketAddress httpsAddress;
-
   static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
 
   public DatanodeHttpServer(final Configuration conf,
@@ -105,7 +105,7 @@ public class DatanodeHttpServer implements Closeable {
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport",
         BlockScanner.Servlet.class);
-
+    DataNodeUGIProvider.init(conf);
     this.infoServer.start();
 
     final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java

@@ -13,50 +13,103 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Create UGI from the request for the WebHDFS requests for the DNs. Note that
  * the DN does not authenticate the UGI -- the NN will authenticate them in
  * subsequent operations.
  */
-class DataNodeUGIProvider {
+public class DataNodeUGIProvider {
   private final ParameterParser params;
+  @VisibleForTesting
+  static Cache<String, UserGroupInformation> ugiCache;
+  public static final Log LOG = LogFactory.getLog(Client.class);
 
   DataNodeUGIProvider(ParameterParser params) {
     this.params = params;
   }
 
+  public static synchronized void init(Configuration conf) {
+    if (ugiCache == null) {
+      ugiCache = CacheBuilder
+          .newBuilder()
+          .expireAfterAccess(
+              conf.getInt(
+                  DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+                  DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT),
+              TimeUnit.MILLISECONDS).build();
+    }
+  }
+
   UserGroupInformation ugi() throws IOException {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return tokenUGI();
-    }
+    UserGroupInformation ugi;
 
-    final String usernameFromQuery = params.userName();
-    final String doAsUserFromQuery = params.doAsUser();
-    final String remoteUser = usernameFromQuery == null
-        ? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
-        // request
-        : usernameFromQuery;
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        final Token<DelegationTokenIdentifier> token = params.delegationToken();
+
+        ugi = ugiCache.get(buildTokenCacheKey(token),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return tokenUGI(token);
+              }
+            });
+      } else {
+        final String usernameFromQuery = params.userName();
+        final String doAsUserFromQuery = params.doAsUser();
+        final String remoteUser = usernameFromQuery == null ? JspHelper
+            .getDefaultWebUserName(params.conf()) // not specified in request
+            : usernameFromQuery;
 
-    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
-    if (doAsUserFromQuery != null) {
-      // create and attempt to authorize a proxy user
-      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+        ugi = ugiCache.get(
+            buildNonTokenCacheKey(doAsUserFromQuery, remoteUser),
+            new Callable<UserGroupInformation>() {
+              @Override
+              public UserGroupInformation call() throws Exception {
+                return nonTokenUGI(usernameFromQuery, doAsUserFromQuery,
+                    remoteUser);
+              }
+            });
+      }
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
     }
+
     return ugi;
   }
 
-  private UserGroupInformation tokenUGI() throws IOException {
-    Token<DelegationTokenIdentifier> token = params.delegationToken();
+  private String buildTokenCacheKey(Token<DelegationTokenIdentifier> token) {
+    return token.buildCacheKey();
+  }
+
+  private UserGroupInformation tokenUGI(Token<DelegationTokenIdentifier> token)
+      throws IOException {
     ByteArrayInputStream buf =
       new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -67,4 +120,23 @@ class DataNodeUGIProvider {
     return ugi;
   }
 
+  private String buildNonTokenCacheKey(String doAsUserFromQuery,
+      String remoteUser) throws IOException {
+    String key = doAsUserFromQuery == null ? String.format("{%s}", remoteUser)
+        : String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
+    return key;
+  }
+
+  private UserGroupInformation nonTokenUGI(String usernameFromQuery,
+      String doAsUserFromQuery, String remoteUser) throws IOException {
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser(remoteUser);
+    JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
+    if (doAsUserFromQuery != null) {
+      // create and attempt to authorize a proxy user
+      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+    }
+    return ugi;
+  }
 }
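So the cache ends up holding two kinds of keys: deterministic UUID strings from Token#buildCacheKey for secure requests, and formatted user strings for insecure ones. A tiny standalone sketch of the non-token key format, mirroring buildNonTokenCacheKey above (the harness class is invented for illustration):

// Non-token cache keys: "{user}" when no proxy user is given,
// "{user}:{doAsUser}" otherwise.
public class NonTokenKeySketch {
  static String key(String remoteUser, String doAsUserFromQuery) {
    return doAsUserFromQuery == null ? String.format("{%s}", remoteUser)
        : String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
  }

  public static void main(String[] args) {
    System.out.println(key("root", null));   // {root}
    System.out.println(key("root", "hdfs")); // {root}:{hdfs}
  }
}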

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2624,6 +2624,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.webhdfs.ugi.expire.after.access</name>
+  <value>600000</value>
+  <description>How long in milliseconds after the last access
+    the cached UGI will expire. With 0, never expire.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
   <value>10000</value>
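For completeness, a small sketch of overriding the new property programmatically, the same way the test below does in its setUp (standalone harness class invented for illustration; the DFSConfigKeys constants are the ones added by this patch):

// Sketch: tuning the UGI cache expiry in code. Note the description above
// documents 0 as "never expire".
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ConfigureUgiExpiry {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
        5 * 60 * 1000); // 5 minutes instead of the 10-minute default
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
        DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT));
  }
}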

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java (new file)

@@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;

import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.mockito.Mockito.mock;

import io.netty.handler.codec.http.QueryStringDecoder;

import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
public class TestDataNodeUGIProvider {
  private final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://"
      + "127.0.0.1:0");
  private final String PATH = "/foo";
  private final int OFFSET = 42;
  private final int LENGTH = 512;
  private final static int EXPIRE_AFTER_ACCESS = 5*1000;
  private Configuration conf;

  @Before
  public void setUp() {
    conf = WebHdfsTestUtil.createConf();
    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
        EXPIRE_AFTER_ACCESS);
    DataNodeUGIProvider.init(conf);
  }

  @Test
  public void testUGICacheSecure() throws Exception {
    // fake turning on security so api thinks it should use tokens
    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
    UserGroupInformation.setConfiguration(conf);

    UserGroupInformation ugi = UserGroupInformation
        .createRemoteUser("test-user");
    ugi.setAuthenticationMethod(KERBEROS);
    ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
    UserGroupInformation.setLoginUser(ugi);

    List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList();
    getWebHdfsFileSystem(ugi, conf, tokens);

    String uri1 = WebHdfsFileSystem.PATH_PREFIX
        + PATH
        + "?op=OPEN"
        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
            new DelegationParam(tokens.get(0).encodeToUrlString()));

    String uri2 = WebHdfsFileSystem.PATH_PREFIX
        + PATH
        + "?op=OPEN"
        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
            new DelegationParam(tokens.get(1).encodeToUrlString()));

    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
    UserGroupInformation ugi11 = ugiProvider1.ugi();
    UserGroupInformation ugi12 = ugiProvider1.ugi();

    Assert.assertEquals(
        "With UGI cache, two UGIs returned by the same token should be same",
        ugi11, ugi12);

    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
    UserGroupInformation url21 = ugiProvider2.ugi();
    UserGroupInformation url22 = ugiProvider2.ugi();

    Assert.assertEquals(
        "With UGI cache, two UGIs returned by the same token should be same",
        url21, url22);
    Assert.assertNotEquals(
        "With UGI cache, two UGIs for the different token should not be same",
        ugi11, url22);

    awaitCacheEmptyDueToExpiration();
    ugi12 = ugiProvider1.ugi();
    url22 = ugiProvider2.ugi();

    String msg = "With cache eviction, two UGIs returned" +
        " by the same token should not be same";
    Assert.assertNotEquals(msg, ugi11, ugi12);
    Assert.assertNotEquals(msg, url21, url22);

    Assert.assertNotEquals(
        "With UGI cache, two UGIs for the different token should not be same",
        ugi11, url22);
  }
  @Test
  public void testUGICacheInSecure() throws Exception {
    String uri1 = WebHdfsFileSystem.PATH_PREFIX
        + PATH
        + "?op=OPEN"
        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
            new LengthParam((long) LENGTH), new UserParam("root"));

    String uri2 = WebHdfsFileSystem.PATH_PREFIX
        + PATH
        + "?op=OPEN"
        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
            new LengthParam((long) LENGTH), new UserParam("hdfs"));

    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
    UserGroupInformation ugi11 = ugiProvider1.ugi();
    UserGroupInformation ugi12 = ugiProvider1.ugi();

    Assert.assertEquals(
        "With UGI cache, two UGIs for the same user should be same", ugi11,
        ugi12);

    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
    UserGroupInformation url21 = ugiProvider2.ugi();
    UserGroupInformation url22 = ugiProvider2.ugi();

    Assert.assertEquals(
        "With UGI cache, two UGIs for the same user should be same", url21,
        url22);
    Assert.assertNotEquals(
        "With UGI cache, two UGIs for the different user should not be same",
        ugi11, url22);

    awaitCacheEmptyDueToExpiration();
    ugi12 = ugiProvider1.ugi();
    url22 = ugiProvider2.ugi();

    String msg = "With cache eviction, two UGIs returned by" +
        " the same user should not be same";
    Assert.assertNotEquals(msg, ugi11, ugi12);
    Assert.assertNotEquals(msg, url21, url22);

    Assert.assertNotEquals(
        "With UGI cache, two UGIs for the different user should not be same",
        ugi11, url22);
  }

  /**
   * Wait for expiration of entries from the UGI cache. We need to be careful
   * not to touch the entries in the cache while we're waiting for expiration.
   * If we did, then that would reset the clock on expiration for those
   * entries. Instead, we trigger internal clean-up of the cache and check for
   * size 0.
   *
   * @throws Exception if there is any error
   */
  private void awaitCacheEmptyDueToExpiration() throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        DataNodeUGIProvider.ugiCache.cleanUp();
        return DataNodeUGIProvider.ugiCache.size() == 0;
      }
    }, EXPIRE_AFTER_ACCESS, 10 * EXPIRE_AFTER_ACCESS);
  }
  private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
      Configuration conf, List<Token<DelegationTokenIdentifier>> tokens)
      throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
          ugi.getUserName()), null, null);
      FSNamesystem namesystem = mock(FSNamesystem.class);
      DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
          86400000, 86400000, 86400000, 86400000, namesystem);
      dtSecretManager.startThreads();
      Token<DelegationTokenIdentifier> token1 = new Token<DelegationTokenIdentifier>(
          dtId, dtSecretManager);
      Token<DelegationTokenIdentifier> token2 = new Token<DelegationTokenIdentifier>(
          dtId, dtSecretManager);
      SecurityUtil.setTokenService(token1,
          NetUtils.createSocketAddr(uri.getAuthority()));
      SecurityUtil.setTokenService(token2,
          NetUtils.createSocketAddr(uri.getAuthority()));
      token1.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
      token2.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
      tokens.add(token1);
      tokens.add(token2);
      ugi.addToken(token1);
      ugi.addToken(token2);
    }
    return (WebHdfsFileSystem) FileSystem.get(uri, conf);
  }
}