diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6d0b6c02bd2..e57e67ab0a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -145,7 +145,9 @@ Trunk (unreleased changes) HDFS-3163. TestHDFSCLI.testAll fails if the user name is not all lowercase. (Brandon Li via atm) - + + HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv) + Release 2.0.1-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index abe14afe687..309a72bb2e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -107,6 +107,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000; public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval"; public static final int DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000; + public static final String DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier"; + public static final int DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource"; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml"; public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index a1e7a208ec7..6995a2e30f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.hdfs.server.common.Util.now; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -25,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -49,13 +52,19 @@ import com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { + private static final String enableDebugLogging = + "For more information, please enable DEBUG log level on " + + ((Log4JLogger)LOG).getLogger().getName(); + private boolean considerLoad; private boolean preferLocalNode = true; private NetworkTopology clusterMap; private FSClusterStats stats; - static final String enableDebugLogging = "For more information, please enable" - + " DEBUG level logging on the " - + "org.apache.hadoop.hdfs.server.namenode.FSNamesystem logger."; + private long heartbeatInterval; // interval for DataNode heartbeats + /** + * A miss of that many heartbeats is tolerated for replica deletion policy. + */ + private int tolerateHeartbeatMultiplier; BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap) { @@ -71,6 +80,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); this.stats = stats; this.clusterMap = clusterMap; + this.heartbeatInterval = conf.getLong( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; + this.tolerateHeartbeatMultiplier = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, + DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); } private ThreadLocal threadLocalBuilder = @@ -551,24 +566,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { short replicationFactor, Collection first, Collection second) { + long oldestHeartbeat = + now() - heartbeatInterval * tolerateHeartbeatMultiplier; + DatanodeDescriptor oldestHeartbeatNode = null; long minSpace = Long.MAX_VALUE; - DatanodeDescriptor cur = null; + DatanodeDescriptor minSpaceNode = null; // pick replica from the first Set. If first is empty, then pick replicas // from second set. Iterator iter = first.isEmpty() ? second.iterator() : first.iterator(); - // pick node with least free space + // Pick the node with the oldest heartbeat or with the least free space, + // if all hearbeats are within the tolerable heartbeat interval while (iter.hasNext() ) { DatanodeDescriptor node = iter.next(); long free = node.getRemaining(); + long lastHeartbeat = node.getLastUpdate(); + if(lastHeartbeat < oldestHeartbeat) { + oldestHeartbeat = lastHeartbeat; + oldestHeartbeatNode = node; + } if (minSpace > free) { minSpace = free; - cur = node; + minSpaceNode = node; } } - return cur; + return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode; } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index 0125b021e96..e9abc21199f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -17,12 +17,15 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.apache.hadoop.hdfs.server.common.Util.now; import static org.junit.Assert.*; import java.io.File; import java.io.IOException; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,11 +35,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.TestDatanodeBlockScanner; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.junit.Test; public class TestOverReplicatedBlocks { @@ -116,6 +122,77 @@ public class TestOverReplicatedBlocks { cluster.shutdown(); } } + + static final long SMALL_BLOCK_SIZE = + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; + static final long SMALL_FILE_LENGTH = SMALL_BLOCK_SIZE * 4; + + /** + * The test verifies that replica for deletion is chosen on a node, + * with the oldest heartbeat, when this heartbeat is larger than the + * tolerable heartbeat interval. + * It creates a file with several blocks and replication 4. + * The last DN is configured to send heartbeats rarely. + * + * Test waits until the tolerable heartbeat interval expires, and reduces + * replication of the file. All replica deletions should be scheduled for the + * last node. No replicas will actually be deleted, since last DN doesn't + * send heartbeats. + */ + @Test + public void testChooseReplicaToDelete() throws IOException { + MiniDFSCluster cluster = null; + FileSystem fs = null; + try { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + fs = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300); + cluster.startDataNodes(conf, 1, true, null, null, null); + DataNode lastDN = cluster.getDataNodes().get(3); + DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP( + lastDN, namesystem.getBlockPoolId()); + String lastDNid = dnReg.getStorageID(); + + final Path fileName = new Path("/foo2"); + DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L); + DFSTestUtil.waitReplication(fs, fileName, (short)4); + + // Wait for tolerable number of heartbeats plus one + DatanodeDescriptor nodeInfo = null; + long lastHeartbeat = 0; + long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 * + (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1); + do { + nodeInfo = + namesystem.getBlockManager().getDatanodeManager().getDatanode(dnReg); + lastHeartbeat = nodeInfo.getLastUpdate(); + } while(now() - lastHeartbeat < waitTime); + fs.setReplication(fileName, (short)3); + + BlockLocation locs[] = fs.getFileBlockLocations( + fs.getFileStatus(fileName), 0, Long.MAX_VALUE); + + // All replicas for deletion should be scheduled on lastDN. + // And should not actually be deleted, because lastDN does not heartbeat. + namesystem.readLock(); + Collection dnBlocks = + namesystem.getBlockManager().excessReplicateMap.get(lastDNid); + assertEquals("Replicas on node " + lastDNid + " should have been deleted", + SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size()); + namesystem.readUnlock(); + for(BlockLocation location : locs) + assertEquals("Block should still have 4 replicas", + 4, location.getNames().length); + } finally { + if(fs != null) fs.close(); + if(cluster != null) cluster.shutdown(); + } + } + /** * Test over replicated block should get invalidated when decreasing the * replication for a partial block.