HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing the datanode with the most recent heartbeat as the primary. Contributed by Varun Sharma

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1476399 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-04-26 20:51:45 +00:00
parent 5d2ffde68e
commit d2495d553f
7 changed files with 307 additions and 29 deletions

View File

@ -460,6 +460,10 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4346. Add SequentialNumber as a base class for INodeId and HDFS-4346. Add SequentialNumber as a base class for INodeId and
GenerationStamp. (szetszwo) GenerationStamp. (szetszwo)
HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
the datanode with the most recent heartbeat as the primary. (Varun Sharma
via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction extends BlockInfo {
*/ */
private List<ReplicaUnderConstruction> replicas; private List<ReplicaUnderConstruction> replicas;
/** A data-node responsible for block recovery. */ /**
* Index of the primary data node doing the recovery. Useful for log
* messages.
*/
private int primaryNodeIndex = -1; private int primaryNodeIndex = -1;
/** /**
@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
static class ReplicaUnderConstruction extends Block { static class ReplicaUnderConstruction extends Block {
private DatanodeDescriptor expectedLocation; private DatanodeDescriptor expectedLocation;
private ReplicaState state; private ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block, ReplicaUnderConstruction(Block block,
DatanodeDescriptor target, DatanodeDescriptor target,
@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
super(block); super(block);
this.expectedLocation = target; this.expectedLocation = target;
this.state = state; this.state = state;
this.chosenAsPrimary = false;
} }
/** /**
@ -88,6 +94,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
return state; return state;
} }
/**
* Whether the replica was chosen for recovery.
*/
boolean getChosenAsPrimary() {
return chosenAsPrimary;
}
/** /**
* Set replica state. * Set replica state.
*/ */
@ -95,6 +108,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
state = s; state = s;
} }
/**
* Set whether this replica was chosen for recovery.
*/
void setChosenAsPrimary(boolean chosenAsPrimary) {
this.chosenAsPrimary = chosenAsPrimary;
}
/** /**
* Is data-node the replica belongs to alive. * Is data-node the replica belongs to alive.
*/ */
@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction extends BlockInfo {
+ " BlockInfoUnderConstruction.initLeaseRecovery:" + " BlockInfoUnderConstruction.initLeaseRecovery:"
+ " No blocks found, lease removed."); + " No blocks found, lease removed.");
} }
boolean allLiveReplicasTriedAsPrimary = true;
int previous = primaryNodeIndex; for (int i = 0; i < replicas.size(); i++) {
for(int i = 1; i <= replicas.size(); i++) { // Check if all replicas have been tried or not.
int j = (previous + i)%replicas.size(); if (replicas.get(i).isAlive()) {
if (replicas.get(j).isAlive()) { allLiveReplicasTriedAsPrimary =
primaryNodeIndex = j; (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
primary.addBlockToBeRecovered(this);
NameNode.blockStateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
return;
} }
} }
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
for (int i = 0; i < replicas.size(); i++) {
replicas.get(i).setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
ReplicaUnderConstruction primary = null;
primaryNodeIndex = -1;
for(int i = 0; i < replicas.size(); i++) {
// Skip alive replicas which have been chosen for recovery.
if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
continue;
}
if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
primary = replicas.get(i);
primaryNodeIndex = i;
mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
}
}
if (primary != null) {
primary.getExpectedLocation().addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
}
} }
void addReplicaIfNotPresent(DatanodeDescriptor dn, void addReplicaIfNotPresent(DatanodeDescriptor dn,

View File

@ -213,7 +213,7 @@ public class DatanodeManager {
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " + " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f."); "It should be a positive non-zero float value, not greater than 1.0f.");
} }
private static long getStaleIntervalFromConf(Configuration conf, private static long getStaleIntervalFromConf(Configuration conf,
long heartbeatExpireInterval) { long heartbeatExpireInterval) {
long staleInterval = conf.getLong( long staleInterval = conf.getLong(
@ -942,7 +942,7 @@ public class DatanodeManager {
(numStaleNodes <= heartbeatManager.getLiveDatanodeCount() (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
* ratioUseStaleDataNodesForWrite); * ratioUseStaleDataNodesForWrite);
} }
/** /**
* @return The time interval used to mark DataNodes as stale. * @return The time interval used to mark DataNodes as stale.
*/ */
@ -1160,7 +1160,7 @@ public class DatanodeManager {
* failed. As a special case, the loopback address is also considered * failed. As a special case, the loopback address is also considered
* acceptable. This is particularly important on Windows, where 127.0.0.1 does * acceptable. This is particularly important on Windows, where 127.0.0.1 does
* not resolve to "localhost". * not resolve to "localhost".
* *
* @param address InetAddress to check * @param address InetAddress to check
* @return boolean true if name resolution successful or address is loopback * @return boolean true if name resolution successful or address is loopback
*/ */
@ -1194,7 +1194,7 @@ public class DatanodeManager {
setDatanodeDead(nodeinfo); setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo); throw new DisallowedDatanodeException(nodeinfo);
} }
if (nodeinfo == null || !nodeinfo.isAlive) { if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{RegisterCommand.REGISTER}; return new DatanodeCommand[]{RegisterCommand.REGISTER};
} }
@ -1209,9 +1209,34 @@ public class DatanodeManager {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand( BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length); blocks.length);
for (BlockInfoUnderConstruction b : blocks) { for (BlockInfoUnderConstruction b : blocks) {
brCommand.add(new RecoveringBlock( DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b // Skip stale nodes during recovery - not heart beated for some time (30s by default).
.getBlockRecoveryId())); List<DatanodeDescriptor> recoveryLocations =
new ArrayList<DatanodeDescriptor>(expectedLocations.length);
for (int i = 0; i < expectedLocations.length; i++) {
if (!expectedLocations[i].isStale(this.staleInterval)) {
recoveryLocations.add(expectedLocations[i]);
}
}
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != expectedLocations.length) {
LOG.info("Skipped stale nodes for recovery : " +
(expectedLocations.length - recoveryLocations.size()));
}
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
b.getBlockRecoveryId()));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
expectedLocations,
b.getBlockRecoveryId()));
}
} }
return new DatanodeCommand[] { brCommand }; return new DatanodeCommand[] { brCommand };
} }

