From 6ba16566283be26b7a814fe644661f6947cc9592 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 19 Mar 2014 17:33:51 +0000 Subject: [PATCH] HDFS-6100. Merge r1579301 from trunk. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579303 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../web/resources/DatanodeWebHdfsMethods.java | 145 ++++++++++-------- .../hadoop/hdfs/server/namenode/NameNode.java | 17 +- .../web/resources/NamenodeWebHdfsMethods.java | 4 +- .../web/resources/InetSocketAddressParam.java | 93 ----------- ...ssParam.java => NamenodeAddressParam.java} | 10 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 20 ++- .../hadoop/hdfs/web/TestWebHDFSForHA.java | 71 ++++++--- .../web/TestWebHdfsFileSystemContract.java | 15 +- 9 files changed, 185 insertions(+), 193 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/{NamenodeRpcAddressParam.java => NamenodeAddressParam.java} (83%) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9fa093b4cfb..da393c6bc53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -405,6 +405,9 @@ Release 2.4.0 - UNRELEASED HDFS-6099. HDFS file system limits not enforced on renames. (cnauroth) + HDFS-6100. DataNodeWebHdfsMethods does not failover in HA mode. (Haohui Mai + via jing9) + BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index 6e8d605d767..ea0a08f6957 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -19,13 +19,13 @@ package org.apache.hadoop.hdfs.server.datanode.web.resources; import java.io.IOException; import java.io.InputStream; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.EnumSet; import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -40,6 +40,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,12 +49,14 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.hdfs.web.ParamFilter; +import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; @@ -61,7 +64,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.LengthParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.Param; @@ -71,6 +74,7 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -86,18 +90,19 @@ public class DatanodeWebHdfsMethods { private static final UriFsPathParam ROOT = new UriFsPathParam(""); private @Context ServletContext context; + private @Context HttpServletRequest request; private @Context HttpServletResponse response; private void init(final UserGroupInformation ugi, - final DelegationParam delegation, final InetSocketAddress nnRpcAddr, + final DelegationParam delegation, final String nnId, final UriFsPathParam path, final HttpOpParam op, final Param... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } - if (nnRpcAddr == null) { - throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME + if (nnId == null) { + throw new IllegalArgumentException(NamenodeAddressParam.NAME + " is not specified."); } @@ -106,15 +111,32 @@ public class DatanodeWebHdfsMethods { if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. - final Token token = - new Token(); - token.decodeFromUrlString(delegation.getValue()); - SecurityUtil.setTokenService(token, nnRpcAddr); - token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + final Token token = deserializeToken + (delegation.getValue(), nnId); ugi.addToken(token); } } + @VisibleForTesting + Token deserializeToken + (String delegation,String nnId) throws IOException { + final DataNode datanode = (DataNode) context.getAttribute("datanode"); + final Configuration conf = datanode.getConf(); + final Token token = new + Token(); + token.decodeFromUrlString(delegation); + URI nnUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + + "://" + nnId); + boolean isHA = HAUtil.isLogicalUri(conf, nnUri); + if (isHA) { + token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri)); + } else { + token.setService(SecurityUtil.buildTokenService(nnUri)); + } + token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + return token; + } + /** Handle HTTP PUT request for the root. */ @PUT @Path("/") @@ -125,9 +147,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) final PutOpParam op, @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT) @@ -141,8 +163,8 @@ public class DatanodeWebHdfsMethods { @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT) final BlockSizeParam blockSize ) throws IOException, InterruptedException { - return put(in, ugi, delegation, namenodeRpcAddress, ROOT, op, permission, - overwrite, bufferSize, replication, blockSize); + return put(in, ugi, delegation, namenode, ROOT, op, permission, + overwrite, bufferSize, replication, blockSize); } /** Handle HTTP PUT request. */ @@ -155,9 +177,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) final PutOpParam op, @@ -173,24 +195,22 @@ public class DatanodeWebHdfsMethods { final BlockSizeParam blockSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, permission, + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, permission, overwrite, bufferSize, replication, blockSize); return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { - return put(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - permission, overwrite, bufferSize, replication, blockSize); + return put(in, nnId, path.getAbsolutePath(), op, + permission, overwrite, bufferSize, replication, blockSize); } }); } private Response put( final InputStream in, - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final PutOpParam op, final PermissionParam permission, @@ -208,7 +228,7 @@ public class DatanodeWebHdfsMethods { conf.set(FsPermission.UMASK_LABEL, "000"); final int b = bufferSize.getValue(conf); - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = new FSDataOutputStream(dfsclient.create( @@ -225,9 +245,10 @@ public class DatanodeWebHdfsMethods { IOUtils.cleanup(LOG, out); IOUtils.cleanup(LOG, dfsclient); } - final InetSocketAddress nnHttpAddr = NameNode.getHttpAddress(conf); - final URI uri = new URI(WebHdfsFileSystem.SCHEME, null, - nnHttpAddr.getHostName(), nnHttpAddr.getPort(), fullpath, null, null); + final String scheme = "http".equals(request.getScheme()) ? + WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME; + final URI uri = URI.create(String.format("%s://%s/%s", scheme, + nnId, fullpath)); return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } default: @@ -245,15 +266,15 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - return post(in, ugi, delegation, namenodeRpcAddress, ROOT, op, bufferSize); + return post(in, ugi, delegation, namenode, ROOT, op, bufferSize); } /** Handle HTTP POST request. */ @@ -266,9 +287,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, @@ -276,23 +297,21 @@ public class DatanodeWebHdfsMethods { final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, bufferSize); + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, bufferSize); return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException { - return post(in, ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - bufferSize); + return post(in, nnId, path.getAbsolutePath(), op, + bufferSize); } }); } private Response post( final InputStream in, - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final PostOpParam op, final BufferSizeParam bufferSize @@ -304,7 +323,7 @@ public class DatanodeWebHdfsMethods { { final Configuration conf = new Configuration(datanode.getConf()); final int b = bufferSize.getValue(conf); - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = dfsclient.append(fullpath, b, null, null); @@ -332,9 +351,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) final GetOpParam op, @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) @@ -344,7 +363,7 @@ public class DatanodeWebHdfsMethods { @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - return get(ugi, delegation, namenodeRpcAddress, ROOT, op, offset, length, + return get(ugi, delegation, namenode, ROOT, op, offset, length, bufferSize); } @@ -356,9 +375,9 @@ public class DatanodeWebHdfsMethods { @Context final UserGroupInformation ugi, @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT) final DelegationParam delegation, - @QueryParam(NamenodeRpcAddressParam.NAME) - @DefaultValue(NamenodeRpcAddressParam.DEFAULT) - final NamenodeRpcAddressParam namenodeRpcAddress, + @QueryParam(NamenodeAddressParam.NAME) + @DefaultValue(NamenodeAddressParam.DEFAULT) + final NamenodeAddressParam namenode, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) final GetOpParam op, @@ -370,22 +389,20 @@ public class DatanodeWebHdfsMethods { final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - final InetSocketAddress nnRpcAddr = namenodeRpcAddress.getValue(); - init(ugi, delegation, nnRpcAddr, path, op, offset, length, bufferSize); + final String nnId = namenode.getValue(); + init(ugi, delegation, nnId, path, op, offset, length, bufferSize); return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException { - return get(ugi, delegation, nnRpcAddr, path.getAbsolutePath(), op, - offset, length, bufferSize); + return get(nnId, path.getAbsolutePath(), op, offset, + length, bufferSize); } }); } private Response get( - final UserGroupInformation ugi, - final DelegationParam delegation, - final InetSocketAddress nnRpcAddr, + final String nnId, final String fullpath, final GetOpParam op, final OffsetParam offset, @@ -399,7 +416,7 @@ public class DatanodeWebHdfsMethods { case OPEN: { final int b = bufferSize.getValue(conf); - final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + final DFSClient dfsclient = newDfsClient(nnId, conf); HdfsDataInputStream in = null; try { in = new HdfsDataInputStream(dfsclient.open(fullpath, b, true)); @@ -426,7 +443,7 @@ public class DatanodeWebHdfsMethods { case GETFILECHECKSUM: { MD5MD5CRC32FileChecksum checksum = null; - DFSClient dfsclient = new DFSClient(nnRpcAddr, conf); + DFSClient dfsclient = newDfsClient(nnId, conf); try { checksum = dfsclient.getFileChecksum(fullpath); dfsclient.close(); @@ -441,4 +458,10 @@ public class DatanodeWebHdfsMethods { throw new UnsupportedOperationException(op + " is not supported"); } } + + private static DFSClient newDfsClient(String nnId, + Configuration conf) throws IOException { + URI uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + nnId); + return new DFSClient(uri, conf); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 03a0e537088..06e3f1e2f3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -126,7 +126,7 @@ public class NameNode implements NameNodeStatusMXBean { static{ HdfsConfiguration.init(); } - + /** * Categories of operations supported by the namenode. */ @@ -269,6 +269,11 @@ public class NameNode implements NameNodeStatusMXBean { private JvmPauseMonitor pauseMonitor; private ObjectName nameNodeStatusBeanName; + /** + * The service name of the delegation token issued by the namenode. It is + * the name service id in HA mode, or the rpc address in non-HA mode. + */ + private String tokenServiceName; /** Format a new filesystem. Destroys any filesystem that may already * exist at this location. **/ @@ -306,6 +311,13 @@ public class NameNode implements NameNodeStatusMXBean { return startupProgress; } + /** + * Return the service name of the issued delegation token. + * + * @return The name service id in HA-mode, or the rpc address in non-HA mode + */ + public String getTokenServiceName() { return tokenServiceName; } + public static InetSocketAddress getAddress(String address) { return NetUtils.createSocketAddr(address, DEFAULT_PORT); } @@ -499,6 +511,9 @@ public class NameNode implements NameNodeStatusMXBean { loadNamesystem(conf); rpcServer = createRpcServer(conf); + final String nsId = getNameServiceId(conf); + tokenServiceName = HAUtil.isHAEnabled(conf, nsId) ? nsId : NetUtils + .getHostPortString(rpcServer.getRpcAddress()); if (NamenodeRole.NAMENODE == role) { httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index c163ebaca1d..5005dfc805b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.web.resources.GroupParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OwnerParam; @@ -275,7 +275,7 @@ public class NamenodeWebHdfsMethods { delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery - + "&" + new NamenodeRpcAddressParam(namenode) + + "&" + new NamenodeAddressParam(namenode) + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java deleted file mode 100644 index f7c09d17641..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/InetSocketAddressParam.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.web.resources; - -import java.net.InetSocketAddress; - -/** InetSocketAddressParam parameter. */ -abstract class InetSocketAddressParam - extends Param { - InetSocketAddressParam(final Domain domain, final InetSocketAddress value) { - super(domain, value); - } - - @Override - public String toString() { - return getName() + "=" + Domain.toString(getValue()); - } - - /** @return the parameter value as a string */ - @Override - public String getValueString() { - return Domain.toString(getValue()); - } - - /** The domain of the parameter. */ - static final class Domain extends Param.Domain { - Domain(final String paramName) { - super(paramName); - } - - @Override - public String getDomain() { - return ""; - } - - @Override - InetSocketAddress parse(final String str) { - if (str == null) { - throw new IllegalArgumentException("The input string is null: expect " - + getDomain()); - } - final int i = str.indexOf(':'); - if (i < 0) { - throw new IllegalArgumentException("Failed to parse \"" + str - + "\" as " + getDomain() + ": the ':' character not found."); - } else if (i == 0) { - throw new IllegalArgumentException("Failed to parse \"" + str - + "\" as " + getDomain() + ": HOST is empty."); - } else if (i == str.length() - 1) { - throw new IllegalArgumentException("Failed to parse \"" + str - + "\" as " + getDomain() + ": PORT is empty."); - } - - final String host = str.substring(0, i); - final int port; - try { - port = Integer.parseInt(str.substring(i + 1)); - } catch(NumberFormatException e) { - throw new IllegalArgumentException("Failed to parse \"" + str - + "\" as " + getDomain() + ": the ':' position is " + i - + " but failed to parse PORT.", e); - } - - try { - return new InetSocketAddress(host, port); - } catch(Exception e) { - throw new IllegalArgumentException("Failed to parse \"" + str - + "\": cannot create InetSocketAddress(host=" + host - + ", port=" + port + ")", e); - } - } - - /** Convert an InetSocketAddress to a HOST:PORT String. */ - static String toString(final InetSocketAddress addr) { - return addr.getHostName() + ":" + addr.getPort(); - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java similarity index 83% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java index 431454c93d4..6ca95731a86 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeRpcAddressParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/NamenodeAddressParam.java @@ -20,27 +20,27 @@ package org.apache.hadoop.hdfs.web.resources; import org.apache.hadoop.hdfs.server.namenode.NameNode; /** Namenode RPC address parameter. */ -public class NamenodeRpcAddressParam extends InetSocketAddressParam { +public class NamenodeAddressParam extends StringParam { /** Parameter name. */ public static final String NAME = "namenoderpcaddress"; /** Default parameter value. */ public static final String DEFAULT = ""; - private static final Domain DOMAIN = new Domain(NAME); + private static final Domain DOMAIN = new Domain(NAME, null); /** * Constructor. * @param str a string representation of the parameter value. */ - public NamenodeRpcAddressParam(final String str) { + public NamenodeAddressParam(final String str) { super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str)); } /** * Construct an object using the RPC address of the given namenode. */ - public NamenodeRpcAddressParam(final NameNode namenode) { - super(DOMAIN, namenode.getNameNodeAddress()); + public NamenodeAddressParam(final NameNode namenode) { + super(DOMAIN, namenode.getTokenServiceName()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 3ac14e86c08..b136a5e9910 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -53,8 +53,11 @@ import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha + .ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.web.TestWebHDFSForHA; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; @@ -136,7 +139,22 @@ public class DFSTestUtil { NameNode.format(conf); } - + + /** + * Create a new HA-enabled configuration. + */ + public static Configuration newHAConfiguration(final String logicalName) { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); + conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, + logicalName), "nn1,nn2"); + conf.set(DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "" + + "." + logicalName, + ConfiguredFailoverProxyProvider.class.getName()); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + return conf; + } + /** class MyFile contains enough information to recreate the contents of * a single file. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java index 55caafbce78..8c2b5c05c63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSForHA.java @@ -18,23 +18,25 @@ package org.apache.hadoop.hdfs.web; -import java.io.IOException; -import java.net.URI; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.Token; import org.junit.Assert; import org.junit.Test; -/** Test whether WebHDFS can connect to an HA cluster */ +import java.io.IOException; +import java.net.URI; + public class TestWebHDFSForHA { private static final String LOGICAL_NAME = "minidfs"; + private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME + + "://" + LOGICAL_NAME); private static final MiniDFSNNTopology topo = new MiniDFSNNTopology() .addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN( new MiniDFSNNTopology.NNConf("nn1")).addNN( @@ -42,8 +44,7 @@ public class TestWebHDFSForHA { @Test public void testHA() throws IOException { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); MiniDFSCluster cluster = null; FileSystem fs = null; try { @@ -54,8 +55,7 @@ public class TestWebHDFSForHA { cluster.waitActive(); - final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME; - fs = FileSystem.get(URI.create(uri), conf); + fs = FileSystem.get(WEBHDFS_URI, conf); cluster.transitionToActive(0); final Path dir = new Path("/test"); @@ -67,9 +67,7 @@ public class TestWebHDFSForHA { final Path dir2 = new Path("/test2"); Assert.assertTrue(fs.mkdirs(dir2)); } finally { - if (fs != null) { - fs.close(); - } + IOUtils.cleanup(null, fs); if (cluster != null) { cluster.shutdown(); } @@ -78,10 +76,9 @@ public class TestWebHDFSForHA { @Test public void testSecureHA() throws IOException { - Configuration conf = new Configuration(); + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); MiniDFSCluster cluster = null; WebHdfsFileSystem fs = null; @@ -92,8 +89,7 @@ public class TestWebHDFSForHA { HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME); cluster.waitActive(); - final String uri = WebHdfsFileSystem.SCHEME + "://" + LOGICAL_NAME; - fs = (WebHdfsFileSystem) FileSystem.get(URI.create(uri), conf); + fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf); cluster.transitionToActive(0); Token token = fs.getDelegationToken(null); @@ -104,9 +100,44 @@ public class TestWebHDFSForHA { fs.renewDelegationToken(token); fs.cancelDelegationToken(token); } finally { - if (fs != null) { - fs.close(); + IOUtils.cleanup(null, fs); + if (cluster != null) { + cluster.shutdown(); } + } + } + + @Test + public void testFailoverAfterOpen() throws IOException { + Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME); + MiniDFSCluster cluster = null; + FileSystem fs = null; + final Path p = new Path("/test"); + final byte[] data = "Hello".getBytes(); + + try { + cluster = new MiniDFSCluster.Builder(conf).nnTopology(topo) + .numDataNodes(1).build(); + + HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME); + + cluster.waitActive(); + + fs = FileSystem.get(WEBHDFS_URI, conf); + cluster.transitionToActive(1); + + FSDataOutputStream out = fs.create(p); + cluster.shutdownNameNode(1); + cluster.transitionToActive(0); + + out.write(data); + out.close(); + FSDataInputStream in = fs.open(p); + byte[] buf = new byte[data.length]; + IOUtils.readFully(in, buf, 0, buf.length); + Assert.assertArrayEquals(data, buf); + } finally { + IOUtils.cleanup(null, fs); if (cluster != null) { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java index 687eddc8da3..da23a593da1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java @@ -43,13 +43,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.web.resources.DoAsParam; -import org.apache.hadoop.hdfs.web.resources.GetOpParam; -import org.apache.hadoop.hdfs.web.resources.HttpOpParam; -import org.apache.hadoop.hdfs.web.resources.LengthParam; -import org.apache.hadoop.hdfs.web.resources.NamenodeRpcAddressParam; -import org.apache.hadoop.hdfs.web.resources.OffsetParam; -import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.hdfs.web.resources.*; +import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; @@ -465,7 +460,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { AppendTestUtil.testAppend(fs, new Path(dir, "append")); } - {//test NamenodeRpcAddressParam not set. + {//test NamenodeAddressParam not set. final HttpOpParam.Op op = PutOpParam.Op.CREATE; final URL url = webhdfs.toUrl(op, dir); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); @@ -476,9 +471,9 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest { final String redirect = conn.getHeaderField("Location"); conn.disconnect(); - //remove NamenodeRpcAddressParam + //remove NamenodeAddressParam WebHdfsFileSystem.LOG.info("redirect = " + redirect); - final int i = redirect.indexOf(NamenodeRpcAddressParam.NAME); + final int i = redirect.indexOf(NamenodeAddressParam.NAME); final int j = redirect.indexOf("&", i); String modified = redirect.substring(0, i - 1) + redirect.substring(j); WebHdfsFileSystem.LOG.info("modified = " + modified);