From d20b598f97e76c67d6103a950ea9e89644be2c41 Mon Sep 17 00:00:00 2001 From: KevinWikant <94480406+KevinWikant@users.noreply.github.com> Date: Thu, 23 Dec 2021 08:59:08 -0500 Subject: [PATCH] HDFS-16303. Improve handling of datanode lost while decommissioning (#3675) Co-authored-by: Kevin Wikant Signed-off-by: Akira Ajisaka --- .../DatanodeAdminBackoffMonitor.java | 32 ++- .../DatanodeAdminDefaultMonitor.java | 38 ++- .../DatanodeAdminMonitorBase.java | 45 +++- .../hadoop/hdfs/AdminStatesBaseTest.java | 2 +- .../apache/hadoop/hdfs/TestDecommission.java | 237 +++++++++++++++++- .../blockmanagement/BlockManagerTestUtil.java | 10 + .../TestDatanodeAdminMonitorBase.java | 92 +++++++ 7 files changed, 438 insertions(+), 18 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java index c04f3daabf7..eb65b3843a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.ArrayDeque; import java.util.Queue; +import java.util.stream.Collectors; /** * This class implements the logic to track decommissioning and entering @@ -149,7 +150,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase */ @Override public void stopTrackingNode(DatanodeDescriptor dn) { - pendingNodes.remove(dn); + getPendingNodes().remove(dn); cancelledNodes.add(dn); } @@ -189,6 +190,29 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase * node will be removed from tracking by the pending cancel. */ processCancelledNodes(); + + // Having more nodes decommissioning than can be tracked will impact decommissioning + // performance due to queueing delay + int numTrackedNodes = outOfServiceNodeBlocks.size(); + int numQueuedNodes = getPendingNodes().size(); + int numDecommissioningNodes = numTrackedNodes + numQueuedNodes; + if (numDecommissioningNodes > maxConcurrentTrackedNodes) { + LOG.warn( + "{} nodes are decommissioning but only {} nodes will be tracked at a time. " + + "{} nodes are currently queued waiting to be decommissioned.", + numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes); + + // Re-queue unhealthy nodes to make space for decommissioning healthy nodes + final List unhealthyDns = outOfServiceNodeBlocks.keySet().stream() + .filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn)) + .collect(Collectors.toList()); + getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { + getPendingNodes().add(dn); + outOfServiceNodeBlocks.remove(dn); + pendingRep.remove(dn); + }); + } + processPendingNodes(); } finally { namesystem.writeUnlock(); @@ -207,7 +231,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase LOG.info("Checked {} blocks this tick. {} nodes are now " + "in maintenance or transitioning state. {} nodes pending. 
{} " + "nodes waiting to be cancelled.", - numBlocksChecked, outOfServiceNodeBlocks.size(), pendingNodes.size(), + numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(), cancelledNodes.size()); } } @@ -220,10 +244,10 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase * the pendingNodes list from being modified externally. */ private void processPendingNodes() { - while (!pendingNodes.isEmpty() && + while (!getPendingNodes().isEmpty() && (maxConcurrentTrackedNodes == 0 || outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) { - outOfServiceNodeBlocks.put(pendingNodes.poll(), null); + outOfServiceNodeBlocks.put(getPendingNodes().poll(), null); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java index a217c9978e8..bb9e9efb8c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java @@ -123,7 +123,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase @Override public void stopTrackingNode(DatanodeDescriptor dn) { - pendingNodes.remove(dn); + getPendingNodes().remove(dn); outOfServiceNodeBlocks.remove(dn); } @@ -164,19 +164,19 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " + "in maintenance or transitioning state. {} nodes pending.", numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(), - pendingNodes.size()); + getPendingNodes().size()); } } /** - * Pop datanodes off the pending list and into decomNodeBlocks, + * Pop datanodes off the pending priority queue and into decomNodeBlocks, * subject to the maxConcurrentTrackedNodes limit. */ private void processPendingNodes() { - while (!pendingNodes.isEmpty() && + while (!getPendingNodes().isEmpty() && (maxConcurrentTrackedNodes == 0 || outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) { - outOfServiceNodeBlocks.put(pendingNodes.poll(), null); + outOfServiceNodeBlocks.put(getPendingNodes().poll(), null); } } @@ -185,6 +185,7 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase it = new CyclicIteration<>(outOfServiceNodeBlocks, iterkey).iterator(); final List toRemove = new ArrayList<>(); + final List unhealthyDns = new ArrayList<>(); while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem .isRunning()) { @@ -221,6 +222,10 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase LOG.debug("Processing {} node {}", dn.getAdminState(), dn); pruneReliableBlocks(dn, blocks); } + final boolean isHealthy = blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); + if (!isHealthy) { + unhealthyDns.add(dn); + } if (blocks.size() == 0) { if (!fullScan) { // If we didn't just do a full scan, need to re-check with the @@ -236,8 +241,6 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase } // If the full scan is clean AND the node liveness is okay, // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE. 
- final boolean isHealthy = - blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); if (blocks.size() == 0 && isHealthy) { if (dn.isDecommissionInProgress()) { dnAdmin.setDecommissioned(dn); @@ -270,12 +273,31 @@ public class DatanodeAdminDefaultMonitor extends DatanodeAdminMonitorBase // an invalid state. LOG.warn("DatanodeAdminMonitor caught exception when processing node " + "{}.", dn, e); - pendingNodes.add(dn); + getPendingNodes().add(dn); toRemove.add(dn); } finally { iterkey = dn; } } + + // Having more nodes decommissioning than can be tracked will impact decommissioning + // performance due to queueing delay + int numTrackedNodes = outOfServiceNodeBlocks.size() - toRemove.size(); + int numQueuedNodes = getPendingNodes().size(); + int numDecommissioningNodes = numTrackedNodes + numQueuedNodes; + if (numDecommissioningNodes > maxConcurrentTrackedNodes) { + LOG.warn( + "{} nodes are decommissioning but only {} nodes will be tracked at a time. " + + "{} nodes are currently queued waiting to be decommissioned.", + numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes); + + // Re-queue unhealthy nodes to make space for decommissioning healthy nodes + getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { + getPendingNodes().add(dn); + outOfServiceNodeBlocks.remove(dn); + }); + } + // Remove the datanodes that are DECOMMISSIONED or in service after // maintenance expiration. for (DatanodeDescriptor dn : toRemove) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java index 9eee241eddd..f9761c2dfcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java @@ -24,8 +24,11 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; import java.util.Queue; +import java.util.stream.Stream; /** * This abstract class provides some base methods which are inherited by @@ -35,12 +38,20 @@ import java.util.Queue; public abstract class DatanodeAdminMonitorBase implements DatanodeAdminMonitorInterface, Configurable { + /** + * Sort by lastUpdate time descending order, such that unhealthy + * nodes are de-prioritized given they cannot be decommissioned. + */ + static final Comparator PENDING_NODES_QUEUE_COMPARATOR = + (dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate()); + protected BlockManager blockManager; protected Namesystem namesystem; protected DatanodeAdminManager dnAdmin; protected Configuration conf; - protected final Queue pendingNodes = new ArrayDeque<>(); + private final PriorityQueue pendingNodes = new PriorityQueue<>( + PENDING_NODES_QUEUE_COMPARATOR); /** * The maximum number of nodes to track in outOfServiceNodeBlocks. @@ -151,4 +162,34 @@ public abstract class DatanodeAdminMonitorBase public Queue getPendingNodes() { return pendingNodes; } + + /** + * If node "is dead while in Decommission In Progress", it cannot be decommissioned + * until it becomes healthy again. 
If there are more pendingNodes than can be tracked + * & some unhealthy tracked nodes, then re-queue the unhealthy tracked nodes + * to avoid blocking decommissioning of healthy nodes. + * + * @param unhealthyDns The unhealthy datanodes which may be re-queued + * @param numDecommissioningNodes The total number of nodes being decommissioned + * @return Stream of unhealthy nodes to be re-queued + */ + Stream getUnhealthyNodesToRequeue( + final List unhealthyDns, int numDecommissioningNodes) { + if (!unhealthyDns.isEmpty()) { + // Compute the number of unhealthy nodes to re-queue + final int numUnhealthyNodesToRequeue = + Math.min(numDecommissioningNodes - maxConcurrentTrackedNodes, unhealthyDns.size()); + + LOG.warn("{} limit has been reached, re-queueing {} " + + "nodes which are dead while in Decommission In Progress.", + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, + numUnhealthyNodesToRequeue); + + // Order unhealthy nodes by lastUpdate descending such that nodes + // which have been unhealthy the longest are preferred to be re-queued + return unhealthyDns.stream().sorted(PENDING_NODES_QUEUE_COMPARATOR.reversed()) + .limit(numUnhealthyNodesToRequeue); + } + return Stream.empty(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java index 10b18032e13..82a983004df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java @@ -449,7 +449,7 @@ public class AdminStatesBaseTest { refreshNodes(conf); } - static private DatanodeDescriptor getDatanodeDesriptor( + static DatanodeDescriptor getDatanodeDesriptor( final FSNamesystem ns, final String datanodeUuid) { return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index f7e6dce0033..f3b81acf374 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -26,6 +27,8 @@ import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,6 +43,8 @@ import java.util.regex.Pattern; import java.util.EnumSet; import java.util.function.Supplier; +import java.util.stream.Collectors; + import org.apache.commons.text.TextStringBuilder; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -77,6 +82,7 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.ToolRunner; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -1299,7 +1305,7 @@ public class 
TestDecommission extends AdminStatesBaseTest { DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 3); // Disable the normal monitor runs - getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE); startCluster(1, 3); final FileSystem fs = getCluster().getFileSystem(); @@ -1352,7 +1358,7 @@ public class TestDecommission extends AdminStatesBaseTest { DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 1); // Disable the normal monitor runs - getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE); startCluster(1, 2); final DatanodeManager datanodeManager = @@ -1401,7 +1407,7 @@ public class TestDecommission extends AdminStatesBaseTest { DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 1); // Disable the normal monitor runs - getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE); startCluster(1, 3); final FileSystem fs = getCluster().getFileSystem(); @@ -1654,4 +1660,229 @@ public class TestDecommission extends AdminStatesBaseTest { cleanupFile(fileSys, file); } + + /** + * Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes + * which are blocking the decommissioning of healthy nodes. + * Force the tracked nodes set to be filled with nodes lost while decommissioning, + * then decommission healthy nodes & validate they are decommissioned eventually. + */ + @Test(timeout = 120000) + public void testRequeueUnhealthyDecommissioningNodes() throws Exception { + // Create a MiniDFSCluster with 3 live datanode in AdminState=NORMAL and + // 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS and a file + // with replication factor of 5. 
+ final int numLiveNodes = 3; + final int numDeadNodes = 2; + final int numNodes = numLiveNodes + numDeadNodes; + final List liveNodes = new ArrayList<>(); + final Map deadNodeProps = + new HashMap<>(); + final ArrayList decommissionedNodes = new ArrayList<>(); + final Path filePath = new Path("/tmp/test"); + createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes, + deadNodeProps, decommissionedNodes, filePath); + final FSNamesystem namesystem = getCluster().getNamesystem(); + final BlockManager blockManager = namesystem.getBlockManager(); + final DatanodeManager datanodeManager = blockManager.getDatanodeManager(); + final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager(); + + // Validate the 2 "dead" nodes are not removed from the tracked nodes set + // after several seconds of operation + final Duration checkDuration = Duration.ofSeconds(5); + Instant checkUntil = Instant.now().plus(checkDuration); + while (Instant.now().isBefore(checkUntil)) { + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + assertEquals( + "Unexpected number of decommissioning nodes queued in DatanodeAdminManager.", + 0, decomManager.getNumPendingNodes()); + assertEquals( + "Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.", + numDeadNodes, decomManager.getNumTrackedNodes()); + assertTrue( + "Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.", + deadNodeProps.keySet().stream() + .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS))); + Thread.sleep(500); + } + + // Delete the file such that its no longer a factor blocking decommissioning of live nodes + // which have block replicas for that file + getCluster().getFileSystem().delete(filePath, true); + + // Start decommissioning 2 "live" datanodes + int numLiveDecommNodes = 2; + final List liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes); + for (final DatanodeDescriptor liveNode : liveDecommNodes) { + takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes, + AdminStates.DECOMMISSION_INPROGRESS); + decommissionedNodes.add(liveNode); + } + + // Write a new file such that there are under-replicated blocks preventing decommissioning + // of dead nodes + writeFile(getCluster().getFileSystem(), filePath, numNodes, 10); + + // Validate that the live datanodes are put into the pending decommissioning queue + GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes + && decomManager.getNumPendingNodes() == numLiveDecommNodes + && liveDecommNodes.stream().allMatch( + node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)), + 500, 30000); + assertThat(liveDecommNodes) + .as("Check all live decommissioning nodes queued in DatanodeAdminManager") + .containsAll(decomManager.getPendingNodes()); + + // Run DatanodeAdminManager.Monitor, then validate the dead nodes are re-queued & the + // live nodes are decommissioned + if (this instanceof TestDecommissionWithBackoffMonitor) { + // For TestDecommissionWithBackoffMonitor a single tick/execution of the + // DatanodeAdminBackoffMonitor will re-queue the dead nodes, then call + // "processPendingNodes" to de-queue the live nodes & decommission them + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + assertEquals( + "DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.", + 2, decomManager.getNumPendingNodes()); + assertEquals( + 
"DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.", + 0, decomManager.getNumTrackedNodes()); + } else { + // For TestDecommission a single tick/execution of the DatanodeAdminDefaultMonitor + // will re-queue the dead nodes. A seconds tick is needed to de-queue the live nodes + // & decommission them + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + assertEquals( + "DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.", + 4, decomManager.getNumPendingNodes()); + assertEquals( + "DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.", + 0, decomManager.getNumTrackedNodes()); + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + assertEquals( + "DatanodeAdminDefaultMonitor did not decommission live nodes as expected.", + 2, decomManager.getNumPendingNodes()); + assertEquals( + "DatanodeAdminDefaultMonitor did not decommission live nodes as expected.", + 0, decomManager.getNumTrackedNodes()); + } + assertTrue("Live nodes not DECOMMISSIONED as expected.", liveDecommNodes.stream() + .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED))); + assertTrue("Dead nodes not DECOMMISSION_INPROGRESS as expected.", + deadNodeProps.keySet().stream() + .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS))); + assertThat(deadNodeProps.keySet()) + .as("Check all dead decommissioning nodes queued in DatanodeAdminManager") + .containsAll(decomManager.getPendingNodes()); + + // Validate the 2 "dead" nodes are not removed from the tracked nodes set + // after several seconds of operation + checkUntil = Instant.now().plus(checkDuration); + while (Instant.now().isBefore(checkUntil)) { + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + assertEquals( + "Unexpected number of decommissioning nodes queued in DatanodeAdminManager.", + 0, decomManager.getNumPendingNodes()); + assertEquals( + "Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.", + numDeadNodes, decomManager.getNumTrackedNodes()); + assertTrue( + "Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.", + deadNodeProps.keySet().stream() + .allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS))); + Thread.sleep(500); + } + + // Delete the file such that there are no more under-replicated blocks + // allowing the dead nodes to be decommissioned + getCluster().getFileSystem().delete(filePath, true); + + // Validate the dead nodes are eventually decommissioned + GenericTestUtils.waitFor(() -> { + try { + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + } catch (ExecutionException | InterruptedException e) { + LOG.warn("Exception running DatanodeAdminMonitor", e); + return false; + } + return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0 + && deadNodeProps.keySet().stream().allMatch( + node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED)); + }, 500, 30000); + } + + /** + * Create a MiniDFSCluster with "numLiveNodes" live datanodes in AdminState=NORMAL and + * "numDeadNodes" dead datanodes in AdminState=DECOMMISSION_INPROGRESS. Create a file + * replicated to all datanodes. 
+   *
+   * @param numLiveNodes - number of live nodes in cluster
+   * @param liveNodes - list which will be loaded with references to the live datanodes
+   * @param numDeadNodes - number of dead nodes in cluster
+   * @param deadNodeProps - map which will be loaded with references to the dead datanodes
+   * @param decommissionedNodes - list which will be loaded with references to decommissioning nodes
+   * @param filePath - path used to create HDFS file
+   */
+  private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveNodes,
+      final List<DatanodeDescriptor> liveNodes, final int numDeadNodes,
+      final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps,
+      final ArrayList<DatanodeInfo> decommissionedNodes, final Path filePath) throws Exception {
+    assertTrue("Must have numLiveNodes > 0", numLiveNodes > 0);
+    assertTrue("Must have numDeadNodes > 0", numDeadNodes > 0);
+    int numNodes = numLiveNodes + numDeadNodes;
+
+    // Allow "numDeadNodes" datanodes to be decommissioned at a time
+    getConf()
+        .setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, numDeadNodes);
+    // Disable the normal monitor runs
+    getConf()
+        .setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE);
+
+    // Start cluster with "numNodes" datanodes
+    startCluster(1, numNodes);
+    final FSNamesystem namesystem = getCluster().getNamesystem();
+    final BlockManager blockManager = namesystem.getBlockManager();
+    final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
+    final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
+    assertEquals(numNodes, getCluster().getDataNodes().size());
+    getCluster().waitActive();
+
+    // "numLiveNodes" datanodes will remain "live"
+    for (final DataNode node : getCluster().getDataNodes().subList(0, numLiveNodes)) {
+      liveNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
+    }
+    assertEquals(numLiveNodes, liveNodes.size());
+
+    // "numDeadNodes" datanodes will be "dead" while decommissioning
+    final List<DatanodeDescriptor> deadNodes =
+        getCluster().getDataNodes().subList(numLiveNodes, numNodes).stream()
+            .map(dn -> getDatanodeDesriptor(namesystem, dn.getDatanodeUuid()))
+            .collect(Collectors.toList());
+    assertEquals(numDeadNodes, deadNodes.size());
+
+    // Create file with block replicas on all nodes
+    writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
+
+    // Cause the "dead" nodes to be lost while in state decommissioning
+    // and fill the tracked nodes set with those "dead" nodes
+    for (final DatanodeDescriptor deadNode : deadNodes) {
+      // Start decommissioning the node; it will not be able to complete due to the
+      // under-replicated file
+      takeNodeOutofService(0, deadNode.getDatanodeUuid(), 0, decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(deadNode);
+
+      // Stop the datanode so that it is lost while decommissioning
+      MiniDFSCluster.DataNodeProperties dn = getCluster().stopDataNode(deadNode.getXferAddr());
+      deadNodeProps.put(deadNode, dn);
+      deadNode.setLastUpdate(213); // Set last heartbeat to be in the past
+    }
+    assertEquals(numDeadNodes, deadNodeProps.size());
+
+    // Wait for the decommissioning nodes to become dead & to be added to "pendingNodes"
+    GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
+        && decomManager.getNumPendingNodes() == numDeadNodes
+        && deadNodes.stream().allMatch(node ->
+            !BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
+            && !node.isAlive()), 500, 20000);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 97c65556e7d..790d0933465 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -367,6 +367,16 @@ public class BlockManagerTestUtil {
     dm.getDatanodeAdminManager().runMonitorForTest();
   }
 
+  /**
+   * Have BlockManager check isNodeHealthyForDecommissionOrMaintenance for a given datanode.
+   * @param blockManager the BlockManager to check against
+   * @param dn the datanode to check
+   */
+  public static boolean isNodeHealthyForDecommissionOrMaintenance(BlockManager blockManager,
+      DatanodeDescriptor dn) {
+    return blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
+  }
+
   /**
    * add block to the replicateBlocks queue of the Datanode
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java
new file mode 100644
index 00000000000..451c63be131
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+public class TestDatanodeAdminMonitorBase {
+  public static final Logger LOG = LoggerFactory.getLogger(TestDatanodeAdminMonitorBase.class);
+
+  // Sort by lastUpdate time descending order, such that unhealthy
+  // nodes are de-prioritized given they cannot be decommissioned.
+  private static final int NUM_DATANODE = 10;
+  private static final int[] UNORDERED_LAST_UPDATE_TIMES =
+      new int[] {0, 5, 2, 11, 0, 3, 1001, 5, 1, 103};
+  private static final int[] ORDERED_LAST_UPDATE_TIMES =
+      new int[] {1001, 103, 11, 5, 5, 3, 2, 1, 0, 0};
+  private static final int[] REVERSE_ORDER_LAST_UPDATE_TIMES =
+      new int[] {0, 0, 1, 2, 3, 5, 5, 11, 103, 1001};
+
+  private static final DatanodeDescriptor[] NODES;
+
+  static {
+    NODES = new DatanodeDescriptor[NUM_DATANODE];
+    for (int i = 0; i < NUM_DATANODE; i++) {
+      NODES[i] = new DatanodeDescriptor(DatanodeID.EMPTY_DATANODE_ID);
+      NODES[i].setLastUpdate(UNORDERED_LAST_UPDATE_TIMES[i]);
+      NODES[i].setLastUpdateMonotonic(UNORDERED_LAST_UPDATE_TIMES[i]);
+    }
+  }
+
+  /**
+   * Verify that DatanodeAdminManager pendingNodes priority queue
+   * correctly orders the nodes by lastUpdate time descending.
+   */
+  @Test
+  public void testPendingNodesQueueOrdering() {
+    final PriorityQueue<DatanodeDescriptor> pendingNodes =
+        new PriorityQueue<>(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR);
+
+    pendingNodes.addAll(Arrays.asList(NODES));
+
+    for (int i = 0; i < NUM_DATANODE; i++) {
+      final DatanodeDescriptor dn = pendingNodes.poll();
+      Assert.assertNotNull(dn);
+      Assert.assertEquals(ORDERED_LAST_UPDATE_TIMES[i], dn.getLastUpdate());
+    }
+  }
+
+  /**
+   * Verify that DatanodeAdminManager logic to sort unhealthy nodes
+   * correctly orders the nodes by lastUpdate time ascending.
+   */
+  @Test
+  public void testPendingNodesQueueReverseOrdering() {
+    final List<DatanodeDescriptor> nodes = Arrays.asList(NODES);
+    final List<DatanodeDescriptor> reverseOrderNodes =
+        nodes.stream().sorted(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR.reversed())
+            .collect(Collectors.toList());
+
+    Assert.assertEquals(NUM_DATANODE, reverseOrderNodes.size());
+    for (int i = 0; i < NUM_DATANODE; i++) {
+      Assert.assertEquals(REVERSE_ORDER_LAST_UPDATE_TIMES[i],
+          reverseOrderNodes.get(i).getLastUpdate());
+    }
+  }
+}
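
The re-queue mechanism in this patch comes down to two pieces: a pending queue ordered by most recent heartbeat (lastUpdate descending), so nodes that have stopped reporting wait behind healthy ones, and a selection step that, when the DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES limit is exceeded, pushes the tracked nodes that have been dead the longest back onto that queue. The standalone sketch below illustrates the same idea outside of HDFS; the Node class, the method names, and the numbers are simplified stand-ins for DatanodeDescriptor and the DatanodeAdminMonitorBase methods, not code from the patch.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for DatanodeDescriptor: only the lastUpdate heartbeat time matters here. */
final class Node {
  final String name;
  final long lastUpdate;
  Node(String name, long lastUpdate) { this.name = name; this.lastUpdate = lastUpdate; }
  @Override public String toString() { return name + "(lastUpdate=" + lastUpdate + ")"; }
}

public class RequeueSketch {
  // Same idea as PENDING_NODES_QUEUE_COMPARATOR: newest heartbeat first, so nodes that
  // have not reported in (likely dead) sink to the back of the pending queue.
  static final Comparator<Node> BY_LAST_UPDATE_DESC =
      Comparator.comparingLong((Node n) -> n.lastUpdate).reversed();

  // Mirrors the shape of getUnhealthyNodesToRequeue: when more nodes are decommissioning
  // than the tracked-node limit allows, pick the unhealthy tracked nodes with the oldest
  // heartbeat (dead the longest) to push back onto the pending queue.
  static Stream<Node> unhealthyNodesToRequeue(List<Node> unhealthyTracked,
      int numDecommissioning, int maxTracked) {
    if (unhealthyTracked.isEmpty() || numDecommissioning <= maxTracked) {
      return Stream.empty();
    }
    int toRequeue = Math.min(numDecommissioning - maxTracked, unhealthyTracked.size());
    return unhealthyTracked.stream()
        .sorted(BY_LAST_UPDATE_DESC.reversed()) // oldest heartbeat first
        .limit(toRequeue);
  }

  public static void main(String[] args) {
    // Pending queue: healthy nodes (recent heartbeats) are polled before stale ones.
    PriorityQueue<Node> pending = new PriorityQueue<>(BY_LAST_UPDATE_DESC);
    pending.add(new Node("dead-1", 213));
    pending.add(new Node("live-1", 100_000));
    pending.add(new Node("live-2", 99_000));
    System.out.println("first to be tracked: " + pending.poll()); // live-1

    // Tracked set is at its limit (2) but holds two dead nodes, and a third node is
    // decommissioning, so one dead node is pushed back to make room.
    List<Node> unhealthyTracked = Arrays.asList(new Node("dead-2", 500), new Node("dead-3", 50));
    List<Node> requeued = unhealthyNodesToRequeue(unhealthyTracked, 3, 2)
        .collect(Collectors.toList());
    System.out.println("re-queued: " + requeued); // dead-3, which has been dead the longest
  }
}

Because the same comparator drives both the queue and (reversed) the re-queue selection, a re-queued dead node lands at the back of the pending queue, so any healthy node waiting there is tracked first; this is the behavior the new tests above exercise against DatanodeAdminMonitorBase.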