HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it will not redirect retries to the same datanode. Contributed by zhaoyunjiong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611750 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-07-18 18:20:20 +00:00
parent 551024915d
commit 7c18f8d55b
5 changed files with 237 additions and 43 deletions
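In effect, the NameNode now honors an excludedatanodes query parameter when choosing the datanode to redirect to, so a WebHDFS client can retry around a failed node. A hedged illustration of the resulting request (host names and ports are made up):

    http://namenode:50070/webhdfs/v1/foo?op=OPEN&excludedatanodes=dn1:50075,dn2

Each comma-separated entry is either a bare hostname or a host:port pair, matching the parsing added to chooseDatanode below.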

CHANGES.txt (hadoop-hdfs)

@ -304,6 +304,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6700. BlockPlacementPolicy should choose storage but not datanode for
deletion. (szetszwo)
HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

NamenodeWebHdfsMethods.java

@ -28,6 +28,7 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import javax.servlet.ServletContext;
@ -84,6 +85,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@ -113,11 +115,13 @@ import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
@ -194,9 +198,23 @@ public class NamenodeWebHdfsMethods {
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize) throws IOException {
final long blocksize, final String excludeDatanodes) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
HashSet<Node> excludes = new HashSet<Node>();
if (excludeDatanodes != null) {
for (String host : StringUtils
.getTrimmedStringCollection(excludeDatanodes)) {
int idx = host.indexOf(":");
if (idx != -1) {
excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
} else {
excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
}
}
}
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
@ -204,7 +222,7 @@ public class NamenodeWebHdfsMethods {
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
if (storages.length > 0) {
@ -233,7 +251,7 @@ public class NamenodeWebHdfsMethods {
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
return bestNode(locations.get(0).getLocations());
return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
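For reference, a minimal standalone sketch of the exclude-list parsing above; the class and helper names are mine, and the real code resolves each entry to a DatanodeInfo through the DatanodeManager rather than keeping strings:

import java.util.HashSet;
import java.util.Set;

public class ExcludesParseSketch {
  // Mirrors the loop in chooseDatanode: each comma-separated entry is
  // either "host" or "host:port" (the patch resolves these via
  // getDatanodeByHost and getDatanodeByXferAddr respectively).
  static Set<String> parseExcludes(String excludeDatanodes) {
    final Set<String> excludes = new HashSet<String>();
    if (excludeDatanodes != null) {
      for (String host : excludeDatanodes.split(",")) {
        host = host.trim();
        if (!host.isEmpty()) {
          excludes.add(host);
        }
      }
    }
    return excludes;
  }

  public static void main(String[] args) {
    // two entries: "dn1:50010" and "dn2"
    System.out.println(parseExcludes("dn1:50010, dn2"));
  }
}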
@ -247,11 +265,14 @@ public class NamenodeWebHdfsMethods {
* sorted based on availability and network distances, thus it is sufficient
to return the first element of the node list here.
*/
private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
if (nodes.length == 0 || nodes[0].isDecommissioned()) {
throw new IOException("No active nodes contain this block");
}
return nodes[0];
}
private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
HashSet<Node> excludes) throws IOException {
for (DatanodeInfo dn: nodes) {
if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
return dn;
}
}
throw new IOException("No active nodes contain this block");
}
private Token<? extends TokenIdentifier> generateDelegationToken(
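getBlockLocations returns replicas sorted by availability and network distance (per the javadoc above), so the rewrite preserves locality: it returns the first replica that is neither decommissioned nor excluded. A hedged behavioral sketch with hypothetical node names:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class BestNodeSketch {
  // Same selection rule as the rewritten bestNode, over plain strings:
  // take the first node (they arrive sorted by network distance) that
  // is not excluded; the real code also skips decommissioned nodes.
  static String bestNode(List<String> sortedByDistance, Set<String> excludes) {
    for (String dn : sortedByDistance) {
      if (!excludes.contains(dn)) {
        return dn;
      }
    }
    throw new RuntimeException("No active nodes contain this block");
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("dnNear", "dnMid", "dnFar");
    // a retry that previously went back to dnNear now lands on dnMid
    System.out.println(bestNode(nodes, Collections.singleton("dnNear")));
  }
}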
@ -270,11 +291,12 @@ public class NamenodeWebHdfsMethods {
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize,
final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
try {
dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
excludeDatanodes);
} catch (InvalidTopologyException ite) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
}
@ -361,13 +383,15 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName);
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
}
/** Handle HTTP PUT request. */
@ -423,14 +447,16 @@ public class NamenodeWebHdfsMethods {
@QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
final SnapshotNameParam snapshotName,
@QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
final OldSnapshotNameParam oldSnapshotName
final OldSnapshotNameParam oldSnapshotName,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, delegationTokenArgument,
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
oldSnapshotName);
oldSnapshotName, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -441,7 +467,7 @@ public class NamenodeWebHdfsMethods {
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
delegationTokenArgument, aclPermission, xattrName, xattrValue,
xattrSetFlag, snapshotName, oldSnapshotName);
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
} finally {
reset();
}
@ -474,7 +500,8 @@ public class NamenodeWebHdfsMethods {
final XAttrValueParam xattrValue,
final XAttrSetFlagParam xattrSetFlag,
final SnapshotNameParam snapshotName,
final OldSnapshotNameParam oldSnapshotName
final OldSnapshotNameParam oldSnapshotName,
final ExcludeDatanodesParam exclDatanodes
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@ -484,9 +511,10 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case CREATE:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, blockSize.getValue(conf),
permission, overwrite, bufferSize, replication, blockSize);
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
exclDatanodes.getValue(), permission, overwrite, bufferSize,
replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
@ -619,9 +647,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
final BufferSizeParam bufferSize,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
bufferSize, excludeDatanodes);
}
/** Handle HTTP POST request. */
@ -643,17 +674,21 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize
final BufferSizeParam bufferSize,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return post(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, concatSrcs, bufferSize);
path.getAbsolutePath(), op, concatSrcs, bufferSize,
excludeDatanodes);
} finally {
reset();
}
@ -669,15 +704,17 @@ public class NamenodeWebHdfsMethods {
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
final BufferSizeParam bufferSize
final BufferSizeParam bufferSize,
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
switch(op.getValue()) {
case APPEND:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, -1L, bufferSize);
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), -1L, -1L,
excludeDatanodes.getValue(), bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CONCAT:
@ -715,10 +752,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
final XAttrEncodingParam xattrEncoding
final XAttrEncodingParam xattrEncoding,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
renewer, bufferSize, xattrNames, xattrEncoding);
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes);
}
/** Handle HTTP GET request. */
@ -747,11 +786,13 @@ public class NamenodeWebHdfsMethods {
@QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
final List<XAttrNameParam> xattrNames,
@QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
final XAttrEncodingParam xattrEncoding
final XAttrEncodingParam xattrEncoding,
@QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
renewer, bufferSize, xattrEncoding);
renewer, bufferSize, xattrEncoding, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -759,7 +800,7 @@ public class NamenodeWebHdfsMethods {
try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
xattrNames, xattrEncoding);
xattrNames, xattrEncoding, excludeDatanodes);
} finally {
reset();
}
@ -779,7 +820,8 @@ public class NamenodeWebHdfsMethods {
final RenewerParam renewer,
final BufferSizeParam bufferSize,
final List<XAttrNameParam> xattrNames,
final XAttrEncodingParam xattrEncoding
final XAttrEncodingParam xattrEncoding,
final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
@ -787,8 +829,9 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case OPEN:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
final URI uri = redirectURI(namenode, ugi, delegation, username,
doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
excludeDatanodes.getValue(), offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@ -824,7 +867,7 @@ public class NamenodeWebHdfsMethods {
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, -1L);
fullpath, op.getValue(), -1L, -1L, null);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:

WebHdfsFileSystem.java

@ -448,6 +448,7 @@ public class WebHdfsFileSystem extends FileSystem
protected final HttpOpParam.Op op;
private final boolean redirected;
protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
private boolean checkRetry;
@ -499,6 +500,10 @@ public class WebHdfsFileSystem extends FileSystem
* a DN such as open and checksum
*/
private HttpURLConnection connect(URL url) throws IOException {
//redirect hostname and port
String redirectHost = null;
// resolve redirects for a DN operation unless already resolved
if (op.getRedirect() && !redirected) {
final HttpOpParam.Op redirectOp =
@ -511,11 +516,24 @@ public class WebHdfsFileSystem extends FileSystem
try {
validateResponse(redirectOp, conn, false);
url = new URL(conn.getHeaderField("Location"));
redirectHost = url.getHost() + ":" + url.getPort();
} finally {
conn.disconnect();
}
}
try {
return connect(op, url);
} catch (IOException ioe) {
if (redirectHost != null) {
if (excludeDatanodes.getValue() != null) {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ excludeDatanodes.getValue());
} else {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
}
}
throw ioe;
}
}
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
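The client side of the loop: when a request to the redirect target fails, connect(URL) records that host so the next attempt sends it back in excludedatanodes, newest failure first. A simplified, self-contained sketch of that accumulation (the hosts and the failure simulation are made up):

import java.io.IOException;
import java.util.Arrays;

public class ExcludeOnRetrySketch {
  // Accumulates failed hosts the way connect(URL) above grows
  // excludeDatanodes: newest failure prepended, comma-separated.
  public static void main(String[] args) throws IOException {
    String excluded = null;
    for (String host : Arrays.asList("dn1:50075", "dn2:50075", "dn3:50075")) {
      try {
        connect(host);
        System.out.println("succeeded on " + host + ", excluded=" + excluded);
        return;
      } catch (IOException ioe) {
        excluded = (excluded == null) ? host : host + "," + excluded;
      }
    }
    throw new IOException("out of datanodes, excluded=" + excluded);
  }

  // Hypothetical stand-in for the real HTTP connection; dn1 and dn2 fail.
  private static void connect(String host) throws IOException {
    if (!host.startsWith("dn3")) {
      throw new IOException("simulated failure on " + host);
    }
  }
}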
@ -652,9 +670,16 @@ public class WebHdfsFileSystem extends FileSystem
@Override
protected URL getUrl() throws IOException {
if (excludeDatanodes.getValue() != null) {
Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
tmpParam[parameters.length] = excludeDatanodes;
return toUrl(op, fspath, tmpParam);
} else {
return toUrl(op, fspath, parameters);
}
}
}
/**
* Default path-based implementation expects no json response

ExcludeDatanodesParam.java (new file)

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
/** Exclude datanodes param */
public class ExcludeDatanodesParam extends StringParam {
/** Parameter name. */
public static final String NAME = "excludedatanodes";
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME, null);
/**
* Constructor.
* @param str a string representation of the parameter value.
*/
public ExcludeDatanodesParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT)? null: DOMAIN.parse(str));
}
@Override
public String getName() {
return NAME;
}
}
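A hedged usage sketch of the new parameter class, assuming the hadoop-hdfs classes are on the classpath:

import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;

public class ParamSketch {
  public static void main(String[] args) {
    ExcludeDatanodesParam p = new ExcludeDatanodesParam("dn1:50010,dn2");
    System.out.println(p.getName());   // excludedatanodes
    System.out.println(p.getValue());  // dn1:50010,dn2
    // Empty or null input collapses to a null value, so getUrl() in
    // WebHdfsFileSystem simply omits the parameter from the query string.
    System.out.println(new ExcludeDatanodesParam("").getValue());  // null
  }
}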

TestWebHdfsDataLocality.java

@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality {
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
@ -117,23 +117,104 @@ public class TestWebHdfsDataLocality {
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
Assert.assertEquals(expected, chosen);
}
} finally {
cluster.shutdown();
}
}
@Test
public void testExcludeDataNodes() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
final String[] hosts = {"DataNode1", "DataNode2", "DataNode3","DataNode4","DataNode5","DataNode6"};
final int nDataNodes = hosts.length;
LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks)
+ ", hosts=" + Arrays.asList(hosts));
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.hosts(hosts).numDataNodes(nDataNodes).racks(racks).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final NameNode namenode = cluster.getNameNode();
final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
).getDatanodeManager();
LOG.info("dm=" + dm);
final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
final String f = "/foo";
//create a file with three replicas.
final Path p = new Path(f);
final FSDataOutputStream out = dfs.create(p, (short)3);
out.write(1);
out.close();
//get replica locations.
final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
namenode, f, 0, 1);
final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
Assert.assertEquals(1, lb.size());
final DatanodeInfo[] locations = lb.get(0).getLocations();
Assert.assertEquals(3, locations.length);
//For GETFILECHECKSUM, OPEN and APPEND,
//the chosen datanode must be different from the excluded nodes.
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 2; i++) {
sb.append(locations[i].getXferAddr());
{ // test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
sb.toString());
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
}
}
{ // test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
}
}
{ // test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
blocksize, sb.toString());
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
}
}
sb.append(",");
}
} finally {
cluster.shutdown();
}
}
}
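Taken together, existing WebHDFS clients need no changes: a read that hits a bad datanode is retried with that node excluded. A hedged end-to-end sketch (the NameNode address and file are hypothetical):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070"), conf);
    FSDataInputStream in = fs.open(new Path("/foo"));
    try {
      // If the datanode behind the first redirect fails, the retried
      // request carries excludedatanodes=<failed host> automatically.
      System.out.println(in.read());
    } finally {
      in.close();
    }
  }
}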