diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b552d30de40..e2fd5908ccb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -460,6 +460,10 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp. (szetszwo)
 
+    HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
+    the datanode with the most recent heartbeat as the primary. (Varun Sharma
+    via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index 36b3598b2c6..fc59acf10a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction extends BlockInfo {
    */
   private List<ReplicaUnderConstruction> replicas;
 
-  /** A data-node responsible for block recovery. */
+  /**
+   * Index of the primary data node doing the recovery. Useful for log
+   * messages.
+   */
   private int primaryNodeIndex = -1;
 
   /**
@@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
   static class ReplicaUnderConstruction extends Block {
     private DatanodeDescriptor expectedLocation;
     private ReplicaState state;
+    private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
                              DatanodeDescriptor target,
@@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       super(block);
       this.expectedLocation = target;
       this.state = state;
+      this.chosenAsPrimary = false;
     }
 
     /**
@@ -88,6 +94,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       return state;
     }
 
+    /**
+     * Whether the replica was chosen for recovery.
+     */
+    boolean getChosenAsPrimary() {
+      return chosenAsPrimary;
+    }
+
     /**
      * Set replica state.
      */
@@ -95,6 +108,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       state = s;
     }
 
+    /**
+     * Set whether this replica was chosen for recovery.
+     */
+    void setChosenAsPrimary(boolean chosenAsPrimary) {
+      this.chosenAsPrimary = chosenAsPrimary;
+    }
+
     /**
      * Is data-node the replica belongs to alive.
      */
@@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction extends BlockInfo {
           + " BlockInfoUnderConstruction.initLeaseRecovery:"
           + " No blocks found, lease removed.");
     }
-
-    int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.size(); i++) {
-      int j = (previous + i)%replicas.size();
-      if (replicas.get(j).isAlive()) {
-        primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation();
-        primary.addBlockToBeRecovered(this);
-        NameNode.blockStateChangeLog.info("BLOCK* " + this
-          + " recovery started, primary=" + primary);
-        return;
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (int i = 0; i < replicas.size(); i++) {
+      // Check if all replicas have been tried or not.
+      if (replicas.get(i).isAlive()) {
+        allLiveReplicasTriedAsPrimary =
+            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
       }
     }
+    if (allLiveReplicasTriedAsPrimary) {
+      // All live replicas have been tried; reset them all (alive or not) so they can be chosen again.
+      for (int i = 0; i < replicas.size(); i++) {
+        replicas.get(i).setChosenAsPrimary(false);
+      }
+    }
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for(int i = 0; i < replicas.size(); i++) {
+      // Skip replicas that are dead or have already been tried as primary.
+      if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+        continue;
+      }
+      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
+        primary = replicas.get(i);
+        primaryNodeIndex = i;
+        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.setChosenAsPrimary(true);
+      NameNode.blockStateChangeLog.info("BLOCK* " + this
+        + " recovery started, primary=" + primary);
+    }
   }
 
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3d7fd9ac09c..670bea82ec8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -213,7 +213,7 @@ public class DatanodeManager {
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
   }
-  
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -942,7 +942,7 @@ public class DatanodeManager {
         (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
             * ratioUseStaleDataNodesForWrite);
   }
-  
+
   /**
    * @return The time interval used to mark DataNodes as stale.
    */
@@ -1160,7 +1160,7 @@ public class DatanodeManager {
    * failed. As a special case, the loopback address is also considered
    * acceptable. This is particularly important on Windows, where 127.0.0.1 does
    * not resolve to "localhost".
-   * 
+   *
    * @param address InetAddress to check
    * @return boolean true if name resolution successful or address is loopback
    */
@@ -1194,7 +1194,7 @@ public class DatanodeManager {
         setDatanodeDead(nodeinfo);
         throw new DisallowedDatanodeException(nodeinfo);
       }
-      
+
       if (nodeinfo == null || !nodeinfo.isAlive) {
         return new DatanodeCommand[]{RegisterCommand.REGISTER};
       }
@@ -1209,9 +1209,34 @@ public class DatanodeManager {
             BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
                 blocks.length);
             for (BlockInfoUnderConstruction b : blocks) {
-              brCommand.add(new RecoveringBlock(
-                  new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
-                  .getBlockRecoveryId()));
+              DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+              // Skip stale nodes during recovery - no heartbeat received for some time (30s by default).
+              List<DatanodeDescriptor> recoveryLocations =
+                  new ArrayList<DatanodeDescriptor>(expectedLocations.length);
+              for (int i = 0; i < expectedLocations.length; i++) {
+                if (!expectedLocations[i].isStale(this.staleInterval)) {
+                  recoveryLocations.add(expectedLocations[i]);
+                }
+              }
+              // If we only get 1 replica after eliminating stale nodes, then choose all
+              // replicas for recovery and let the primary data node handle failures.
+              if (recoveryLocations.size() > 1) {
+                if (recoveryLocations.size() != expectedLocations.length) {
+                  LOG.info("Skipped stale nodes for recovery : " +
+                      (expectedLocations.length - recoveryLocations.size()));
+                }
+                brCommand.add(new RecoveringBlock(
+                    new ExtendedBlock(blockPoolId, b),
+                    recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                    b.getBlockRecoveryId()));
+              } else {
+                // If too many replicas are stale, then choose all replicas to participate
+                // in block recovery.
+                brCommand.add(new RecoveringBlock(
+                    new ExtendedBlock(blockPoolId, b),
+                    expectedLocations,
+                    b.getBlockRecoveryId()));
+              }
             }
             return new DatanodeCommand[] { brCommand };
           }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 30f2bc9dd0d..f896c335f10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1072,7 +1072,10 @@
     otherwise this may cause too frequent change of stale states. We thus set
     a minimum stale interval value (the default value is 3 times of heartbeat
     interval) and guarantee that the stale interval cannot be less
-    than the minimum value.
+    than the minimum value. A stale data node is avoided during lease/block
+    recovery. It can be conditionally avoided for reads (see
+    dfs.namenode.avoid.read.stale.datanode) and for writes (see
+    dfs.namenode.avoid.write.stale.datanode).
   </description>
 </property>
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
new file mode 100644
index 00000000000..cafc8227147
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.junit.Test;
+
+/**
+ * This class provides tests for BlockInfoUnderConstruction class
+ */
+public class TestBlockInfoUnderConstruction {
+  @Test
+  public void testInitializeBlockRecovery() throws Exception {
+    DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
+        "default");
+    DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
+        "default");
+    DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
+        "default");
+    dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
+    BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+        new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
+        3,
+        BlockUCState.UNDER_CONSTRUCTION,
+        new DatanodeDescriptor[] {dd1, dd2, dd3});
+
+    // Recovery attempt #1.
+    long currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 3 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 2 * 1000);
+    blockInfo.initializeBlockRecovery(1);
+    BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #2.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 3 * 1000);
+    blockInfo.initializeBlockRecovery(2);
+    blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #3.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 3 * 1000);
+    currentTime = System.currentTimeMillis();
+    blockInfo.initializeBlockRecovery(3);
+    blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #4.
+    // Reset everything. And again pick DN with most recent heart beat.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime);
+    currentTime = System.currentTimeMillis();
+    blockInfo.initializeBlockRecovery(3);
+    blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
index e4038676a16..bbb83070a30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
@@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
       final HeartbeatManager hm = namesystem.getBlockManager(
           ).getDatanodeManager().getHeartbeatManager();
       final String poolId = namesystem.getBlockPoolId();
-      final DatanodeRegistration nodeReg = 
+      final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-
-      final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
-      
+
       final int REMAINING_BLOCKS = 1;
-      final int MAX_REPLICATE_LIMIT = 
+      final int MAX_REPLICATE_LIMIT =
           conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
         assertEquals(1, cmds.length);
         assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
         assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
-        
+
         ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
         for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
+      if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
+        expectedPrimary = datanodes[i];
+        mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+      }
+    }
     return expectedPrimary;
   }
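
The two halves of this change (stale-node filtering when the NameNode builds the recovery command, and most-recent-heartbeat primary selection) span several files, so the following stand-alone Java sketch condenses them into one function for review. It is not part of the patch and not HDFS API; the class and method names (RecoveryPrimarySketch, choosePrimary) are made up for illustration, and the per-replica lastUpdate array stands in for DatanodeDescriptor.getLastUpdate().

import java.util.ArrayList;
import java.util.List;

public class RecoveryPrimarySketch {
  /**
   * Pick the index of the recovery primary given each replica's last
   * heartbeat time: drop stale nodes unless that would leave at most one
   * candidate, then take the node with the most recent heartbeat.
   */
  static int choosePrimary(long[] lastUpdate, long now, long staleInterval) {
    List<Integer> candidates = new ArrayList<Integer>();
    for (int i = 0; i < lastUpdate.length; i++) {
      if (now - lastUpdate[i] < staleInterval) {
        candidates.add(i);          // fresh heartbeat: keep as a candidate
      }
    }
    if (candidates.size() <= 1) {   // too many stale nodes: fall back to all replicas
      candidates.clear();
      for (int i = 0; i < lastUpdate.length; i++) {
        candidates.add(i);
      }
    }
    int primary = -1;
    long mostRecent = Long.MIN_VALUE;
    for (int i : candidates) {      // most recent heartbeat wins
      if (lastUpdate[i] > mostRecent) {
        mostRecent = lastUpdate[i];
        primary = i;
      }
    }
    return primary;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long staleInterval = 30 * 1000; // 30s, matching the default stale interval
    long[] lastUpdate = {now - 45 * 1000, now - 2 * 1000, now - 9 * 1000};
    // Node 0 is stale, so the choice is between nodes 1 and 2; node 1 wins.
    System.out.println("expected primary index = "
        + choosePrimary(lastUpdate, now, staleInterval));
  }
}

The actual patch additionally remembers which live replicas were already tried as primary (the chosenAsPrimary flag in BlockInfoUnderConstruction) so that repeated recovery attempts rotate through the live replicas; that bookkeeping is omitted from the sketch.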