diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index cf59c55675f..03f8b359e7f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -418,6 +418,28 @@ public abstract class GenericTestUtils { public static void waitFor(final Supplier check, final long checkEveryMillis, final long waitForMillis) throws TimeoutException, InterruptedException { + waitFor(check, checkEveryMillis, waitForMillis, null); + } + + /** + * Wait for the specified test to return true. The test will be performed + * initially and then every {@code checkEveryMillis} until at least + * {@code waitForMillis} time has expired. If {@code check} is null or + * {@code waitForMillis} is less than {@code checkEveryMillis} this method + * will throw an {@link IllegalArgumentException}. + * + * @param check the test to perform. + * @param checkEveryMillis how often to perform the test. + * @param waitForMillis the amount of time after which no more tests will be + * performed. + * @param errorMsg error message to provide in TimeoutException. + * @throws TimeoutException if the test does not return true in the allotted + * time. + * @throws InterruptedException if the method is interrupted while waiting. + */ + public static void waitFor(final Supplier check, + final long checkEveryMillis, final long waitForMillis, + final String errorMsg) throws TimeoutException, InterruptedException { Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT); if (waitForMillis < checkEveryMillis) { throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT); @@ -432,9 +454,12 @@ public abstract class GenericTestUtils { } if (!result) { - throw new TimeoutException("Timed out waiting for condition. " + - "Thread diagnostics:\n" + - TimedOutTestsListener.buildThreadDiagnosticString()); + final String exceptionErrorMsg = "Timed out waiting for condition. " + + (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg) + ? "Error Message: " + errorMsg : "") + + "\nThread diagnostics:\n" + + TimedOutTestsListener.buildThreadDiagnosticString(); + throw new TimeoutException(exceptionErrorMsg); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 800f273edb5..e8bd8377a3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -27,7 +27,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -57,11 +60,12 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; /** * This class tests the decommissioning of nodes. @@ -75,7 +79,8 @@ public class TestDecommissioningStatus { private FileSystem fileSys; private HostsFileWriter hostsFileWriter; private Configuration conf; - private Logger LOG; + private static final Logger LOG = + LoggerFactory.getLogger(TestDecommissioningStatus.class); final ArrayList decommissionedNodes = new ArrayList(numDatanodes); @@ -107,8 +112,8 @@ public class TestDecommissioningStatus { conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1); - Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG); - LOG = Logger.getLogger(TestDecommissioningStatus.class); + GenericTestUtils.setLogLevel( + LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG); return conf; } @@ -163,17 +168,30 @@ public class TestDecommissioningStatus { protected void checkDecommissionStatus(DatanodeDescriptor decommNode, int expectedUnderRep, int expectedDecommissionOnly, - int expectedUnderRepInOpenFiles) { - assertEquals("Unexpected num under-replicated blocks", - expectedUnderRep, - decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks()); - assertEquals("Unexpected number of decom-only replicas", - expectedDecommissionOnly, - decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas()); - assertEquals( - "Unexpected number of replicas in under-replicated open files", - expectedUnderRepInOpenFiles, - decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles()); + int expectedUnderRepInOpenFiles) throws TimeoutException, + InterruptedException { + String errorMsg; + errorMsg = "Under replicated blocks. Expected: " + + expectedUnderRep + " , Actual: " + + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks(); + GenericTestUtils.waitFor( + () -> expectedUnderRep == decommNode.getLeavingServiceStatus() + .getUnderReplicatedBlocks(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); + errorMsg = "OutOfService only replicas. Expected: " + + expectedDecommissionOnly + " , Actual: " + + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas(); + GenericTestUtils.waitFor( + () -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus() + .getOutOfServiceOnlyReplicas(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); + errorMsg = "UnderReplicated in open files. Expected: " + + expectedUnderRepInOpenFiles + " , Actual: " + + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(); + GenericTestUtils.waitFor( + () -> expectedUnderRepInOpenFiles == decommNode + .getLeavingServiceStatus().getUnderReplicatedInOpenFiles(), + 1000, TimeUnit.SECONDS.toMillis(10), errorMsg); } protected void checkDFSAdminDecommissionStatus( @@ -268,6 +286,7 @@ public class TestDecommissioningStatus { FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); + verifyInitialState(fsn, dm); for (int iteration = 0; iteration < numDatanodes; iteration++) { String downnode = decommissionNode(client, iteration); dm.refreshNodes(conf); @@ -276,14 +295,13 @@ public class TestDecommissioningStatus { // Block until the admin's monitor updates the number of tracked nodes. waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1); final List decommissioningNodes = dm.getDecommissioningNodes(); + assertEquals(decommissioningNodes.size(), iteration + 1); if (iteration == 0) { - assertEquals(decommissioningNodes.size(), 1); DatanodeDescriptor decommNode = decommissioningNodes.get(0); checkDecommissionStatus(decommNode, 3, 0, 1); checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1), fileSys, admin); } else { - assertEquals(decommissioningNodes.size(), 2); DatanodeDescriptor decommNode1 = decommissioningNodes.get(0); DatanodeDescriptor decommNode2 = decommissioningNodes.get(1); // This one is still 3,3,1 since it passed over the UC block @@ -305,6 +323,69 @@ public class TestDecommissioningStatus { AdminStatesBaseTest.cleanupFile(fileSys, file2); } + // Why do we verify initial state of DataNodes here? + // Before we start actual decommission testing, we should ensure that + // total 8 blocks (original 4 blocks of 2 files and 4 replicas) are + // present over two Datanodes available. If we don't wait until all 8 blocks + // are reported live by BlockManager, we might get to a situation + // where one of the replicas might not yet been present on any of Datanodes + // and we start decommissioning process, and then it would result in + // flaky test because total (no of under replicated blocks, no of outOfService + // only replicas, no of under replicated in open files) counts would be + // incorrect. + protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm) + throws InterruptedException { + dm.getDatanodes().forEach(datanodeDescriptor -> { + try { + checkDecommissionStatus(datanodeDescriptor, 0, 0, 0); + } catch (TimeoutException | InterruptedException e) { + throw new AssertionError("Datanode not in good state.", e); + } + }); + int c = 0; + int totalBlocks; + long totalReplicatedBlocks; + while (true) { + totalBlocks = fsn.getBlockManager().getTotalBlocks(); + totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks(); + if (totalBlocks == 4 && totalReplicatedBlocks == 4) { + break; + } else { + if (c == 4) { + throw new AssertionError("Unexpected Total blocks " + totalBlocks + + " and replicated blocks " + totalReplicatedBlocks); + } + Thread.sleep(3000); + } + c++; + } + c = 0; + AtomicInteger total = new AtomicInteger(0); + AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0); + while (true) { + total.set(0); + sufficientBlocksSuccess.set(0); + dm.getDatanodes().forEach( + datanodeDescriptor -> { + total.addAndGet(datanodeDescriptor.numBlocks()); + if (datanodeDescriptor.numBlocks() == 4) { + sufficientBlocksSuccess.incrementAndGet(); + } + }); + if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) { + break; + } else { + if (c == 4) { + throw new AssertionError("Unexpected Total blocks " + total.get() + + " from Datanode Storage. 4 blocks per Datanode Storage" + + " expected from each DataNode"); + } + Thread.sleep(3000); + } + c++; + } + } + /** * Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked * as dead before decommission has completed. That will allow DN to resume @@ -388,8 +469,8 @@ public class TestDecommissioningStatus { */ @Test(timeout=120000) public void testDecommissionDeadDN() throws Exception { - Logger log = Logger.getLogger(DatanodeAdminManager.class); - log.setLevel(Level.DEBUG); + Logger log = LoggerFactory.getLogger(DatanodeAdminManager.class); + GenericTestUtils.setLogLevel(log, Level.DEBUG); DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId(); String dnName = dnID.getXferAddr(); DataNodeProperties stoppedDN = cluster.stopDataNode(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java index eb748dae9ea..a68a530b813 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java @@ -106,6 +106,7 @@ public class TestDecommissioningStatusWithBackoffMonitor FSNamesystem fsn = cluster.getNamesystem(); final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager(); + verifyInitialState(fsn, dm); for (int iteration = 0; iteration < numDatanodes; iteration++) { String downnode = decommissionNode(client, iteration); dm.refreshNodes(conf);