diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java index b057c9bf999..0421941f3f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java @@ -17,16 +17,13 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -41,16 +38,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; -import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.function.Supplier; - /** * End-to-end test case for upgrade domain * The test configs upgrade domain for nodes via admin json @@ -63,6 +58,8 @@ public class TestUpgradeDomainBlockPlacementPolicy { private static final short REPLICATION_FACTOR = (short) 3; private static final int DEFAULT_BLOCK_SIZE = 1024; + private static final int WAIT_TIMEOUT_MS = 60000; + private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5; static final String[] racks = { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" }; static final String[] hosts = @@ -71,9 +68,6 @@ public class TestUpgradeDomainBlockPlacementPolicy { {"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"}; static final Set expectedDatanodeIDs = new HashSet<>(); private MiniDFSCluster cluster = null; - private NamenodeProtocols nameNodeRpc = null; - private FSNamesystem namesystem = null; - private PermissionStatus perm = null; private HostsFileWriter hostsFileWriter = new HostsFileWriter(); @Before @@ -92,10 +86,6 @@ public void setup() throws IOException { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) .hosts(hosts).build(); cluster.waitActive(); - nameNodeRpc = cluster.getNameNodeRpc(); - namesystem = cluster.getNamesystem(); - perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null, - FsPermission.getDefault()); refreshDatanodeAdminProperties(); } @@ -186,43 +176,51 @@ private void refreshDatanodeAdminProperties2() expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId()); } + private void createFileAndWaitForReplication(final Path path, + final long fileLen) + throws Exception { + DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen, + REPLICATION_FACTOR, 1000L); + DFSTestUtil.waitForReplication(cluster.getFileSystem(), path, + REPLICATION_FACTOR, WAIT_TIMEOUT_MS); + } + @Test public void testPlacement() throws Exception { - final long fileSize = DEFAULT_BLOCK_SIZE * 5; - final String testFile = new String("/testfile"); + final long fileSize = FILE_SIZE; + final String testFile = "/testfile"; final Path path = new Path(testFile); - DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, - REPLICATION_FACTOR, 1000L); + createFileAndWaitForReplication(path, FILE_SIZE); LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient().getLocatedBlocks( path.toString(), 0, fileSize); for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) { Set locs = new HashSet<>(); for(DatanodeInfo datanodeInfo : block.getLocations()) { - if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) { + if (datanodeInfo.getAdminState() + .equals(DatanodeInfo.AdminStates.NORMAL)) { locs.add(datanodeInfo); } } for (DatanodeID datanodeID : expectedDatanodeIDs) { - assertTrue(locs.contains(datanodeID)); + Assert.assertTrue(locs.contains(datanodeID)); } } } @Test(timeout = 300000) public void testPlacementAfterDecommission() throws Exception { - final long fileSize = DEFAULT_BLOCK_SIZE * 5; - final String testFile = new String("/testfile"); + final long fileSize = FILE_SIZE; + final String testFile = "/testfile-afterdecomm"; final Path path = new Path(testFile); - DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, - REPLICATION_FACTOR, 1000L); + createFileAndWaitForReplication(path, fileSize); // Decommission some nodes and wait until decommissions have finished. refreshDatanodeAdminProperties2(); + GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - boolean successful = true; LocatedBlocks locatedBlocks; try { locatedBlocks = @@ -231,32 +229,34 @@ public Boolean get() { } catch (IOException ioe) { return false; } - for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { + for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) { Set locs = new HashSet<>(); for (DatanodeInfo datanodeInfo : block.getLocations()) { - if (datanodeInfo.getAdminState() == - DatanodeInfo.AdminStates.NORMAL) { + if (datanodeInfo.getAdminState().equals( + DatanodeInfo.AdminStates.NORMAL)) { locs.add(datanodeInfo); } } for (DatanodeID datanodeID : expectedDatanodeIDs) { - successful = successful && locs.contains(datanodeID); + if (!locs.contains(datanodeID)) { + return false; + } } } - return successful; + return true; } - }, 1000, 60000); + }, 1000, WAIT_TIMEOUT_MS); // Verify block placement policy of each block. - LocatedBlocks locatedBlocks; - locatedBlocks = + LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient().getLocatedBlocks( path.toString(), 0, fileSize); - for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { - BlockPlacementStatus status = cluster.getNamesystem().getBlockManager(). - getBlockPlacementPolicy().verifyBlockPlacement( - block.getLocations(), REPLICATION_FACTOR); - assertTrue(status.isPlacementPolicySatisfied()); + for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) { + BlockPlacementStatus status = + cluster.getNamesystem().getBlockManager() + .getBlockPlacementPolicy() + .verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR); + Assert.assertTrue(status.isPlacementPolicySatisfied()); } } }