HDFS-5891. webhdfs should not try connecting the DN during redirection. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1567810 13f79535-47bb-0310-9956-ffa450edef68
commit 256adb2106
parent c6e2f4f07f
Author: Brandon Li
Date:   2014-02-12 23:57:35 +00:00

5 changed files with 70 additions and 68 deletions
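In short: before this change, the NameNode verified that a candidate DataNode was reachable by opening a TCP connection to its info port (in JspHelper#bestNode) before redirecting a WebHDFS request to it, adding latency and an extra failure mode to every redirect. After this change the NameNode simply skips decommissioned replicas and redirects to the first remaining location; whether the DataNode is actually up is discovered by the client when it follows the redirect.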

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -425,6 +425,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5938. Make BlockReaderFactory#BlockReaderPeer a static class to avoid
     a findbugs warning. (cmccabe)
 
+    HDFS-5891. webhdfs should not try connecting the DN during redirection
+    (Haohui Mai via brandonli)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -18,7 +18,26 @@
 package org.apache.hadoop.hdfs.server.common;
 
-import com.google.common.base.Charsets;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.jsp.JspWriter;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +52,11 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -56,22 +79,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.*;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import com.google.common.base.Charsets;
 
 @InterfaceAudience.Private
 public class JspHelper {
@@ -171,58 +179,31 @@ public class JspHelper {
     }
     NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
     Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false, conf);
+    return bestNode(nodes, false);
   }
 
   public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
       throws IOException {
     DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true, conf);
+    return bestNode(nodes, true);
   }
 
-  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom,
-      Configuration conf) throws IOException {
-    TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    DatanodeInfo chosenNode = null;
-    int failures = 0;
-    Socket s = null;
-    int index = -1;
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
+      throws IOException {
     if (nodes == null || nodes.length == 0) {
       throw new IOException("No nodes contain this block");
     }
-    while (s == null) {
-      if (chosenNode == null) {
-        do {
-          if (doRandom) {
-            index = DFSUtil.getRandom().nextInt(nodes.length);
-          } else {
-            index++;
-          }
-          chosenNode = nodes[index];
-        } while (deadNodes.contains(chosenNode));
-      }
-      chosenNode = nodes[index];
-
-      //just ping to check whether the node is alive
-      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
-          chosenNode.getInfoAddr());
-
-      try {
-        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-      } catch (IOException e) {
-        deadNodes.add(chosenNode);
-        IOUtils.closeSocket(s);
-        s = null;
-        failures++;
-      }
-      if (failures == nodes.length)
-        throw new IOException("Could not reach the block containing the data. Please try again");
-    }
-    s.close();
-    return chosenNode;
+    int l = 0;
+    while (l < nodes.length && !nodes[l].isDecommissioned()) {
+      ++l;
+    }
+
+    if (l == 0) {
+      throw new IOException("No active nodes contain this block");
+    }
+
+    int index = doRandom ? DFSUtil.getRandom().nextInt(l) : 0;
+    return nodes[index];
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
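To see the new selection logic in isolation, here is a standalone sketch. The Node class below is a hypothetical stand-in for DatanodeInfo, and java.util.Random replaces DFSUtil.getRandom(); like the patched bestNode, it relies on the caller passing locations sorted so that decommissioned nodes come last (which is how HDFS orders block locations), so counting the live prefix and picking within it replaces the old per-node socket probe entirely.

import java.io.IOException;
import java.util.Random;

// Hypothetical stand-in for DatanodeInfo; only the decommission flag matters here.
class Node {
  final String name;
  final boolean decommissioned;
  Node(String name, boolean decommissioned) {
    this.name = name;
    this.decommissioned = decommissioned;
  }
}

class BestNodeSketch {
  private static final Random RANDOM = new Random();

  // Mirrors the shape of the new JspHelper#bestNode: no socket probe,
  // just skip decommissioned nodes, which sort to the end of the array.
  static Node bestNode(Node[] nodes, boolean doRandom) throws IOException {
    if (nodes == null || nodes.length == 0) {
      throw new IOException("No nodes contain this block");
    }
    int live = 0;
    while (live < nodes.length && !nodes[live].decommissioned) {
      ++live;
    }
    if (live == 0) {
      throw new IOException("No active nodes contain this block");
    }
    // Random choice spreads read load across live replicas; the
    // deterministic choice (index 0) keeps the best-ranked node.
    int index = doRandom ? RANDOM.nextInt(live) : 0;
    return nodes[index];
  }

  public static void main(String[] args) throws IOException {
    Node[] nodes = {
      new Node("dn1", false), new Node("dn2", false), new Node("dn3", true)
    };
    System.out.println(bestNode(nodes, true).name);  // dn1 or dn2, never dn3
  }
}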

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -107,6 +107,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.sun.jersey.spi.container.ResourceFilters;
@@ -160,9 +161,10 @@ public class NamenodeWebHdfsMethods {
     response.setContentType(null);
   }
 
+  @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize, final Configuration conf) throws IOException {
+      final long blocksize) throws IOException {
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
 
     if (op == PutOpParam.Op.CREATE) {
@@ -201,7 +203,7 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
+          return bestNode(locations.get(0).getLocations());
         }
       }
     }
@@ -210,13 +212,26 @@ public class NamenodeWebHdfsMethods {
         ).chooseRandom(NodeBase.ROOT);
   }
 
+  /**
+   * Choose the datanode to redirect the request. Note that the nodes have been
+   * sorted based on availability and network distances, thus it is sufficient
+   * to return the first element of the node here.
+   */
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
+    if (nodes.length == 0 || nodes[0].isDecommissioned()) {
+      throw new IOException("No active nodes contain this block");
+    }
+    return nodes[0];
+  }
+
   private Token<? extends TokenIdentifier> generateDelegationToken(
       final NameNode namenode, final UserGroupInformation ugi,
       final String renewer) throws IOException {
     final Credentials c = DelegationTokenSecretManager.createCredentials(
         namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
     final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
-    Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND : SWebHdfsFileSystem.TOKEN_KIND;
+    Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
+        : SWebHdfsFileSystem.TOKEN_KIND;
     t.setKind(kind);
     return t;
   }
@@ -227,9 +242,8 @@ public class NamenodeWebHdfsMethods {
       final String path, final HttpOpParam.Op op, final long openOffset,
       final long blocksize,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
-        blocksize, conf);
+        blocksize);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
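For context, here is a rough sketch of the two-step WebHDFS OPEN exchange this redirect serves (hostname, port, and file path are placeholders). The point of the patch is visible in the flow: the NameNode builds the Location header without ever contacting the DataNode, so a dead node now surfaces only on the client's second request.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsOpenSketch {
  public static void main(String[] args) throws Exception {
    // Step 1: ask the NameNode; it answers 307 with a DataNode URL.
    URL nn = new URL("http://namenode.example.com:50070/webhdfs/v1/tmp/f?op=OPEN");
    HttpURLConnection conn = (HttpURLConnection) nn.openConnection();
    conn.setInstanceFollowRedirects(false);               // inspect the redirect ourselves
    String dnLocation = conn.getHeaderField("Location");  // DataNode URL from the 307
    conn.disconnect();

    // Step 2: follow the redirect; connecting to the DataNode happens here.
    HttpURLConnection dn = (HttpURLConnection) new URL(dnLocation).openConnection();
    try (InputStream in = dn.getInputStream()) {
      System.out.println(in.read());
    }
  }
}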

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

@@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality {
         //The chosen datanode must be the same as the client address
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, conf);
+            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
         Assert.assertEquals(ipAddr, chosen.getIpAddr());
       }
     }
@@ -117,19 +117,19 @@ public class TestWebHdfsDataLocality {
       { //test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, conf);
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
         Assert.assertEquals(expected, chosen);
       }
 
       { //test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, conf);
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
         Assert.assertEquals(expected, chosen);
       }
 
       { //test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, conf);
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
         Assert.assertEquals(expected, chosen);
       }
     } finally {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHttpsFileSystem.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.web;
 
 import java.io.File;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -92,6 +93,9 @@ public class TestHttpsFileSystem {
     os.write(23);
     os.close();
     Assert.assertTrue(fs.exists(f));
+    InputStream is = fs.open(f);
+    Assert.assertEquals(23, is.read());
+    is.close();
     fs.close();
   }
 }
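The read-back added to TestHttpsFileSystem drives an OPEN end to end over HTTPS: fs.open triggers the NameNode redirect and the follow-up DataNode request, so a regression in the reworked redirect path would surface here as a failed read.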