View File

@ -1072,7 +1072,10 @@
otherwise this may cause too frequent change of stale states. otherwise this may cause too frequent change of stale states.
We thus set a minimum stale interval value (the default value is 3 times We thus set a minimum stale interval value (the default value is 3 times
of heartbeat interval) and guarantee that the stale interval cannot be less of heartbeat interval) and guarantee that the stale interval cannot be less
than the minimum value. than the minimum value. A stale data node is avoided during lease/block
recovery. It can be conditionally avoided for reads (see
dfs.namenode.avoid.read.stale.datanode) and for writes (see
dfs.namenode.avoid.write.stale.datanode).
</description> </description>
</property> </property>

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.junit.Test;
/**
* This class provides tests for BlockInfoUnderConstruction class
*/
public class TestBlockInfoUnderConstruction {
@Test
public void testInitializeBlockRecovery() throws Exception {
DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
"default");
DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
"default");
DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
"default");
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
3,
BlockUCState.UNDER_CONSTRUCTION,
new DatanodeDescriptor[] {dd1, dd2, dd3});
// Recovery attempt #1.
long currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 3 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 2 * 1000);
blockInfo.initializeBlockRecovery(1);
BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #2.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
blockInfo.initializeBlockRecovery(2);
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #3.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
currentTime = System.currentTimeMillis();
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #4.
// Reset everything. And again pick DN with most recent heart beat.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime);
currentTime = System.currentTimeMillis();
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
}
}

View File

@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
final HeartbeatManager hm = namesystem.getBlockManager( final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager(); ).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId(); final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg = final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId); DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg); final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final int REMAINING_BLOCKS = 1; final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT = final int MAX_REPLICATE_LIMIT =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2); conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT; final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS; final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
assertEquals(1, cmds.length); assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction()); assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length); assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS); ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) { for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP)); blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
@ -122,4 +124,113 @@ public class TestHeartbeatHandling {
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
* Test if
* {@link FSNamesystem#handleHeartbeat}
* correctly selects data node targets for block recovery.
*/
@Test
public void testHeartbeatBlockRecovery() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final HeartbeatManager hm = namesystem.getBlockManager(
).getDatanodeManager().getHeartbeatManager();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg1 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
final DatanodeRegistration nodeReg2 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
final DatanodeRegistration nodeReg3 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
try {
namesystem.writeLock();
synchronized(hm) {
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
// Test with all alive nodes.
dd1.setLastUpdate(System.currentTimeMillis());
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
DatanodeCommand[] cmds =
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
// Test with one stale node.
dd1.setLastUpdate(System.currentTimeMillis());
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis());
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
assertEquals(2, recoveringNodes.length);
// dd2 is skipped.
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd3);
// Test with all stale node.
dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
recoveryCommand = (BlockRecoveryCommand)cmds[0];
assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
recoveringNodes = recoveryCommand.getRecoveringBlocks()
.toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
// Only dd1 is included since it heart beated and hence its not stale
// when the list of recovery blocks is constructed.
assertEquals(3, recoveringNodes.length);
assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
}
} finally {
namesystem.writeUnlock();
}
} finally {
cluster.shutdown();
}
}
} }

View File

@ -521,9 +521,17 @@ public class TestPipelinesFailover {
storedBlock instanceof BlockInfoUnderConstruction); storedBlock instanceof BlockInfoUnderConstruction);
BlockInfoUnderConstruction ucBlock = BlockInfoUnderConstruction ucBlock =
(BlockInfoUnderConstruction)storedBlock; (BlockInfoUnderConstruction)storedBlock;
// We expect that the first indexed replica will be the one // We expect that the replica with the most recent heart beat will be
// to be in charge of the synchronization / recovery protocol. // the one to be in charge of the synchronization / recovery protocol.
DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0]; DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
DatanodeDescriptor expectedPrimary = datanodes[0];
long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
for (int i = 1; i < datanodes.length; i++) {
if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
expectedPrimary = datanodes[i];
mostRecentLastUpdate = expectedPrimary.getLastUpdate();
}
}
return expectedPrimary; return expectedPrimary;
} }