From 03c908fb046b7b426a4abc54d9c03037b0d2daf0 Mon Sep 17 00:00:00 2001
From: Viraj Jasani <vjasani@apache.org>
Date: Thu, 25 Nov 2021 18:48:01 +0530
Subject: [PATCH] HDFS-16171. De-flake testDecommissionStatus (#3280) (#3720)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
---
 .../apache/hadoop/test/GenericTestUtils.java  |  31 ++++-
 .../namenode/TestDecommissioningStatus.java   | 121 +++++++++++++++---
 2 files changed, 129 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index e6ca6ab78f1..d5e202e7abd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -398,6 +398,28 @@ public abstract class GenericTestUtils {
   public static void waitFor(final Supplier<Boolean> check,
       final long checkEveryMillis, final long waitForMillis)
       throws TimeoutException, InterruptedException {
+    waitFor(check, checkEveryMillis, waitForMillis, null);
+  }
+
+  /**
+   * Wait for the specified test to return true. The test will be performed
+   * initially and then every {@code checkEveryMillis} until at least
+   * {@code waitForMillis} time has expired. A null {@code check} results in
+   * a {@link NullPointerException}; a {@code waitForMillis} smaller than
+   * {@code checkEveryMillis} results in an {@link IllegalArgumentException}.
+   *
+   * @param check the test to perform.
+   * @param checkEveryMillis how often to perform the test.
+   * @param waitForMillis the amount of time after which no more tests will be
+   *                      performed.
+   * @param errorMsg error message to provide in TimeoutException.
+   * @throws TimeoutException if the test does not return true in the allotted
+   *                          time.
+   * @throws InterruptedException if the method is interrupted while waiting.
+   */
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis,
+      final String errorMsg) throws TimeoutException, InterruptedException {
     Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
     if (waitForMillis < checkEveryMillis) {
       throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
@@ -412,9 +434,12 @@ public abstract class GenericTestUtils {
     }
 
     if (!result) {
-      throw new TimeoutException("Timed out waiting for condition. " +
-          "Thread diagnostics:\n" +
-          TimedOutTestsListener.buildThreadDiagnosticString());
+      final String exceptionErrorMsg = "Timed out waiting for condition. "
+          + (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
+          ? "Error Message: " + errorMsg : "")
"Error Message: " + errorMsg : "") + + "\nThread diagnostics:\n" + + TimedOutTestsListener.buildThreadDiagnosticString(); + throw new TimeoutException(exceptionErrorMsg); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index d9d1f4777ea..0a0c957883c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -27,7 +27,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -57,11 +60,12 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; /** * This class tests the decommissioning of nodes. @@ -75,7 +79,8 @@ public class TestDecommissioningStatus { private static FileSystem fileSys; private static HostsFileWriter hostsFileWriter; private static Configuration conf; - private Logger LOG; + private static final Logger LOG = + LoggerFactory.getLogger(TestDecommissioningStatus.class); final ArrayList decommissionedNodes = new ArrayList(numDatanodes); @@ -102,8 +107,8 @@ public class TestDecommissioningStatus { fileSys = cluster.getFileSystem(); cluster.getNamesystem().getBlockManager().getDatanodeManager() .setHeartbeatExpireInterval(3000); - Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG); - LOG = Logger.getLogger(TestDecommissioningStatus.class); + GenericTestUtils.setLogLevel( + LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG); } @After @@ -142,17 +147,30 @@ public class TestDecommissioningStatus { private void checkDecommissionStatus(DatanodeDescriptor decommNode, int expectedUnderRep, int expectedDecommissionOnly, - int expectedUnderRepInOpenFiles) { - assertEquals("Unexpected num under-replicated blocks", - expectedUnderRep, - decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks()); - assertEquals("Unexpected number of decom-only replicas", - expectedDecommissionOnly, - decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas()); - assertEquals( - "Unexpected number of replicas in under-replicated open files", - expectedUnderRepInOpenFiles, - decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles()); + int expectedUnderRepInOpenFiles) throws TimeoutException, + InterruptedException { + String errorMsg; + errorMsg = "Under replicated blocks. 
Expected: " + + expectedUnderRep + " , Actual: " + + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks(); + GenericTestUtils.waitFor( + () -> expectedUnderRep == decommNode.getLeavingServiceStatus() + .getUnderReplicatedBlocks(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); + errorMsg = "OutOfService only replicas. Expected: " + + expectedDecommissionOnly + " , Actual: " + + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas(); + GenericTestUtils.waitFor( + () -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus() + .getOutOfServiceOnlyReplicas(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); + errorMsg = "UnderReplicated in open files. Expected: " + + expectedUnderRepInOpenFiles + " , Actual: " + + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(); + GenericTestUtils.waitFor( + () -> expectedUnderRepInOpenFiles == decommNode + .getLeavingServiceStatus().getUnderReplicatedInOpenFiles(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); } private void checkDFSAdminDecommissionStatus( @@ -247,6 +265,7 @@ public class TestDecommissioningStatus { FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); + verifyInitialState(fsn, dm); for (int iteration = 0; iteration < numDatanodes; iteration++) { String downnode = decommissionNode(client, iteration); dm.refreshNodes(conf); @@ -255,14 +274,13 @@ public class TestDecommissioningStatus { // Block until the admin's monitor updates the number of tracked nodes. waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1); final List decommissioningNodes = dm.getDecommissioningNodes(); + assertEquals(decommissioningNodes.size(), iteration + 1); if (iteration == 0) { - assertEquals(decommissioningNodes.size(), 1); DatanodeDescriptor decommNode = decommissioningNodes.get(0); checkDecommissionStatus(decommNode, 3, 0, 1); checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1), fileSys, admin); } else { - assertEquals(decommissioningNodes.size(), 2); DatanodeDescriptor decommNode1 = decommissioningNodes.get(0); DatanodeDescriptor decommNode2 = decommissioningNodes.get(1); // This one is still 3,3,1 since it passed over the UC block @@ -284,6 +302,69 @@ public class TestDecommissioningStatus { AdminStatesBaseTest.cleanupFile(fileSys, file2); } + // Why do we verify initial state of DataNodes here? + // Before we start actual decommission testing, we should ensure that + // total 8 blocks (original 4 blocks of 2 files and 4 replicas) are + // present over two Datanodes available. If we don't wait until all 8 blocks + // are reported live by BlockManager, we might get to a situation + // where one of the replicas might not yet been present on any of Datanodes + // and we start decommissioning process, and then it would result in + // flaky test because total (no of under replicated blocks, no of outOfService + // only replicas, no of under replicated in open files) counts would be + // incorrect. 
+  protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
+      throws InterruptedException {
+    dm.getDatanodes().forEach(datanodeDescriptor -> {
+      try {
+        checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
+      } catch (TimeoutException | InterruptedException e) {
+        throw new AssertionError("Datanode not in good state.", e);
+      }
+    });
+    int c = 0;
+    int totalBlocks;
+    long totalReplicatedBlocks;
+    while (true) {
+      totalBlocks = fsn.getBlockManager().getTotalBlocks();
+      totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
+      if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + totalBlocks
+              + " and replicated blocks " + totalReplicatedBlocks);
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+    c = 0;
+    AtomicInteger total = new AtomicInteger(0);
+    AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
+    while (true) {
+      total.set(0);
+      sufficientBlocksSuccess.set(0);
+      dm.getDatanodes().forEach(
+          datanodeDescriptor -> {
+            total.addAndGet(datanodeDescriptor.numBlocks());
+            if (datanodeDescriptor.numBlocks() == 4) {
+              sufficientBlocksSuccess.incrementAndGet();
+            }
+          });
+      if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + total.get()
+              + " from Datanode Storage. 4 blocks per Datanode Storage"
+              + " expected from each DataNode");
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+  }
+
   /**
    * Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
    * as dead before decommission has completed. That will allow DN to resume
@@ -367,8 +448,8 @@
    */
   @Test(timeout=120000)
   public void testDecommissionDeadDN() throws Exception {
-    Logger log = Logger.getLogger(DatanodeAdminManager.class);
-    log.setLevel(Level.DEBUG);
+    Logger log = LoggerFactory.getLogger(DatanodeAdminManager.class);
+    GenericTestUtils.setLogLevel(log, Level.DEBUG);
     DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
     String dnName = dnID.getXferAddr();
    DataNodeProperties stoppedDN = cluster.stopDataNode(0);
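
Usage sketch (illustrative, not part of the diff): how a test can call the
new four-argument waitFor overload added above. The MiniDFSCluster field
"cluster" and the 500 ms / 30 s values are assumptions for this example; the
caller must declare or handle TimeoutException and InterruptedException.

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.test.GenericTestUtils;

    // Polls the condition every 500 ms for at most 30 seconds. On timeout,
    // the TimeoutException message now carries "Error Message: Cluster did
    // not start" ahead of the usual thread diagnostics.
    GenericTestUtils.waitFor(
        () -> cluster.isClusterUp(),      // Supplier<Boolean> condition
        500,                              // checkEveryMillis
        TimeUnit.SECONDS.toMillis(30),    // waitForMillis
        "Cluster did not start");         // errorMsg shown on timeout

Passing a null or empty errorMsg (as the three-argument overload does)
preserves the original exception text, so existing callers are unaffected.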