HDFS-6127. Merge r1579546 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Haohui Mai 2014-03-20 07:01:35 +00:00
parent 31a3a99f05
commit d955e84f39
6 changed files with 128 additions and 54 deletions

View File

@ -414,6 +414,8 @@ Release 2.4.0 - UNRELEASED
HDFS-6105. NN web UI for DN list loads the same jmx page multiple times. HDFS-6105. NN web UI for DN list loads the same jmx page multiple times.
(wheat9) (wheat9)
HDFS-6127. WebHDFS tokens cannot be renewed in HA setup. (wheat9)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -1072,7 +1072,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
private static ClientProtocol getNNProxy( private static ClientProtocol getNNProxy(
Token<DelegationTokenIdentifier> token, Configuration conf) Token<DelegationTokenIdentifier> token, Configuration conf)
throws IOException { throws IOException {
URI uri = HAUtil.getServiceUriFromToken(token); URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME,
token);
if (HAUtil.isTokenForLogicalUri(token) && if (HAUtil.isTokenForLogicalUri(token) &&
!HAUtil.isLogicalUri(conf, uri)) { !HAUtil.isLogicalUri(conf, uri)) {
// If the token is for a logical nameservice, but the configuration // If the token is for a logical nameservice, but the configuration

View File

@ -17,24 +17,11 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import com.google.common.base.Preconditions; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import com.google.common.collect.Lists; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import org.apache.commons.logging.Log; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
import org.apache.commons.logging.LogFactory; import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -42,10 +29,30 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import org.apache.commons.logging.Log;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class HAUtil { public class HAUtil {
@ -209,25 +216,16 @@ public class HAUtil {
} }
/** /**
* Parse the HDFS URI out of the provided token. * Parse the file system URI out of the provided token.
* @throws IOException if the token is invalid
*/ */
public static URI getServiceUriFromToken( public static URI getServiceUriFromToken(final String scheme,
Token<DelegationTokenIdentifier> token) Token<?> token) {
throws IOException {
String tokStr = token.getService().toString(); String tokStr = token.getService().toString();
if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) { if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) {
tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, ""); tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, "");
} }
return URI.create(scheme + "://" + tokStr);
try {
return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
tokStr);
} catch (URISyntaxException e) {
throw new IOException("Invalid token contents: '" +
tokStr + "'");
}
} }
/** /**
@ -244,8 +242,7 @@ public class HAUtil {
* @return true if this token corresponds to a logical nameservice * @return true if this token corresponds to a logical nameservice
* rather than a specific namenode. * rather than a specific namenode.
*/ */
public static boolean isTokenForLogicalUri( public static boolean isTokenForLogicalUri(Token<?> token) {
Token<DelegationTokenIdentifier> token) {
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
} }
@ -293,7 +290,6 @@ public class HAUtil {
* @return the internet address of the currently-active NN. * @return the internet address of the currently-active NN.
* @throws IOException if an error occurs while resolving the active NN. * @throws IOException if an error occurs while resolving the active NN.
*/ */
@SuppressWarnings("deprecation")
public static InetSocketAddress getAddressOfActive(FileSystem fs) public static InetSocketAddress getAddressOfActive(FileSystem fs)
throws IOException { throws IOException {
if (!(fs instanceof DistributedFileSystem)) { if (!(fs instanceof DistributedFileSystem)) {

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.web; package org.apache.hadoop.hdfs.web;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
@ -28,9 +30,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegationTokenRenewer; import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable; import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -71,23 +74,32 @@ final class TokenAspect<T extends FileSystem & Renewable> {
} }
private TokenManagementDelegator getInstance(Token<?> token, private TokenManagementDelegator getInstance(Token<?> token,
Configuration conf) throws IOException { Configuration conf)
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(token); throws IOException {
Text kind = token.getKind();
final URI uri; final URI uri;
final String scheme = getSchemeByKind(token.getKind());
if (HAUtil.isTokenForLogicalUri(token)) {
uri = HAUtil.getServiceUriFromToken(scheme, token);
} else {
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
(token);
uri = URI.create(scheme + "://" + NetUtils.getHostPortString(address));
}
return (TokenManagementDelegator) FileSystem.get(uri, conf);
}
private static String getSchemeByKind(Text kind) {
if (kind.equals(HftpFileSystem.TOKEN_KIND)) { if (kind.equals(HftpFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(HftpFileSystem.SCHEME, address); return HftpFileSystem.SCHEME;
} else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) { } else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(HsftpFileSystem.SCHEME, address); return HsftpFileSystem.SCHEME;
} else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) { } else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, address); return WebHdfsFileSystem.SCHEME;
} else if (kind.equals(SWebHdfsFileSystem.TOKEN_KIND)) { } else if (kind.equals(SWebHdfsFileSystem.TOKEN_KIND)) {
uri = DFSUtil.createUri(SWebHdfsFileSystem.SCHEME, address); return SWebHdfsFileSystem.SCHEME;
} else { } else {
throw new IllegalArgumentException("Unsupported scheme"); throw new IllegalArgumentException("Unsupported scheme");
} }
return (TokenManagementDelegator) FileSystem.get(uri, conf);
} }
} }

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.web.resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import javax.servlet.ServletContext;
import java.io.IOException;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class TestDatanodeWebHdfsMethods {
private static final String LOGICAL_NAME = "minidfs";
@Test
public void testDeserializeHAToken() throws IOException {
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
DataNode dn = mock(DataNode.class);
doReturn(conf).when(dn).getConf();
ServletContext context = mock(ServletContext.class);
doReturn(dn).when(context).getAttribute("datanode");
final Token<DelegationTokenIdentifier> token = new
Token<DelegationTokenIdentifier>();
DatanodeWebHdfsMethods method = new DatanodeWebHdfsMethods();
Whitebox.setInternalState(method, "context", context);
final Token<DelegationTokenIdentifier> tok2 = method.deserializeToken
(token.encodeToUrlString(), LOGICAL_NAME);
Assert.assertTrue(HAUtil.isTokenForLogicalUri(tok2));
}
}

View File

@ -22,8 +22,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -33,6 +37,9 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class TestWebHDFSForHA { public class TestWebHDFSForHA {
private static final String LOGICAL_NAME = "minidfs"; private static final String LOGICAL_NAME = "minidfs";
private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME + private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME +
@ -75,10 +82,10 @@ public class TestWebHDFSForHA {
} }
@Test @Test
public void testSecureHA() throws IOException { public void testSecureHAToken() throws IOException, InterruptedException {
Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, conf.setBoolean(DFSConfigKeys
true); .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
WebHdfsFileSystem fs = null; WebHdfsFileSystem fs = null;
@ -89,16 +96,18 @@ public class TestWebHDFSForHA {
HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME); HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
cluster.waitActive(); cluster.waitActive();
fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf); fs = spy((WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf));
FileSystemTestHelper.addFileSystemForTesting(WEBHDFS_URI, conf, fs);
cluster.transitionToActive(0); cluster.transitionToActive(0);
Token<?> token = fs.getDelegationToken(null); Token<?> token = fs.getDelegationToken(null);
cluster.shutdownNameNode(0); cluster.shutdownNameNode(0);
cluster.transitionToActive(1); cluster.transitionToActive(1);
token.renew(conf);
fs.renewDelegationToken(token); token.cancel(conf);
fs.cancelDelegationToken(token); verify(fs).renewDelegationToken(token);
verify(fs).cancelDelegationToken(token);
} finally { } finally {
IOUtils.cleanup(null, fs); IOUtils.cleanup(null, fs);
if (cluster != null) { if (cluster != null) {