From 603293da9283bcf55f6ce329da6de9ea7e138643 Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Wed, 15 Oct 2014 20:44:24 -0700
Subject: [PATCH] HDFS-7208. NN doesn't schedule replication when a DN storage
 fails. Contributed by Ming Ma

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../server/blockmanagement/BlockManager.java  | 13 ++++
 .../BlockPlacementPolicyDefault.java          |  6 ++
 .../blockmanagement/DatanodeDescriptor.java   | 65 ++++++++++++++++++-
 .../blockmanagement/DatanodeStorageInfo.java  | 10 ++-
 .../blockmanagement/HeartbeatManager.java     | 50 +++++++++++++-
 .../hdfs/server/protocol/DatanodeStorage.java |  4 +-
 .../blockmanagement/BlockManagerTestUtil.java | 21 +++++-
 .../datanode/TestDataNodeVolumeFailure.java   | 49 ++++++++++++--
 9 files changed, 209 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0496cf01291..e313cc3e367 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -607,6 +607,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7185. The active NameNode will not accept an fsimage sent from the
     standby during rolling upgrade. (jing9)
 
+    HDFS-7208. NN doesn't schedule replication when a DN storage fails.
+    (Ming Ma via szetszwo)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 12afa4fdb37..c4dfa68ecc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1058,6 +1058,19 @@ public class BlockManager {
     }
   }
 
+  /** Remove the blocks associated to the given DatanodeStorageInfo. */
+  void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
+    assert namesystem.hasWriteLock();
+    final Iterator<? extends Block> it = storageInfo.getBlockIterator();
+    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+    while(it.hasNext()) {
+      Block block = it.next();
+      removeStoredBlock(block, node);
+      invalidateBlocks.remove(node, block);
+    }
+    namesystem.checkSafeMode();
+  }
+
   /**
    * Adds block to list of blocks which will be invalidated on specified
    * datanode and log the operation
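Note: this method is the core of the fix. Dropping every replica recorded on the failed storage makes the affected blocks under-replicated, so the replication monitor schedules new copies on its next pass. A minimal sketch of the intended calling pattern, with a hypothetical wrapper (HeartbeatManager.heartbeatCheck further down is the real call site):

    // Sketch only: removeBlocksAssociatedTo asserts the FSNamesystem write
    // lock, so a caller must hold it for the duration of the removal.
    void onStorageFailure(FSNamesystem namesystem, BlockManager blockManager,
        DatanodeStorageInfo failedStorage) {
      namesystem.writeLock();
      try {
        // Drops every replica on the storage; the blocks then show up as
        // under-replicated and get re-replication work scheduled.
        blockManager.removeBlocksAssociatedTo(failedStorage);
      } finally {
        namesystem.writeUnlock();
      }
    }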
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 66d72d082eb..99f509e8d04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -722,6 +722,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       logNodeIsNotChosen(storage, "storage is read-only");
       return false;
     }
+
+    if (storage.getState() == State.FAILED) {
+      logNodeIsNotChosen(storage, "storage has failed");
+      return false;
+    }
+
     DatanodeDescriptor node = storage.getDatanodeDescriptor();
     // check if the node is (being) decommissioned
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
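Note: the new stanza mirrors the read-only exclusion directly above it. Condensed into a hypothetical predicate, the storage-state rule in isGoodTarget is now:

    // Hypothetical summary of the state checks: neither read-only nor failed
    // storages may be chosen as targets for new replicas.
    static boolean storageStateAllowsNewReplicas(DatanodeStorageInfo storage) {
      final DatanodeStorage.State state = storage.getState();
      return state != DatanodeStorage.State.READ_ONLY_SHARED
          && state != DatanodeStorage.State.FAILED;
    }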
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index eaea77daeca..a3a7b0dde1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -21,11 +21,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -224,13 +226,16 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // The number of replication work pending before targets are determined
   private int PendingReplicationWithoutTargets = 0;
 
+  // HB processing can use it to tell if it is the first HB since DN restarted
+  private boolean heartbeatedSinceRegistration = false;
+
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
     super(nodeID);
-    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
+    updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
@@ -241,7 +246,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation) {
     super(nodeID, networkLocation);
-    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
+    updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   @VisibleForTesting
@@ -343,10 +348,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
       long cacheUsed, int xceiverCount, int volFailures) {
+    updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
+        volFailures);
+    heartbeatedSinceRegistration = true;
+  }
+
+  /**
+   * Process datanode heartbeat or stats initialization.
+   */
+  public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
+      long cacheUsed, int xceiverCount, int volFailures) {
     long totalCapacity = 0;
     long totalRemaining = 0;
    long totalBlockPoolUsed = 0;
     long totalDfsUsed = 0;
+    Set<DatanodeStorageInfo> failedStorageInfos = null;
+
+    // Decide if we should check for any missing StorageReport and mark it as
+    // failed. There are different scenarios.
+    // 1. When DN is running, a storage failed. Given the current DN
+    //    implementation doesn't add recovered storage back to its storage list
+    //    until DN restart, we can assume volFailures won't decrease
+    //    during the current DN registration session.
+    //    When volFailures == this.volumeFailures, it implies there is no
+    //    state change. No need to check for failed storage. This is an
+    //    optimization.
+    // 2. After DN restarts, volFailures might not increase and it is possible
+    //    we still have new failed storage. For example, admins reduce
+    //    available storages in configuration. Another corner case
+    //    is that the failed volumes might change after restart: a) there
+    //    is one good storage A, one restored good storage B, so there is
+    //    one element in storageReports and that is A. b) A failed. c) Before
+    //    DN sends HB to NN to indicate A has failed, DN restarts. d) After DN
+    //    restarts, storageReports has one element which is B.
+    boolean checkFailedStorages = (volFailures > this.volumeFailures) ||
+        !heartbeatedSinceRegistration;
+
+    if (checkFailedStorages) {
+      LOG.info("Number of failed storage changes from "
+          + this.volumeFailures + " to " + volFailures);
+      failedStorageInfos = new HashSet<DatanodeStorageInfo>(
+          storageMap.values());
+    }
 
     setCacheCapacity(cacheCapacity);
     setCacheUsed(cacheUsed);
@@ -355,6 +398,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.volumeFailures = volFailures;
     for (StorageReport report : reports) {
       DatanodeStorageInfo storage = updateStorage(report.getStorage());
+      if (checkFailedStorages) {
+        failedStorageInfos.remove(storage);
+      }
+
       storage.receivedHeartbeat(report);
       totalCapacity += report.getCapacity();
       totalRemaining += report.getRemaining();
@@ -368,6 +415,19 @@ public class DatanodeDescriptor extends DatanodeInfo {
     setRemaining(totalRemaining);
     setBlockPoolUsed(totalBlockPoolUsed);
     setDfsUsed(totalDfsUsed);
+    if (checkFailedStorages) {
+      updateFailedStorage(failedStorageInfos);
+    }
+  }
+
+  private void updateFailedStorage(
+      Set<DatanodeStorageInfo> failedStorageInfos) {
+    for (DatanodeStorageInfo storageInfo : failedStorageInfos) {
+      if (storageInfo.getState() != DatanodeStorage.State.FAILED) {
+        LOG.info(storageInfo + " failed.");
+        storageInfo.setState(DatanodeStorage.State.FAILED);
+      }
+    }
   }
 
   private static class BlockIterator implements Iterator<BlockInfo> {
@@ -641,6 +701,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     for(DatanodeStorageInfo storage : getStorageInfos()) {
       storage.setBlockReportCount(0);
     }
+    heartbeatedSinceRegistration = false;
   }
 
   /**
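Note: the detection above is a set difference: start from every storage the NN currently tracks for this DN, cross off each one that still appears in a StorageReport, and mark whatever remains as FAILED. Restated as a self-contained sketch (the standalone method shape is illustrative, not patch code):

    // Illustrative restatement of the failed-storage detection. storageMap is
    // the descriptor's storageID -> DatanodeStorageInfo map, as in the patch.
    static Set<DatanodeStorageInfo> findFailedStorages(
        Map<String, DatanodeStorageInfo> storageMap, StorageReport[] reports) {
      Set<DatanodeStorageInfo> failed =
          new HashSet<DatanodeStorageInfo>(storageMap.values());
      for (StorageReport report : reports) {
        // A storage that is still being reported is alive; drop it.
        failed.remove(storageMap.get(report.getStorage().getStorageID()));
      }
      return failed; // not reported in this heartbeat => treat as failed
    }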
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 58ca2ace254..8c44b3043fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -182,7 +182,15 @@ public class DatanodeStorageInfo {
   State getState() {
     return this.state;
   }
-  
+
+  void setState(State state) {
+    this.state = state;
+  }
+
+  boolean areBlocksOnFailedStorage() {
+    return getState() == State.FAILED && numBlocks != 0;
+  }
+
   String getStorageID() {
     return storageID;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 4fb538595cb..e53d334e2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -192,7 +192,7 @@ class HeartbeatManager implements DatanodeStatistics {
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
+      d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
     }
   }
 
@@ -242,6 +242,25 @@ class HeartbeatManager implements DatanodeStatistics {
    * While removing dead datanodes, make sure that only one datanode is marked
    * dead at a time within the synchronized section. Otherwise, a cascading
    * effect causes more datanodes to be declared dead.
+   * Also check if there is any failed storage, and if so, remove all the
+   * blocks on the storage. This also covers the following less common
+   * scenarios: after a DatanodeStorage is marked FAILED, it is still
+   * possible to receive an IBR for this storage.
+   * 1) DN could deliver an IBR for a failed storage due to its implementation.
+   *    a) DN queues a pending IBR request.
+   *    b) The storage of the block fails.
+   *    c) DN first sends HB; NN will mark the storage FAILED.
+   *    d) DN then sends the pending IBR request.
+   * 2) SBN processes a block request from pendingDNMessages.
+   *    It is possible to have messages in pendingDNMessages that refer
+   *    to some failed storage.
+   *    a) SBN receives an IBR and puts it in pendingDNMessages.
+   *    b) The storage of the block fails.
+   *    c) Edit log replay gets the IBR from pendingDNMessages.
+   * Alternatively, we can resolve these scenarios with the following approaches:
+   * A. Make sure the DN doesn't deliver IBRs for failed storages.
+   * B. Remove all blocks in PendingDataNodeMessages for the failed storage
+   *    when we remove all blocks from BlocksMap for that storage.
    */
   void heartbeatCheck() {
     final DatanodeManager dm = blockManager.getDatanodeManager();
@@ -254,6 +273,10 @@ class HeartbeatManager implements DatanodeStatistics {
     while (!allAlive) {
       // locate the first dead node.
       DatanodeID dead = null;
+
+      // locate the first failed storage that isn't on a dead node.
+      DatanodeStorageInfo failedStorage = null;
+
       // check the number of stale nodes
       int numOfStaleNodes = 0;
       int numOfStaleStorages = 0;
@@ -271,7 +294,14 @@ class HeartbeatManager implements DatanodeStatistics {
           if (storageInfo.areBlockContentsStale()) {
             numOfStaleStorages++;
           }
+
+          if (failedStorage == null &&
+              storageInfo.areBlocksOnFailedStorage() &&
+              d != dead) {
+            failedStorage = storageInfo;
+          }
         }
+
       }
 
       // Set the number of stale nodes in the DatanodeManager
@@ -279,8 +309,8 @@ class HeartbeatManager implements DatanodeStatistics {
         dm.setNumStaleStorages(numOfStaleStorages);
       }
 
-      allAlive = dead == null;
-      if (!allAlive) {
+      allAlive = dead == null && failedStorage == null;
+      if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
@@ -294,6 +324,20 @@ class HeartbeatManager implements DatanodeStatistics {
           namesystem.writeUnlock();
         }
       }
+      if (failedStorage != null) {
+        // acquire the fsnamesystem lock, and remove blocks on the storage.
+        namesystem.writeLock();
+        try {
+          if (namesystem.isInStartupSafeMode()) {
+            return;
+          }
+          synchronized(this) {
+            blockManager.removeBlocksAssociatedTo(failedStorage);
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+      }
     }
   }
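Note: each pass of the loop now resolves at most one dead node and at most one failed storage, re-acquiring the FSNamesystem write lock for each, which preserves the one-at-a-time discipline described in the javadoc. In outline (helper names below are invented for readability, not real methods):

    // Condensed, hypothetical outline of the patched heartbeatCheck() loop.
    while (!allAlive) {
      DatanodeID dead = findFirstDeadNode();              // existing behavior
      DatanodeStorageInfo failedStorage =
          findFirstFailedStorageWithBlocks();             // new in this patch
      allAlive = dead == null && failedStorage == null;
      if (dead != null) {
        removeDeadDatanodeUnderWriteLock(dead);           // as before
      }
      if (failedStorage != null) {
        // New: removes the storage's replicas from the blocks map, which
        // triggers re-replication of every block the storage held.
        removeBlocksAssociatedToUnderWriteLock(failedStorage);
      }
    }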
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
index 63303ebf1f1..4fe07b9b4b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
@@ -39,7 +39,9 @@ public class DatanodeStorage {
      * property should be used for debugging purposes only.
      * </p>
      */
-    READ_ONLY_SHARED;
+    READ_ONLY_SHARED,
+
+    FAILED;
   }
 
   private final String storageID;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 852c80b7801..2755b2989dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -213,7 +213,26 @@ public class BlockManagerTestUtil {
   public static void checkHeartbeat(BlockManager bm) {
     bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
   }
-  
+
+  /**
+   * Call heartbeat check function of HeartbeatManager and get
+   * under replicated blocks count within write lock to make sure
+   * computeDatanodeWork doesn't interfere.
+   * @param namesystem the FSNamesystem
+   * @param bm the BlockManager to manipulate
+   * @return the number of under replicated blocks
+   */
+  public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
+      FSNamesystem namesystem, BlockManager bm) {
+    namesystem.writeLock();
+    try {
+      bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+      return bm.getUnderReplicatedNotMissingBlocks();
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
   public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn,
       DatanodeStorage s) {
     return dn.updateStorage(s);
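Note: holding the write lock across both calls is the point of the helper; otherwise the ReplicationMonitor's computeDatanodeWork could drain the under-replication queue between the heartbeat check and the count. A hypothetical JUnit fragment (the new test below does essentially this):

    // Hypothetical usage from a MiniDFSCluster-based test.
    int underReplicated =
        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
            cluster.getNamesystem(), cluster.getNamesystem().getBlockManager());
    assertTrue(underReplicated > 0);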
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index b1172a0806f..8429055a209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -79,7 +82,8 @@ public class TestDataNodeVolumeFailure {
   File dataDir = null;
   File data_fail = null;
   File failedDir = null;
-  
+  private FileSystem fs;
+
   // mapping blocks to Meta files(physical files) and locs(NameNode locations)
   private class BlockLocs {
     public int num_files = 0;
@@ -97,6 +101,8 @@ public class TestDataNodeVolumeFailure {
     conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
     cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dataDir = new File(cluster.getDataDirectory());
   }
 
   @After
@@ -110,6 +116,10 @@ public class TestDataNodeVolumeFailure {
     if(cluster != null) {
       cluster.shutdown();
     }
+    for (int i = 0; i < 3; i++) {
+      FileUtil.setExecutable(new File(dataDir, "data"+(2*i+1)), true);
+      FileUtil.setExecutable(new File(dataDir, "data"+(2*i+2)), true);
+    }
   }
 
   /*
@@ -119,8 +129,6 @@ public class TestDataNodeVolumeFailure {
    */
   @Test
   public void testVolumeFailure() throws Exception {
-    FileSystem fs = cluster.getFileSystem();
-    dataDir = new File(cluster.getDataDirectory());
     System.out.println("Data dir: is " +  dataDir.getPath());
@@ -191,7 +199,40 @@ public class TestDataNodeVolumeFailure {
     System.out.println("file " + fileName1.getName() + 
         " is created and replicated");
   }
-  
+
+  /**
+   * Test that there are under replication blocks after vol failures
+   */
+  @Test
+  public void testUnderReplicationAfterVolFailure() throws Exception {
+    // Bring up one more datanode
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    Path file1 = new Path("/test1");
+    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+
+    // Fail the first volume on both datanodes
+    File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
+    File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
+    assertTrue("Couldn't chmod local vol",
+        FileUtil.setExecutable(dn1Vol1, false));
+    assertTrue("Couldn't chmod local vol",
+        FileUtil.setExecutable(dn2Vol1, false));
+
+    Path file2 = new Path("/test2");
+    DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L);
+    DFSTestUtil.waitReplication(fs, file2, (short)3);
+
+    // underReplicatedBlocks are due to failed volumes
+    int underReplicatedBlocks =
+        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
+            cluster.getNamesystem(), bm);
+    assertTrue("There is no under replicated block after volume failure",
+        underReplicatedBlocks > 0);
+  }
+
   /**
    * verifies two things:
    * 1. number of locations of each block in the name node
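Note: MiniDFSCluster lays out two volumes per DN under the cluster data directory (data1 and data2 for the first DN, data3 and data4 for the second, hence the "data"+(2*i+1) arithmetic), and revoking the execute bit makes the DN's disk checker fail the volume on its next check; the @After hook restores the bits so later tests can reuse the directories. The pattern, condensed hypothetically:

    // Hypothetical condensation of the injection/restore pattern used above.
    File vol = new File(dataDir, "data1");     // first volume of the first DN
    assertTrue("Couldn't chmod local vol",
        FileUtil.setExecutable(vol, false));   // inject the volume failure
    // ... write data; the NN should now mark the storage FAILED ...
    FileUtil.setExecutable(vol, true);         // restore, as @After does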