From f6453244ab8a676144bb001d497582da284730a1 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 3 Jun 2020 12:16:36 +0530 Subject: [PATCH] HDFS-14960. TestBalancerWithNodeGroup should not succeed with DFSNetworkTopology. Contributed by Jim Brennan. --- .../balancer/TestBalancerWithNodeGroup.java | 62 ++++++++++++++++--- 1 file changed, 52 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 6088722ef26..28dc9a0ae40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.io.IOException; import java.net.URI; @@ -44,11 +46,12 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; import org.apache.hadoop.test.LambdaTestUtils; -import org.junit.Assert; import org.junit.Test; /** @@ -84,7 +87,7 @@ static Configuration createConf() { TestBalancer.initConf(conf); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, false); - conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, + conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, NetworkTopologyWithNodeGroup.class.getName()); conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, BlockPlacementPolicyWithNodeGroup.class.getName()); @@ -192,8 +195,8 @@ private void runBalancerCanFinish(Configuration conf, // start rebalancing Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); - Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || - (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); + assertEquals("Balancer did not exit with NO_MOVE_PROGRESS", + ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); } @@ -211,6 +214,30 @@ private Set getBlocksOnRack(List blks, String rack) return ret; } + private void verifyNetworkTopology() { + NetworkTopology topology = + cluster.getNamesystem().getBlockManager().getDatanodeManager(). + getNetworkTopology(); + assertTrue("must be an instance of NetworkTopologyWithNodeGroup", + topology instanceof NetworkTopologyWithNodeGroup); + } + + private void verifyProperBlockPlacement(String file, + long length, int numOfReplicas) throws IOException { + BlockPlacementPolicy placementPolicy = + cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy(); + List locatedBlocks = client. + getBlockLocations(file, 0, length).getLocatedBlocks(); + assertFalse("No blocks found for file " + file, locatedBlocks.isEmpty()); + for (LocatedBlock locatedBlock : locatedBlocks) { + BlockPlacementStatus status = placementPolicy.verifyBlockPlacement( + locatedBlock.getLocations(), numOfReplicas); + assertTrue("Block placement policy was not satisfied for block " + + locatedBlock.getBlock().getBlockId(), + status.isPlacementPolicySatisfied()); + } + } + /** * Create a cluster with even distribution, and a new empty node is added to * the cluster, then test rack locality for balancer policy. @@ -232,6 +259,7 @@ public void testBalancerWithRackLocality() throws Exception { cluster = new MiniDFSClusterWithNodeGroup(builder); try { cluster.waitActive(); + verifyNetworkTopology(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); @@ -258,12 +286,14 @@ public void testBalancerWithRackLocality() throws Exception { totalCapacity += newCapacity; // run balancer and validate results - runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); + runBalancer(conf, totalUsedSpace, totalCapacity); lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length); Set after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0); assertEquals(before, after); - + + verifyProperBlockPlacement(filePath.toUri().getPath(), length, + numOfDatanodes); } finally { cluster.shutdown(); } @@ -291,15 +321,18 @@ public void testBalancerWithNodeGroup() throws Exception { cluster = new MiniDFSClusterWithNodeGroup(builder); try { cluster.waitActive(); + verifyNetworkTopology(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); long totalCapacity = TestBalancer.sum(capacities); + int numOfReplicas = numOfDatanodes / 2; // fill up the cluster to be 20% full long totalUsedSpace = totalCapacity * 2 / 10; - TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2), - (short) (numOfDatanodes/2), 0); + long length = totalUsedSpace / numOfReplicas; + TestBalancer.createFile(cluster, filePath, length, + (short) numOfReplicas, 0); long newCapacity = CAPACITY; String newRack = RACK1; @@ -313,6 +346,9 @@ public void testBalancerWithNodeGroup() throws Exception { // run balancer and validate results runBalancer(conf, totalUsedSpace, totalCapacity); + verifyProperBlockPlacement(filePath.toUri().getPath(), length, + numOfReplicas); + } finally { cluster.shutdown(); } @@ -345,6 +381,7 @@ public void testBalancerEndInNoMoveProgress() throws Exception { cluster = new MiniDFSClusterWithNodeGroup(builder); try { cluster.waitActive(); + verifyNetworkTopology(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); @@ -352,12 +389,17 @@ public void testBalancerEndInNoMoveProgress() throws Exception { long totalCapacity = TestBalancer.sum(capacities); // fill up the cluster to be 60% full long totalUsedSpace = totalCapacity * 6 / 10; - TestBalancer.createFile(cluster, filePath, totalUsedSpace / 3, - (short) (3), 0); + int numOfReplicas = 3; + long length = totalUsedSpace / 3; + TestBalancer.createFile(cluster, filePath, length, + (short) numOfReplicas, 0); // run balancer which can finish in 5 iterations with no block movement. runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); + verifyProperBlockPlacement(filePath.toUri().getPath(), length, + numOfReplicas); + } finally { cluster.shutdown(); }