HDFS-3703. Merging change 1384209 from trunk to branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1384210 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2012-09-13 06:08:07 +00:00
parent 1415f8490d
commit 042f9102fe
8 changed files with 291 additions and 46 deletions

CHANGES.txt

@ -9,6 +9,10 @@ Release 2.0.3-alpha - Unreleased
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
(Jaimin D Jetly and Jing Zhao via szetszwo)
HDFS-3703. Datanodes are marked stale if heartbeat is not received in
configured timeout and are selected as the last location to read from.
(Jing Zhao via suresh)
IMPROVEMENTS
HDFS-3925. Prettify PipelineAck#toString() for printing to a log
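
For illustration of the behavior described in the HDFS-3703 entry above, here is a minimal client-side sketch (not part of this commit) showing where the new ordering becomes visible: with stale checking enabled on the namenode, the host list returned for each block places stale datanodes last, so a reader naturally tries fresh replicas first. The cluster address and file path below are hypothetical.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StaleReadOrderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical cluster; assumes the namenode was started with
    // dfs.namenode.check.stale.datanode=true.
    conf.set("fs.defaultFS", "hdfs://namenode.example.com:8020");
    FileSystem fs = FileSystem.get(conf);

    Path file = new Path("/data/sample.txt");   // hypothetical file
    FileStatus status = fs.getFileStatus(file);
    BlockLocation[] locations =
        fs.getFileBlockLocations(status, 0, status.getLen());

    // Hosts of stale datanodes, if any, appear at the end of each list.
    for (BlockLocation loc : locations) {
      System.out.println("offset=" + loc.getOffset()
          + " hosts=" + Arrays.toString(loc.getHosts()));
    }
    fs.close();
  }
}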

DFSConfigKeys.java

@ -174,6 +174,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
// Whether to enable datanode's stale state detection and usage
public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
// The default value of the time interval for marking datanodes as stale
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
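
The two stale-datanode keys added above can also be set programmatically, which is sometimes convenient in tests. A minimal sketch, assuming only the constants introduced in this hunk; the 45-second interval is an arbitrary illustrative value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class StaleConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Turn on stale-node detection (disabled by default).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
    // Mark a datanode stale after 45 seconds without a heartbeat
    // (the default is 30 seconds).
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 45 * 1000L);
    System.out.println(
        conf.get(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY));
  }
}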

DFSUtil.java

@ -133,6 +133,43 @@ public class DFSUtil {
a.isDecommissioned() ? 1 : -1;
}
};
/**
* Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
* Decommissioned/stale nodes are moved to the end of the array when sorted
* with this comparator.
*/
@InterfaceAudience.Private
public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
private long staleInterval;
/**
* Constructor of DecomStaleComparator
*
* @param interval
* The time interval for marking datanodes as stale. It is passed in
* from the caller since the interval may be changed dynamically
*/
public DecomStaleComparator(long interval) {
this.staleInterval = interval;
}
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
// Decommissioned nodes will still be moved to the end of the list
if (a.isDecommissioned()) {
return b.isDecommissioned() ? 0 : 1;
} else if (b.isDecommissioned()) {
return -1;
}
// Stale nodes will be moved behind the normal nodes
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
return aStale == bStale ? 0 : (aStale ? 1 : -1);
}
}
/**
* Address matcher for matching an address to local address
*/
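
To make the ordering contract of DecomStaleComparator concrete, here is a self-contained illustration (stand-in types, not HDFS code) of the same two-level rule: decommissioned nodes sort to the end, and among live nodes the stale ones sort behind the fresh ones.

import java.util.Arrays;
import java.util.Comparator;

public class DecomStaleOrderingSketch {
  // Stand-in for DatanodeInfo with only the two states the comparator reads.
  static class Node {
    final String name;
    final boolean decommissioned;
    final boolean stale;
    Node(String name, boolean decommissioned, boolean stale) {
      this.name = name;
      this.decommissioned = decommissioned;
      this.stale = stale;
    }
    public String toString() { return name; }
  }

  // Same comparison logic as DecomStaleComparator, on the stand-in type.
  static final Comparator<Node> DECOM_STALE_ORDER = new Comparator<Node>() {
    public int compare(Node a, Node b) {
      // Decommissioned nodes always go to the end of the array.
      if (a.decommissioned) {
        return b.decommissioned ? 0 : 1;
      } else if (b.decommissioned) {
        return -1;
      }
      // Among live nodes, stale ones are moved behind fresh ones.
      return a.stale == b.stale ? 0 : (a.stale ? 1 : -1);
    }
  };

  public static void main(String[] args) {
    Node[] nodes = {
        new Node("dn1-stale", false, true),
        new Node("dn2-fresh", false, false),
        new Node("dn3-decom", true, false),
        new Node("dn4-fresh", false, false) };
    Arrays.sort(nodes, DECOM_STALE_ORDER);
    // Prints: [dn2-fresh, dn4-fresh, dn1-stale, dn3-decom]
    System.out.println(Arrays.toString(nodes));
  }
}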

DatanodeInfo.java

@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
/**
* This class extends the primary identifier of a Datanode with ephemeral
@ -319,7 +320,24 @@ public class DatanodeInfo extends DatanodeID implements Node {
}
return adminState;
}
/**
* Check if the datanode is in a stale state: if the namenode has not
* received a heartbeat message from the datanode for more than
* staleInterval (default value is
* {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
* the datanode is treated as stale.
*
* @param staleInterval
* the time interval for marking the node as stale. If the last
* update time is beyond the given time interval, the node will be
* marked as stale.
* @return true if the node is stale
*/
public boolean isStale(long staleInterval) {
return (Time.now() - lastUpdate) >= staleInterval;
}
/**
* Sets the admin state of this node.
*/
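
A quick worked example of the predicate used by isStale() above, with plain timestamps instead of a DatanodeInfo instance; the heartbeat ages are illustrative:

public class StaleCheckSketch {
  public static void main(String[] args) {
    // Same test as DatanodeInfo#isStale(): (now - lastUpdate) >= staleInterval.
    long staleInterval = 30 * 1000L;          // the 30-second default
    long now = System.currentTimeMillis();    // Time.now() is wall-clock millis
    long heartbeat31sAgo = now - 31000L;
    long heartbeat5sAgo = now - 5000L;
    System.out.println((now - heartbeat31sAgo) >= staleInterval); // true, 31000 >= 30000
    System.out.println((now - heartbeat5sAgo) >= staleInterval);  // false, 5000 < 30000
  }
}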

DatanodeManager.java

@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -129,6 +130,11 @@ public class DatanodeManager {
*/
private boolean hasClusterEverBeenMultiRack = false;
/** Whether or not to check the stale datanodes */
private volatile boolean checkForStaleNodes;
/** The time interval for detecting stale datanodes */
private volatile long staleInterval;
DatanodeManager(final BlockManager blockManager,
final Namesystem namesystem, final Configuration conf
) throws IOException {
@ -166,6 +172,21 @@ public class DatanodeManager {
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
// set the value of stale interval based on configuration
this.checkForStaleNodes = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
if (this.checkForStaleNodes) {
this.staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
LOG.warn("The given interval for marking stale datanodes = "
+ this.staleInterval + ", which is smaller than the default value "
+ DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
+ ".");
}
}
}
private Daemon decommissionthread = null;
@ -213,14 +234,17 @@ public class DatanodeManager {
final List<LocatedBlock> locatedblocks) {
//sort the blocks
final DatanodeDescriptor client = getDatanodeByHost(targethost);
Comparator<DatanodeInfo> comparator = checkForStaleNodes ?
new DFSUtil.DecomStaleComparator(staleInterval) :
DFSUtil.DECOM_COMPARATOR;
for (LocatedBlock b : locatedblocks) {
networktopology.pseudoSortByDistance(client, b.getLocations());
// Move decommissioned datanodes to the bottom
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
// Move decommissioned/stale datanodes to the bottom
Arrays.sort(b.getLocations(), comparator);
}
}
CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
final String firstkey) {
return new CyclicIteration<String, DatanodeDescriptor>(
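
sortLocatedBlocks() above sorts each replica list in two passes: pseudoSortByDistance() orders replicas by network distance to the reader, then Arrays.sort() with the comparator chosen above moves decommissioned/stale replicas to the end. Since Arrays.sort() is stable for object arrays, the distance ordering from the first pass is preserved within the fresh group and within the stale group. A standalone sketch of that property, using stand-in data rather than HDFS types:

import java.util.Arrays;
import java.util.Comparator;

public class TwoPassSortSketch {
  static class Replica {
    final String name;
    final int distance;
    final boolean stale;
    Replica(String name, int distance, boolean stale) {
      this.name = name;
      this.distance = distance;
      this.stale = stale;
    }
    public String toString() { return name; }
  }

  public static void main(String[] args) {
    Replica[] replicas = {
        new Replica("far-fresh", 4, false),
        new Replica("near-stale", 0, true),
        new Replica("mid-fresh", 2, false) };
    // Pass 1: order by network distance (stand-in for pseudoSortByDistance).
    Arrays.sort(replicas, new Comparator<Replica>() {
      public int compare(Replica a, Replica b) {
        return a.distance - b.distance;
      }
    });
    // Pass 2: a stable sort moves stale replicas to the end while keeping
    // the distance order inside the fresh and stale groups.
    Arrays.sort(replicas, new Comparator<Replica>() {
      public int compare(Replica a, Replica b) {
        return a.stale == b.stale ? 0 : (a.stale ? 1 : -1);
      }
    });
    // Prints: [mid-fresh, far-fresh, near-stale]
    System.out.println(Arrays.toString(replicas));
  }
}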

FSNamesystem.java

@ -1158,6 +1158,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, blocks.getLocatedBlocks());
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
if (lastBlock != null) {
ArrayList<LocatedBlock> lastBlockList = new ArrayList<LocatedBlock>();
lastBlockList.add(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
}
}
return blocks;
}

hdfs-default.xml

@ -960,6 +960,30 @@
<value>${dfs.web.authentication.kerberos.principal}</value>
</property>
<property>
<name>dfs.namenode.check.stale.datanode</name>
<value>false</value>
<description>
Indicates whether to check for "stale" datanodes, i.e., datanodes
whose heartbeat messages have not been received by the namenode for
more than the specified time interval. If this configuration
parameter is set to true, stale datanodes are moved to the end of
the target node list for reads. Writes will also try to avoid
stale nodes.
</description>
</property>
<property>
<name>dfs.namenode.stale.datanode.interval</name>
<value>30000</value>
<description>
Default time interval for marking a datanode as "stale": if the
namenode has not received a heartbeat message from a datanode for
more than this time interval, the datanode is marked and treated
as "stale".
</description>
</property>
<property>
<name>dfs.namenode.invalidate.work.pct.per.iteration</name>
<value>0.32f</value>
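
As shown in the DatanodeManager change earlier in this commit, a configured dfs.namenode.stale.datanode.interval below the 30000 ms default is still honored but triggers a namenode warning. A hedged sketch of that validation, using the property names above with an illustrative 10-second override:

import org.apache.hadoop.conf.Configuration;

public class StaleIntervalValidationSketch {
  public static void main(String[] args) {
    final long defaultIntervalMs = 30 * 1000L;  // default from hdfs-default.xml
    Configuration conf = new Configuration();
    conf.setLong("dfs.namenode.stale.datanode.interval", 10 * 1000L);
    long staleInterval =
        conf.getLong("dfs.namenode.stale.datanode.interval", defaultIntervalMs);
    if (staleInterval < defaultIntervalMs) {
      // Mirrors the namenode's behavior: warn, but keep the configured value.
      System.out.println("WARN: stale interval " + staleInterval
          + " ms is smaller than the default " + defaultIntervalMs + " ms");
    }
  }
}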

TestGetBlocks.java

@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -28,48 +27,178 @@ import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
* This class tests if block replacement request to data nodes work correctly.
* This class tests if the getBlocks request works correctly.
*/
public class TestGetBlocks {
private static final int blockSize = 8192;
private static final String racks[] = new String[] { "/d1/r1", "/d1/r1",
"/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
private static final int numDatanodes = racks.length;
/**
* Stop the heartbeat of a datanode in the MiniDFSCluster
*
* @param cluster
* The MiniDFSCluster
* @param hostName
* The hostname of the datanode whose heartbeat is to be stopped
* @return The DataNode whose heartbeat has been stopped
*/
private DataNode stopDataNodeHeartbeat(MiniDFSCluster cluster, String hostName) {
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeId().getHostName().equals(hostName)) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
return dn;
}
}
return null;
}
/**
* Test if the datanodes returned by
* {@link ClientProtocol#getBlockLocations(String, long, long)} are correct
* when stale node checking is enabled. Also test the scenario where 1) stale
* node checking is enabled, 2) a write is in progress, and 3) a datanode
* becomes stale, all at the same time.
*
* @throws Exception
*/
@Test
public void testReadSelectNonStaleDatanode() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
long staleInterval = 30 * 1000 * 60;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
staleInterval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).racks(racks).build();
cluster.waitActive();
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
List<DatanodeDescriptor> nodeInfoList = cluster.getNameNode()
.getNamesystem().getBlockManager().getDatanodeManager()
.getDatanodeListForReport(DatanodeReportType.LIVE);
assertEquals("Unexpected number of datanodes", numDatanodes,
nodeInfoList.size());
FileSystem fileSys = cluster.getFileSystem();
FSDataOutputStream stm = null;
try {
// do the writing but do not close the FSDataOutputStream
// in order to mimic an ongoing write
final Path fileName = new Path("/file1");
stm = fileSys.create(
fileName,
true,
fileSys.getConf().getInt(
CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
(short) 3, blockSize);
stm.write(new byte[(blockSize * 3) / 2]);
// We do not close the stream so that
// the write appears to be still in progress
stm.hflush();
LocatedBlocks blocks = client.getNamenode().getBlockLocations(
fileName.toString(), 0, blockSize);
DatanodeInfo[] nodes = blocks.get(0).getLocations();
assertEquals(nodes.length, 3);
DataNode staleNode = null;
DatanodeDescriptor staleNodeInfo = null;
// stop the heartbeat of the first node
staleNode = this.stopDataNodeHeartbeat(cluster, nodes[0].getHostName());
assertNotNull(staleNode);
// set the first node as stale
staleNodeInfo = cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager()
.getDatanode(staleNode.getDatanodeId());
staleNodeInfo.setLastUpdate(Time.now() - staleInterval - 1);
LocatedBlocks blocksAfterStale = client.getNamenode().getBlockLocations(
fileName.toString(), 0, blockSize);
DatanodeInfo[] nodesAfterStale = blocksAfterStale.get(0).getLocations();
assertEquals(nodesAfterStale.length, 3);
assertEquals(nodesAfterStale[2].getHostName(), nodes[0].getHostName());
// restart the staleNode's heartbeat
DataNodeTestUtils.setHeartbeatsDisabledForTests(staleNode, false);
// reset the first node as non-stale, so as to avoid two stale nodes
staleNodeInfo.setLastUpdate(Time.now());
LocatedBlock lastBlock = client.getLocatedBlocks(fileName.toString(), 0,
Long.MAX_VALUE).getLastLocatedBlock();
nodes = lastBlock.getLocations();
assertEquals(nodes.length, 3);
// stop the heartbeat of the first node for the last block
staleNode = this.stopDataNodeHeartbeat(cluster, nodes[0].getHostName());
assertNotNull(staleNode);
// set the node as stale
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager()
.getDatanode(staleNode.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
LocatedBlock lastBlockAfterStale = client.getLocatedBlocks(
fileName.toString(), 0, Long.MAX_VALUE).getLastLocatedBlock();
nodesAfterStale = lastBlockAfterStale.getLocations();
assertEquals(nodesAfterStale.length, 3);
assertEquals(nodesAfterStale[2].getHostName(), nodes[0].getHostName());
} finally {
if (stm != null) {
stm.close();
}
cluster.shutdown();
}
}
/** test getBlocks */
@Test
public void testGetBlocks() throws Exception {
final Configuration CONF = new HdfsConfiguration();
final short REPLICATION_FACTOR = (short)2;
final short REPLICATION_FACTOR = (short) 2;
final int DEFAULT_BLOCK_SIZE = 1024;
final Random r = new Random();
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
.numDataNodes(REPLICATION_FACTOR)
.build();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
REPLICATION_FACTOR).build();
try {
cluster.waitActive();
// create a file with two blocks
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("/tmp.txt"),
REPLICATION_FACTOR);
byte [] data = new byte[1024];
long fileLen = 2*DEFAULT_BLOCK_SIZE;
byte[] data = new byte[1024];
long fileLen = 2 * DEFAULT_BLOCK_SIZE;
long bytesToWrite = fileLen;
while( bytesToWrite > 0 ) {
while (bytesToWrite > 0) {
r.nextBytes(data);
int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
int bytesToWriteNext = (1024 < bytesToWrite) ? 1024
: (int) bytesToWrite;
out.write(data, 0, bytesToWriteNext);
bytesToWrite -= bytesToWriteNext;
}
@ -77,27 +206,28 @@ public class TestGetBlocks {
// get blocks & data nodes
List<LocatedBlock> locatedBlocks;
DatanodeInfo[] dataNodes=null;
DatanodeInfo[] dataNodes = null;
boolean notWritten;
do {
final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF), CONF);
locatedBlocks = dfsclient.getNamenode().
getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
final DFSClient dfsclient = new DFSClient(NameNode.getAddress(CONF),
CONF);
locatedBlocks = dfsclient.getNamenode()
.getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
assertEquals(2, locatedBlocks.size());
notWritten = false;
for(int i=0; i<2; i++) {
for (int i = 0; i < 2; i++) {
dataNodes = locatedBlocks.get(i).getLocations();
if(dataNodes.length != REPLICATION_FACTOR) {
if (dataNodes.length != REPLICATION_FACTOR) {
notWritten = true;
try {
Thread.sleep(10);
} catch(InterruptedException e) {
} catch (InterruptedException e) {
}
break;
}
}
} while(notWritten);
} while (notWritten);
// get RPC client to namenode
InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
@ -122,7 +252,7 @@ public class TestGetBlocks {
assertEquals(locs[0].getStorageIDs().length, 2);
// get blocks of size 0 from dataNodes[0]
getBlocksWithException(namenode, dataNodes[0], 0);
getBlocksWithException(namenode, dataNodes[0], 0);
// get blocks of size -1 from dataNodes[0]
getBlocksWithException(namenode, dataNodes[0], -1);
@ -136,46 +266,39 @@ public class TestGetBlocks {
}
private void getBlocksWithException(NamenodeProtocol namenode,
DatanodeInfo datanode,
long size) throws IOException {
DatanodeInfo datanode, long size) throws IOException {
boolean getException = false;
try {
namenode.getBlocks(DFSTestUtil.getLocalDatanodeInfo(), 2);
} catch(RemoteException e) {
namenode.getBlocks(DFSTestUtil.getLocalDatanodeInfo(), 2);
} catch (RemoteException e) {
getException = true;
assertTrue(e.getClassName().contains("HadoopIllegalArgumentException"));
}
assertTrue(getException);
}
@Test
public void testBlockKey() {
Map<Block, Long> map = new HashMap<Block, Long>();
final Random RAN = new Random();
final long seed = RAN.nextLong();
System.out.println("seed=" + seed);
System.out.println("seed=" + seed);
RAN.setSeed(seed);
long[] blkids = new long[10];
for(int i = 0; i < blkids.length; i++) {
long[] blkids = new long[10];
for (int i = 0; i < blkids.length; i++) {
blkids[i] = 1000L + RAN.nextInt(100000);
map.put(new Block(blkids[i], 0, blkids[i]), blkids[i]);
}
System.out.println("map=" + map.toString().replace(",", "\n "));
for(int i = 0; i < blkids.length; i++) {
Block b = new Block(blkids[i], 0, GenerationStamp.GRANDFATHER_GENERATION_STAMP);
for (int i = 0; i < blkids.length; i++) {
Block b = new Block(blkids[i], 0,
GenerationStamp.GRANDFATHER_GENERATION_STAMP);
Long v = map.get(b);
System.out.println(b + " => " + v);
assertEquals(blkids[i], v.longValue());
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
(new TestGetBlocks()).testGetBlocks();
}
}