From 8af247df858655f144cc1bf7bacd467808864e9b Mon Sep 17 00:00:00 2001
From: Jim Brennan
Date: Tue, 8 Dec 2020 20:32:31 +0000
Subject: [PATCH] HDFS-15716. WaitforReplication in
 TestUpgradeDomainBlockPlacementPolicy (#2528) Contributed by Ahmed Hussein

---
 ...TestUpgradeDomainBlockPlacementPolicy.java | 73 ++++++++++---------
 1 file changed, 37 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
index 3383c4ee47b..0c3d267debf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java
@@ -17,16 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -41,11 +37,11 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpg
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.util.HostsFileWriter;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -63,6 +59,8 @@ public class TestUpgradeDomainBlockPlacementPolicy {
 
   private static final short REPLICATION_FACTOR = (short) 3;
   private static final int DEFAULT_BLOCK_SIZE = 1024;
+  private static final int WAIT_TIMEOUT_MS = 60000;
+  private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5;
   static final String[] racks =
       { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
   static final String[] hosts =
@@ -71,9 +69,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
       {"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
   static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
   private MiniDFSCluster cluster = null;
-  private NamenodeProtocols nameNodeRpc = null;
-  private FSNamesystem namesystem = null;
-  private PermissionStatus perm = null;
   private HostsFileWriter hostsFileWriter = new HostsFileWriter();
 
   @Before
@@ -92,10 +87,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
         .hosts(hosts).build();
     cluster.waitActive();
-    nameNodeRpc = cluster.getNameNodeRpc();
-    namesystem = cluster.getNamesystem();
-    perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
-        FsPermission.getDefault());
     refreshDatanodeAdminProperties();
   }
 
@@ -186,43 +177,51 @@ public class TestUpgradeDomainBlockPlacementPolicy {
     expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
   }
 
+  private void createFileAndWaitForReplication(final Path path,
+      final long fileLen)
+      throws Exception {
+    DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen,
+        REPLICATION_FACTOR, 1000L);
+    DFSTestUtil.waitForReplication(cluster.getFileSystem(), path,
+        REPLICATION_FACTOR, WAIT_TIMEOUT_MS);
+  }
+
   @Test
   public void testPlacement() throws Exception {
-    final long fileSize = DEFAULT_BLOCK_SIZE * 5;
-    final String testFile = new String("/testfile");
+    final long fileSize = FILE_SIZE;
+    final String testFile = "/testfile";
     final Path path = new Path(testFile);
-    DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
-        REPLICATION_FACTOR, 1000L);
+    createFileAndWaitForReplication(path, FILE_SIZE);
     LocatedBlocks locatedBlocks =
         cluster.getFileSystem().getClient().getLocatedBlocks(
             path.toString(), 0, fileSize);
     for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
       Set<DatanodeInfo> locs = new HashSet<>();
       for(DatanodeInfo datanodeInfo : block.getLocations()) {
-        if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) {
+        if (datanodeInfo.getAdminState()
+            .equals(DatanodeInfo.AdminStates.NORMAL)) {
           locs.add(datanodeInfo);
         }
       }
       for (DatanodeID datanodeID : expectedDatanodeIDs) {
-        assertTrue(locs.contains(datanodeID));
+        Assert.assertTrue(locs.contains(datanodeID));
      }
     }
   }
 
   @Test(timeout = 300000)
   public void testPlacementAfterDecommission() throws Exception {
-    final long fileSize = DEFAULT_BLOCK_SIZE * 5;
-    final String testFile = new String("/testfile");
+    final long fileSize = FILE_SIZE;
+    final String testFile = "/testfile-afterdecomm";
     final Path path = new Path(testFile);
-    DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize,
-        REPLICATION_FACTOR, 1000L);
+    createFileAndWaitForReplication(path, fileSize);
 
     // Decommission some nodes and wait until decommissions have finished.
     refreshDatanodeAdminProperties2();
+
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        boolean successful = true;
         LocatedBlocks locatedBlocks;
         try {
           locatedBlocks =
@@ -231,32 +230,34 @@ public class TestUpgradeDomainBlockPlacementPolicy {
         } catch (IOException ioe) {
           return false;
         }
-        for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
+        for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
           Set<DatanodeInfo> locs = new HashSet<>();
           for (DatanodeInfo datanodeInfo : block.getLocations()) {
-            if (datanodeInfo.getAdminState() ==
-                DatanodeInfo.AdminStates.NORMAL) {
+            if (datanodeInfo.getAdminState().equals(
+                DatanodeInfo.AdminStates.NORMAL)) {
              locs.add(datanodeInfo);
             }
           }
           for (DatanodeID datanodeID : expectedDatanodeIDs) {
-            successful = successful && locs.contains(datanodeID);
+            if (!locs.contains(datanodeID)) {
+              return false;
+            }
           }
         }
-        return successful;
+        return true;
       }
-    }, 1000, 60000);
+    }, 1000, WAIT_TIMEOUT_MS);
 
     // Verify block placement policy of each block.
-    LocatedBlocks locatedBlocks;
-    locatedBlocks =
+    LocatedBlocks locatedBlocks =
         cluster.getFileSystem().getClient().getLocatedBlocks(
             path.toString(), 0, fileSize);
-    for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
-      BlockPlacementStatus status = cluster.getNamesystem().getBlockManager().
-          getBlockPlacementPolicy().verifyBlockPlacement(
-          block.getLocations(), REPLICATION_FACTOR);
-      assertTrue(status.isPlacementPolicySatisfied());
+    for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
+      BlockPlacementStatus status =
+          cluster.getNamesystem().getBlockManager()
+              .getBlockPlacementPolicy()
+              .verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR);
+      Assert.assertTrue(status.isPlacementPolicySatisfied());
     }
   }
 }