diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c522e2604e7..51e12ec4337 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1916,23 +1916,29 @@ public class BlockManager implements BlockStatsMXBean { b.getReasonCode(), b.getStored().isStriped()); NumberReplicas numberOfReplicas = countNodes(b.getStored()); - boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= + final int numUsableReplicas = numberOfReplicas.liveReplicas() + + numberOfReplicas.decommissioning() + + numberOfReplicas.liveEnteringMaintenanceReplicas(); + boolean hasEnoughLiveReplicas = numUsableReplicas >= expectedRedundancies; boolean minReplicationSatisfied = hasMinStorage(b.getStored(), - numberOfReplicas.liveReplicas()); + numUsableReplicas); boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedRedundancies; boolean corruptedDuringWrite = minReplicationSatisfied && b.isCorruptedDuringWrite(); - // case 1: have enough number of live replicas - // case 2: corrupted replicas + live replicas > Replication factor + // case 1: have enough number of usable replicas + // case 2: corrupted replicas + usable replicas > Replication factor // case 3: Block is marked corrupt due to failure while writing. In this // case genstamp will be different than that of valid block. // In all these cases we can delete the replica. - // In case of 3, rbw block will be deleted and valid block can be replicated + // In case 3, rbw block will be deleted and valid block can be replicated. + // Note NN only becomes aware of corrupt blocks when the block report is sent, + // this means that by default it can take up to 6 hours for a corrupt block to + // be invalidated, after which the valid block can be replicated. if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) { if (b.getStored().isStriped()) { @@ -3656,7 +3662,7 @@ public class BlockManager implements BlockStatsMXBean { ". blockMap has {} but corrupt replicas map has {}", storedBlock, numCorruptNodes, corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileRedundancy)) { + if ((corruptReplicasCount > 0) && (numUsableReplicas >= fileRedundancy)) { invalidateCorruptReplicas(storedBlock, reportedBlock, num); } return storedBlock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index 670ca5fd9a6..0133d3aec37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.Block; @@ -1902,4 +1903,285 @@ public class TestDecommission extends AdminStatesBaseTest { !BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node) && !node.isAlive()), 500, 20000); } + + /* + This test reproduces a scenario where an under-replicated block on a decommissioning node + cannot be replicated to some datanodes because they have a corrupt replica of the block. + The test ensures that the corrupt replicas are eventually invalidated so that the + under-replicated block can be replicated to sufficient datanodes & the decommissioning + node can be decommissioned. + */ + @Test(timeout = 60000) + public void testDeleteCorruptReplicaForUnderReplicatedBlock() throws Exception { + // Constants + final Path file = new Path("/test-file"); + final int numDatanode = 3; + final short replicationFactor = 2; + final int numStoppedNodes = 2; + final int numDecommNodes = 1; + assertEquals(numDatanode, numStoppedNodes + numDecommNodes); + + // Run monitor every 5 seconds to speed up decommissioning & make the test faster + final int datanodeAdminMonitorFixedRateSeconds = 5; + getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, + datanodeAdminMonitorFixedRateSeconds); + // Set block report interval to 6 hours to avoid unexpected block reports. + // The default block report interval is different for a MiniDFSCluster + getConf().setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + // Run the BlockManager RedundancyMonitor every 3 seconds such that the Namenode + // sends under-replication blocks for replication frequently + getConf().setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT); + // Ensure that the DataStreamer client will replace the bad datanode on append failure + getConf().set(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS"); + // Avoid having the DataStreamer client fail the append operation if datanode replacement fails + getConf() + .setBoolean(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true); + + // References to datanodes in the cluster + // - 2 datanode will be stopped to generate corrupt block replicas & then + // restarted later to validate the corrupt replicas are invalidated + // - 1 datanode will start decommissioning to make the block under replicated + final List allNodes = new ArrayList<>(); + final List stoppedNodes = new ArrayList<>(); + final DatanodeDescriptor decommNode; + + // Create MiniDFSCluster + startCluster(1, numDatanode); + getCluster().waitActive(); + final FSNamesystem namesystem = getCluster().getNamesystem(); + final BlockManager blockManager = namesystem.getBlockManager(); + final DatanodeManager datanodeManager = blockManager.getDatanodeManager(); + final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager(); + final FileSystem fs = getCluster().getFileSystem(); + + // Get DatanodeDescriptors + for (final DataNode node : getCluster().getDataNodes()) { + allNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid())); + } + + // Create block with 2 FINALIZED replicas + // Note that: + // - calling hflush leaves block in state ReplicaBeingWritten + // - calling close leaves the block in state FINALIZED + // - amount of data is kept small because flush is not synchronous + LOG.info("Creating Initial Block with {} FINALIZED replicas", replicationFactor); + FSDataOutputStream out = fs.create(file, replicationFactor); + for (int i = 0; i < 512; i++) { + out.write(i); + } + out.close(); + + // Validate the block exists with expected number of replicas + assertEquals(1, blockManager.getTotalBlocks()); + BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + List replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + assertEquals(replicationFactor, replicasInBlock.size()); + + // Identify the DatanodeDescriptors associated with the 2 nodes with replicas. + // Each of nodes with a replica will be stopped later to corrupt the replica + DatanodeDescriptor decommNodeTmp = null; + for (DatanodeDescriptor node : allNodes) { + if (replicasInBlock.contains(node.getName())) { + stoppedNodes.add(node); + } else { + decommNodeTmp = node; + } + } + assertEquals(numStoppedNodes, stoppedNodes.size()); + assertNotNull(decommNodeTmp); + decommNode = decommNodeTmp; + final DatanodeDescriptor firstStoppedNode = stoppedNodes.get(0); + final DatanodeDescriptor secondStoppedNode = stoppedNodes.get(1); + LOG.info("Detected 2 nodes with replicas : {} , {}", firstStoppedNode.getXferAddr(), + secondStoppedNode.getXferAddr()); + LOG.info("Detected 1 node without replica : {}", decommNode.getXferAddr()); + + // Stop firstStoppedNode & the append to the block pipeline such that DataStreamer client: + // - detects firstStoppedNode as bad link in block pipeline + // - replaces the firstStoppedNode with decommNode in block pipeline + // The result is that: + // - secondStoppedNode & decommNode have a live block replica + // - firstStoppedNode has a corrupt replica (corrupt because of old GenStamp) + LOG.info("Stopping first node with replica {}", firstStoppedNode.getXferAddr()); + final List stoppedNodeProps = new ArrayList<>(); + MiniDFSCluster.DataNodeProperties stoppedNodeProp = + getCluster().stopDataNode(firstStoppedNode.getXferAddr()); + stoppedNodeProps.add(stoppedNodeProp); + firstStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past + // Wait for NN to detect the datanode as dead + GenericTestUtils.waitFor( + () -> 2 == datanodeManager.getNumLiveDataNodes() && 1 == datanodeManager + .getNumDeadDataNodes(), 500, 30000); + // Append to block pipeline + appendBlock(fs, file, 2); + + // Stop secondStoppedNode & the append to the block pipeline such that DataStreamer client: + // - detects secondStoppedNode as bad link in block pipeline + // - attempts to replace secondStoppedNode but cannot because there are no more live nodes + // - appends to the block pipeline containing just decommNode + // The result is that: + // - decommNode has a live block replica + // - firstStoppedNode & secondStoppedNode both have a corrupt replica + LOG.info("Stopping second node with replica {}", secondStoppedNode.getXferAddr()); + stoppedNodeProp = getCluster().stopDataNode(secondStoppedNode.getXferAddr()); + stoppedNodeProps.add(stoppedNodeProp); + secondStoppedNode.setLastUpdate(213); // Set last heartbeat to be in the past + // Wait for NN to detect the datanode as dead + GenericTestUtils.waitFor(() -> numDecommNodes == datanodeManager.getNumLiveDataNodes() + && numStoppedNodes == datanodeManager.getNumDeadDataNodes(), 500, 30000); + // Append to block pipeline + appendBlock(fs, file, 1); + + // Validate block replica locations + blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + assertEquals(numDecommNodes, replicasInBlock.size()); + assertTrue(replicasInBlock.contains(decommNode.getName())); + LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 live replica on {}", + firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr()); + + LOG.info("Decommission node {} with the live replica", decommNode.getXferAddr()); + final ArrayList decommissionedNodes = new ArrayList<>(); + takeNodeOutofService(0, decommNode.getDatanodeUuid(), 0, decommissionedNodes, + AdminStates.DECOMMISSION_INPROGRESS); + + // Wait for the datanode to start decommissioning + try { + GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0 + && decomManager.getNumPendingNodes() == numDecommNodes && decommNode.getAdminState() + .equals(AdminStates.DECOMMISSION_INPROGRESS), 500, 30000); + } catch (Exception e) { + blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + String errMsg = String.format("Node %s failed to start decommissioning." + + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]", + decommNode.getXferAddr(), decomManager.getNumTrackedNodes(), + decomManager.getNumPendingNodes(), decommNode.getAdminState(), + String.join(", ", replicasInBlock)); + LOG.error(errMsg); // Do not log generic timeout exception + fail(errMsg); + } + + // Validate block replica locations + blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + assertEquals(numDecommNodes, replicasInBlock.size()); + assertEquals(replicasInBlock.get(0), decommNode.getName()); + LOG.info("Block now has 2 corrupt replicas on [{} , {}] and 1 decommissioning replica on {}", + firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr()); + + // Restart the 2 stopped datanodes + LOG.info("Restarting stopped nodes {} , {}", firstStoppedNode.getXferAddr(), + secondStoppedNode.getXferAddr()); + for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) { + assertTrue(getCluster().restartDataNode(stoppedNode)); + } + for (final MiniDFSCluster.DataNodeProperties stoppedNode : stoppedNodeProps) { + try { + getCluster().waitDatanodeFullyStarted(stoppedNode.getDatanode(), 30000); + LOG.info("Node {} Restarted", stoppedNode.getDatanode().getXferAddress()); + } catch (Exception e) { + String errMsg = String.format("Node %s Failed to Restart within 30 seconds", + stoppedNode.getDatanode().getXferAddress()); + LOG.error(errMsg); // Do not log generic timeout exception + fail(errMsg); + } + } + + // Trigger block reports for the 2 restarted nodes to ensure their corrupt + // block replicas are identified by the namenode + for (MiniDFSCluster.DataNodeProperties dnProps : stoppedNodeProps) { + DataNodeTestUtils.triggerBlockReport(dnProps.getDatanode()); + } + + // Validate the datanode is eventually decommissioned + // Some changes are needed to ensure replication/decommissioning occur in a timely manner: + // - if the namenode sends a DNA_TRANSFER before sending the DNA_INVALIDATE's then: + // - the block will enter the pendingReconstruction queue + // - this prevent the block from being sent for transfer again for some time + // - solution is to call "clearQueues" so that DNA_TRANSFER is sent again after DNA_INVALIDATE + // - need to run the check less frequently than DatanodeAdminMonitor + // such that in between "clearQueues" calls 2 things can occur: + // - DatanodeAdminMonitor runs which sets the block as neededReplication + // - datanode heartbeat is received which sends the DNA_TRANSFER to the node + final int checkEveryMillis = datanodeAdminMonitorFixedRateSeconds * 2 * 1000; + try { + GenericTestUtils.waitFor(() -> { + blockManager.clearQueues(); // Clear pendingReconstruction queue + return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0 + && decommNode.getAdminState().equals(AdminStates.DECOMMISSIONED); + }, checkEveryMillis, 40000); + } catch (Exception e) { + blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + String errMsg = String.format("Node %s failed to complete decommissioning." + + " numTrackedNodes=%d , numPendingNodes=%d , adminState=%s , nodesWithReplica=[%s]", + decommNode.getXferAddr(), decomManager.getNumTrackedNodes(), + decomManager.getNumPendingNodes(), decommNode.getAdminState(), + String.join(", ", replicasInBlock)); + LOG.error(errMsg); // Do not log generic timeout exception + fail(errMsg); + } + + // Validate block replica locations. + // Note that in order for decommissioning to complete the block must be + // replicated to both of the restarted datanodes; this implies that the + // corrupt replicas were invalidated on both of the restarted datanodes. + blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + assertEquals(numDatanode, replicasInBlock.size()); + assertTrue(replicasInBlock.contains(decommNode.getName())); + for (final DatanodeDescriptor node : stoppedNodes) { + assertTrue(replicasInBlock.contains(node.getName())); + } + LOG.info("Block now has 2 live replicas on [{} , {}] and 1 decommissioned replica on {}", + firstStoppedNode.getXferAddr(), secondStoppedNode.getXferAddr(), decommNode.getXferAddr()); + } + + void appendBlock(final FileSystem fs, final Path file, int expectedReplicas) throws IOException { + LOG.info("Appending to the block pipeline"); + boolean failed = false; + Exception failedReason = null; + try { + FSDataOutputStream out = fs.append(file); + for (int i = 0; i < 512; i++) { + out.write(i); + } + out.close(); + } catch (Exception e) { + failed = true; + failedReason = e; + } finally { + BlockLocation[] blocksInFile = fs.getFileBlockLocations(file, 0, 0); + assertEquals(1, blocksInFile.length); + List replicasInBlock = Arrays.asList(blocksInFile[0].getNames()); + if (failed) { + String errMsg = String.format( + "Unexpected exception appending to the block pipeline." + + " nodesWithReplica=[%s]", String.join(", ", replicasInBlock)); + LOG.error(errMsg, failedReason); // Do not swallow the exception + fail(errMsg); + } else if (expectedReplicas != replicasInBlock.size()) { + String errMsg = String.format("Expecting %d replicas in block pipeline," + + " unexpectedly found %d replicas. nodesWithReplica=[%s]", expectedReplicas, + replicasInBlock.size(), String.join(", ", replicasInBlock)); + LOG.error(errMsg); + fail(errMsg); + } else { + String infoMsg = String.format( + "Successfully appended block pipeline with %d replicas." + + " nodesWithReplica=[%s]", + replicasInBlock.size(), String.join(", ", replicasInBlock)); + LOG.info(infoMsg); + } + } + } }