svn merge -c 1476399 from trunk for HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing the datanode with the most recent heartbeat as the primary.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1476400 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-04-26 20:52:29 +00:00
parent ffe3368d94
commit b2927bd936
7 changed files with 307 additions and 29 deletions
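
The change replaces round-robin primary selection with a heartbeat-recency rule: among live replicas that have not yet been tried as primary, pick the one whose datanode heartbeated most recently. A minimal, self-contained sketch of that rule (hypothetical Replica type and field names, not the committed code):

import java.util.Arrays;
import java.util.List;

/** Sketch of the HDFS-4721 selection rule; types and names are illustrative. */
public class PrimarySelectionSketch {
  static class Replica {
    final String name;
    final boolean alive;            // datanode still considered live
    final boolean chosenAsPrimary;  // already tried in an earlier attempt
    final long lastHeartbeatMs;     // time of the most recent heartbeat
    Replica(String name, boolean alive, boolean chosen, long heartbeatMs) {
      this.name = name;
      this.alive = alive;
      this.chosenAsPrimary = chosen;
      this.lastHeartbeatMs = heartbeatMs;
    }
  }

  /** Among live, not-yet-tried replicas, pick the most recently heartbeated. */
  static Replica pickPrimary(List<Replica> replicas) {
    Replica primary = null;
    long mostRecent = 0;
    for (Replica r : replicas) {
      if (r.alive && !r.chosenAsPrimary && r.lastHeartbeatMs > mostRecent) {
        primary = r;
        mostRecent = r.lastHeartbeatMs;
      }
    }
    return primary; // null if no live, untried replica exists
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    List<Replica> replicas = Arrays.asList(
        new Replica("dn1", true, false, now - 3000),
        new Replica("dn2", true, false, now - 1000), // freshest, gets picked
        new Replica("dn3", true, false, now - 2000));
    System.out.println(pickPrimary(replicas).name); // prints dn2
  }
}

In the committed code below, once every live replica has been tried as primary, the chosen flags are reset so that retries cycle through the replicas again.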

CHANGES.txt

@@ -94,6 +94,10 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4346. Add SequentialNumber as a base class for INodeId and
GenerationStamp. (szetszwo)
HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
the datanode with the most recent heartbeat as the primary. (Varun Sharma
via szetszwo)
OPTIMIZATIONS
BUG FIXES

BlockInfoUnderConstruction.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction extends BlockInfo {
*/
private List<ReplicaUnderConstruction> replicas;
/** A data-node responsible for block recovery. */
/**
* Index of the primary data node doing the recovery. Useful for log
* messages.
*/
private int primaryNodeIndex = -1;
/**
@@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
static class ReplicaUnderConstruction extends Block {
private DatanodeDescriptor expectedLocation;
private ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block,
DatanodeDescriptor target,
@@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
super(block);
this.expectedLocation = target;
this.state = state;
this.chosenAsPrimary = false;
}
/**
@@ -88,6 +94,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
return state;
}
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/**
* Set replica state.
*/
@@ -95,6 +108,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
state = s;
}
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/**
* Is data-node the replica belongs to alive.
*/
@@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction extends BlockInfo {
+ " BlockInfoUnderConstruction.initLeaseRecovery:"
+ " No blocks found, lease removed.");
}
int previous = primaryNodeIndex;
for(int i = 1; i <= replicas.size(); i++) {
int j = (previous + i)%replicas.size();
if (replicas.get(j).isAlive()) {
primaryNodeIndex = j;
DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
primary.addBlockToBeRecovered(this);
NameNode.blockStateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
return;
boolean allLiveReplicasTriedAsPrimary = true;
for (int i = 0; i < replicas.size(); i++) {
// Check if all replicas have been tried or not.
if (replicas.get(i).isAlive()) {
allLiveReplicasTriedAsPrimary =
(allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
}
}
if (allLiveReplicasTriedAsPrimary) {
// Reset the chosen-as-primary flag on all replicas, whether alive or not.
for (int i = 0; i < replicas.size(); i++) {
replicas.get(i).setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
ReplicaUnderConstruction primary = null;
primaryNodeIndex = -1;
for(int i = 0; i < replicas.size(); i++) {
// Skip replicas that are dead or have already been chosen for recovery.
if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
continue;
}
if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
primary = replicas.get(i);
primaryNodeIndex = i;
mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
}
}
if (primary != null) {
primary.getExpectedLocation().addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
}
}
void addReplicaIfNotPresent(DatanodeDescriptor dn,

DatanodeManager.java

@@ -209,7 +209,7 @@ public class DatanodeManager {
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f.");
}
private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) {
long staleInterval = conf.getLong(
@@ -862,7 +862,7 @@ public class DatanodeManager {
(numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
* ratioUseStaleDataNodesForWrite);
}
/**
* @return The time interval used to mark DataNodes as stale.
*/
@@ -1080,7 +1080,7 @@
* failed. As a special case, the loopback address is also considered
* acceptable. This is particularly important on Windows, where 127.0.0.1 does
* not resolve to "localhost".
*
* @param address InetAddress to check
* @return boolean true if name resolution successful or address is loopback
*/
@@ -1114,7 +1114,7 @@
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
@@ -1129,9 +1129,34 @@
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
.getBlockRecoveryId()));
DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
// Skip stale nodes during recovery, i.e. nodes that have not sent a
// heartbeat for some time (30s by default).
List<DatanodeDescriptor> recoveryLocations =
new ArrayList<DatanodeDescriptor>(expectedLocations.length);
for (int i = 0; i < expectedLocations.length; i++) {
if (!expectedLocations[i].isStale(this.staleInterval)) {
recoveryLocations.add(expectedLocations[i]);
}
}
// If we end up with 1 or fewer replicas after eliminating stale nodes, then
// choose all replicas for recovery and let the primary data node handle failures.
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != expectedLocations.length) {
LOG.info("Skipped stale nodes for recovery : " +
(expectedLocations.length - recoveryLocations.size()));
}
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
b.getBlockRecoveryId()));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
expectedLocations,
b.getBlockRecoveryId()));
}
}
return new DatanodeCommand[] { brCommand };
}
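
The filtering above relies on isStale(staleInterval) from the datanode descriptor. A minimal sketch of the assumed semantics (illustrative code, not the actual implementation): a node is stale once its last heartbeat is older than the configured interval.

public class StaleCheckSketch {
  /** A node is stale when its last heartbeat is older than staleIntervalMs. */
  static boolean isStale(long lastUpdateMs, long staleIntervalMs) {
    return System.currentTimeMillis() - lastUpdateMs >= staleIntervalMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(isStale(now - 40 * 1000, 30 * 1000)); // true: 40s old
    System.out.println(isStale(now - 10 * 1000, 30 * 1000)); // false: fresh
  }
}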

hdfs-default.xml

@@ -1072,7 +1072,10 @@
otherwise this may cause too frequent change of stale states.
We thus set a minimum stale interval value (the default value is 3 times
the heartbeat interval) and guarantee that the stale interval cannot be less
than the minimum value.
than the minimum value. A stale data node is avoided during lease/block
recovery. It can be conditionally avoided for reads (see
dfs.namenode.avoid.read.stale.datanode) and for writes (see
dfs.namenode.avoid.write.stale.datanode).
</description>
</property>
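
For illustration, the stale-node settings referenced in this description can also be set programmatically. A hedged sketch (the read/write key names are taken from the description above; the interval key and its 30s default are assumptions about the surrounding configuration):

import org.apache.hadoop.conf.Configuration;

public class StaleDatanodeConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed interval key: mark a datanode stale after 30s without a heartbeat.
    conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);
    // Keys named in the description: also avoid stale datanodes for reads/writes.
    // (Stale nodes are always avoided during lease/block recovery.)
    conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
    conf.setBoolean("dfs.namenode.avoid.write.stale.datanode", true);
  }
}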

TestBlockInfoUnderConstruction.java

@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.junit.Test;
/**
* This class provides tests for the BlockInfoUnderConstruction class.
*/
public class TestBlockInfoUnderConstruction {
@Test
public void testInitializeBlockRecovery() throws Exception {
DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
"default");
DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
"default");
DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
"default");
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
3,
BlockUCState.UNDER_CONSTRUCTION,
new DatanodeDescriptor[] {dd1, dd2, dd3});
// Recovery attempt #1.
long currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 3 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 2 * 1000);
blockInfo.initializeBlockRecovery(1);
BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #2.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
blockInfo.initializeBlockRecovery(2);
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #3.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
currentTime = System.currentTimeMillis();
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #4.
// Reset everything, and again pick the DN with the most recent heartbeat.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime);
currentTime = System.currentTimeMillis();
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
}
}

TestHeartbeatHandling.java

@@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
@@ -122,4 +124,113 @@ public class TestHeartbeatHandling {
cluster.shutdown();
}
}
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}
* correctly selects data node targets for block recovery.
*/
@Test
public void testHeartbeatBlockRecovery() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg1 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
final DatanodeRegistration nodeReg2 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
final DatanodeRegistration nodeReg3 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
try {
namesystem.writeLock();
synchronized(hm) {
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
// Test with all alive nodes.
dd1.setLastUpdate(System.currentTimeMillis());
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
DatanodeCommand[] cmds =
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
// Test with one stale node.
dd1.setLastUpdate(System.currentTimeMillis());
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis());
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(2, recoveringNodes.length);
// dd2 is skipped.
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd3);
// Test with all stale nodes.
dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
// dd1 heartbeated just above, so it is the only non-stale node when the
// list of recovery locations is constructed; with only one fresh replica,
// all three replicas are chosen for recovery.
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
}
} finally {
namesystem.writeUnlock();
}
} finally {
cluster.shutdown();
}
}
}

TestPipelinesFailover.java

@@ -521,9 +521,17 @@ public class TestPipelinesFailover {
storedBlock instanceof BlockInfoUnderConstruction);
BlockInfoUnderConstruction ucBlock =
(BlockInfoUnderConstruction)storedBlock;
// We expect that the first indexed replica will be the one
// to be in charge of the synchronization / recovery protocol.
DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
// We expect that the replica with the most recent heartbeat will be
// the one to be in charge of the synchronization / recovery protocol.
DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
DatanodeDescriptor expectedPrimary = datanodes[0];
long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
for (int i = 1; i < datanodes.length; i++) {
if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
expectedPrimary = datanodes[i];
mostRecentLastUpdate = expectedPrimary.getLastUpdate();
}
}
return expectedPrimary;
}