diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8d2ddf5f888..6d2b2264eca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1376,6 +1376,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9313. Possible NullPointerException in BlockManager if no excess
     replica can be chosen. (mingma)
 
+    HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows.
+    (Xiaoyu Yao via cnauroth)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 095241dca68..e33e5862291 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
+import org.junit.After;
 import org.junit.Test;
 
 /**
@@ -104,6 +105,14 @@ public class TestBalancer {
   final static Path filePath = new Path(fileName);
   private MiniDFSCluster cluster;
 
+  @After
+  public void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
   ClientProtocol client;
 
   static final long TIMEOUT = 40000L; //msec
@@ -348,44 +357,38 @@ public class TestBalancer {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
         .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
 
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
-      // fill up the cluster to be 80% full
-      long totalCapacity = sum(capacities);
-      long totalUsedSpace = totalCapacity * 8 / 10;
-      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
-      for (int i = 0; i < favoredNodes.length; i++) {
-        // DFSClient will attempt reverse lookup. In case it resolves
-        // "127.0.0.1" to "localhost", we manually specify the hostname.
-        int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
-        favoredNodes[i] = new InetSocketAddress(hosts[i], port);
-      }
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
 
-      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
-          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
-          (short) numOfDatanodes, 0, false, favoredNodes);
-
-      // start up an empty node with the same capacity
-      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
-          new long[] { CAPACITY });
-
-      totalCapacity += CAPACITY;
-
-      // run balancer and validate results
-      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
-
-      // start rebalancing
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-
-    } finally {
-      cluster.shutdown();
+    // fill up the cluster to be 80% full
+    long totalCapacity = sum(capacities);
+    long totalUsedSpace = totalCapacity * 8 / 10;
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+    for (int i = 0; i < favoredNodes.length; i++) {
+      // DFSClient will attempt reverse lookup. In case it resolves
+      // "127.0.0.1" to "localhost", we manually specify the hostname.
+      int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
+      favoredNodes[i] = new InetSocketAddress(hosts[i], port);
     }
-
+
+    DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+        totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+        (short) numOfDatanodes, 0, false, favoredNodes);
+
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+        new long[] { CAPACITY });
+
+    totalCapacity += CAPACITY;
+
+    // run balancer and validate results
+    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
   }
 
   /**
@@ -569,7 +572,7 @@ public class TestBalancer {
   private void doTest(Configuration conf, long[] capacities, String[] racks,
       long newCapacity, String newRack, NewNodeInfo nodes,
       boolean useTool, boolean useFile) throws Exception {
-    LOG.info("capacities = " + long2String(capacities)); 
+    LOG.info("capacities = " + long2String(capacities));
     LOG.info("racks      = " + Arrays.asList(racks));
     LOG.info("newCapacity= " + newCapacity);
     LOG.info("newRack    = " + newRack);
@@ -587,7 +590,7 @@ public class TestBalancer {
         ClientProtocol.class).getProxy();
 
     long totalCapacity = sum(capacities);
-    
+
     // fill up the cluster to be 30% full
     long totalUsedSpace = totalCapacity*3/10;
     createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
@@ -811,7 +814,7 @@ public class TestBalancer {
   /** one-node cluster test*/
   private void oneNodeTest(Configuration conf, boolean useTool) throws Exception {
     // add an empty node with half of the CAPACITY & the same rack
-    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, 
+    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2,
         RACK0, useTool);
   }
 
@@ -865,31 +868,27 @@ public class TestBalancer {
         .racks(racks)
         .simulatedCapacities(capacities)
         .build();
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
-          ClientProtocol.class).getProxy();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
 
-      for(int i = 0; i < 3; i++) {
-        cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
-      }
-
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{RACK0}, null,new long[]{CAPACITY});
-      cluster.triggerHeartbeats();
-
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      Set<String> datanodes = new HashSet<String>();
-      datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
-      BalancerParameters.Builder pBuilder =
-          new BalancerParameters.Builder();
-      pBuilder.setExcludedNodes(datanodes);
-      pBuilder.setRunDuringUpgrade(false);
-      final int r = Balancer.run(namenodes, pBuilder.build(), conf);
-      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-    } finally {
-      cluster.shutdown();
+    for(int i = 0; i < 3; i++) {
+      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
     }
+
+    cluster.startDataNodes(conf, 1, true, null,
+        new String[]{RACK0}, null,new long[]{CAPACITY});
+    cluster.triggerHeartbeats();
+
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    Set<String> datanodes = new HashSet<String>();
+    datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+    BalancerParameters.Builder pBuilder =
+        new BalancerParameters.Builder();
+    pBuilder.setExcludedNodes(datanodes);
+    pBuilder.setRunDuringUpgrade(false);
+    final int r = Balancer.run(namenodes, pBuilder.build(), conf);
+    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
   }
 
   /**
@@ -1322,47 +1321,44 @@ public class TestBalancer {
       .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
       .build();
 
-    try {
-      cluster.waitActive();
-      // Create few files on RAM_DISK
-      final String METHOD_NAME = GenericTestUtils.getMethodName();
-      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-      final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    cluster.waitActive();
+    // Create few files on RAM_DISK
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
-      DistributedFileSystem fs = cluster.getFileSystem();
-      DFSClient client = fs.getClient();
-      DFSTestUtil.createFile(fs, path1, true,
-          DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-          DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-      DFSTestUtil.createFile(fs, path2, true,
-          DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-          DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient client = fs.getClient();
+    DFSTestUtil.createFile(fs, path1, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+    DFSTestUtil.createFile(fs, path2, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
 
-      // Sleep for a short time to allow the lazy writer thread to do its job
-      Thread.sleep(6 * 1000);
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * 1000);
 
-      // Add another fresh DN with the same type/capacity without files on RAM_DISK
-      StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
-      long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
-      cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
-          null, null, storageCapacities, null, false, false, false, null);
+    // Add another fresh DN with the same type/capacity without files on RAM_DISK
+    StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+    long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
+        diskStorageLimit}};
+    cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+        null, null, storageCapacities, null, false, false, false, null);
 
-      cluster.triggerHeartbeats();
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    cluster.triggerHeartbeats();
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
-      // Run Balancer
-      final BalancerParameters p = BalancerParameters.DEFAULT;
-      final int r = Balancer.run(namenodes, p, conf);
+    // Run Balancer
+    final BalancerParameters p = BalancerParameters.DEFAULT;
+    final int r = Balancer.run(namenodes, p, conf);
 
-      // Validate no RAM_DISK block should be moved
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    // Validate no RAM_DISK block should be moved
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
 
-      // Verify files are still on RAM_DISK
-      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
-      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
-    } finally {
-      cluster.shutdown();
-    }
+    // Verify files are still on RAM_DISK
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
   }
 
   /**
@@ -1386,51 +1382,45 @@ public class TestBalancer {
       .storageTypes(new StorageType[] { DEFAULT })
      .storagesPerDatanode(1)
       .build();
+    cluster.waitActive();
+    // Create a file on the single DN
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
 
-    try {
-      cluster.waitActive();
-      // Create a file on the single DN
-      final String METHOD_NAME = GenericTestUtils.getMethodName();
-      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+        (short) 1, SEED);
 
-      DistributedFileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
-          (short) 1, SEED);
+    // Add another DN with the same capacity, cluster is now unbalanced
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.triggerHeartbeats();
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
-      // Add another DN with the same capacity, cluster is now unbalanced
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.triggerHeartbeats();
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    // Run balancer
+    final BalancerParameters p = BalancerParameters.DEFAULT;
 
-      // Run balancer
-      final BalancerParameters p = BalancerParameters.DEFAULT;
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
 
-      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
-      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
-      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    // Rolling upgrade should abort the balancer
+    assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+        Balancer.run(namenodes, p, conf));
 
-      // Rolling upgrade should abort the balancer
-      assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
-          Balancer.run(namenodes, p, conf));
+    // Should work with the -runDuringUpgrade flag.
+    BalancerParameters.Builder b =
+        new BalancerParameters.Builder();
+    b.setRunDuringUpgrade(true);
+    final BalancerParameters runDuringUpgrade = b.build();
+    assertEquals(ExitStatus.SUCCESS.getExitCode(),
+        Balancer.run(namenodes, runDuringUpgrade, conf));
 
-      // Should work with the -runDuringUpgrade flag.
-      BalancerParameters.Builder b =
-          new BalancerParameters.Builder();
-      b.setRunDuringUpgrade(true);
-      final BalancerParameters runDuringUpgrade = b.build();
-      assertEquals(ExitStatus.SUCCESS.getExitCode(),
-          Balancer.run(namenodes, runDuringUpgrade, conf));
+    // Finalize the rolling upgrade
+    fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
 
-      // Finalize the rolling upgrade
-      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
-
-      // Should also work after finalization.
-      assertEquals(ExitStatus.SUCCESS.getExitCode(),
-          Balancer.run(namenodes, p, conf));
-
-    } finally {
-      cluster.shutdown();
-    }
+    // Should also work after finalization.
+    assertEquals(ExitStatus.SUCCESS.getExitCode(),
+        Balancer.run(namenodes, p, conf));
   }
 
   /**
@@ -1452,7 +1442,7 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
 
     int numOfDatanodes =2;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2)
         .racks(new String[]{"/default/rack0", "/default/rack0"})
         .storagesPerDatanode(2)
@@ -1463,39 +1453,33 @@ public class TestBalancer {
             {100 * blockSize, 20 * blockSize},
            {20 * blockSize, 100 * blockSize}})
         .build();
+    cluster.waitActive();
 
-    try {
-      cluster.waitActive();
+    //set "/bar" directory with ONE_SSD storage policy.
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Path barDir = new Path("/bar");
+    fs.mkdir(barDir,new FsPermission((short)777));
+    fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
 
-      //set "/bar" directory with ONE_SSD storage policy.
-      DistributedFileSystem fs = cluster.getFileSystem();
-      Path barDir = new Path("/bar");
-      fs.mkdir(barDir,new FsPermission((short)777));
-      fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+    // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
+    // and (DN0,SSD) and (DN1,DISK) are about 15% full.
+    long fileLen = 30 * blockSize;
+    // fooFile has ONE_SSD policy. So
+    // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
+    // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
+    Path fooFile = new Path(barDir, "foo");
+    createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
+    // update space info
+    cluster.triggerHeartbeats();
 
-      // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
-      // and (DN0,SSD) and (DN1,DISK) are about 15% full.
-      long fileLen = 30 * blockSize;
-      // fooFile has ONE_SSD policy. So
-      // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
-      // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
-      Path fooFile = new Path(barDir, "foo");
-      createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
-      // update space info
-      cluster.triggerHeartbeats();
+    BalancerParameters p = BalancerParameters.DEFAULT;
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    final int r = Balancer.run(namenodes, p, conf);
 
-      BalancerParameters p = BalancerParameters.DEFAULT;
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      final int r = Balancer.run(namenodes, p, conf);
-
-      // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
-      // already has one. Otherwise DN1 will have 2 replicas.
-      // For same reason, no replicas were moved.
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-
-    } finally {
-      cluster.shutdown();
-    }
+    // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
+    // already has one. Otherwise DN1 will have 2 replicas.
+    // For same reason, no replicas were moved.
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
   }
 
   /**
@@ -1526,50 +1510,46 @@ public class TestBalancer {
     int numOfDatanodes = capacities.length;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
         .racks(racks).simulatedCapacities(capacities).build();
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
 
-      long totalCapacity = sum(capacities);
+    long totalCapacity = sum(capacities);
 
-      // fill up the cluster to be 30% full
-      final long totalUsedSpace = totalCapacity * 3 / 10;
-      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
-          (short) numOfDatanodes, 0);
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
-          new long[] { newCapacity });
+    // fill up the cluster to be 30% full
+    final long totalUsedSpace = totalCapacity * 3 / 10;
+    createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+        (short) numOfDatanodes, 0);
+    // start up an empty node with the same capacity and on the same rack
+    cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+        new long[] { newCapacity });
 
-      // Case1: Simulate first balancer by creating 'balancer.id' file. It
-      // will keep this file until the balancing operation is completed.
-      FileSystem fs = cluster.getFileSystem(0);
-      final FSDataOutputStream out = fs
-          .create(Balancer.BALANCER_ID_PATH, false);
-      out.writeBytes(InetAddress.getLocalHost().getHostName());
-      out.hflush();
-      assertTrue("'balancer.id' file doesn't exist!",
-          fs.exists(Balancer.BALANCER_ID_PATH));
+    // Case1: Simulate first balancer by creating 'balancer.id' file. It
+    // will keep this file until the balancing operation is completed.
+    FileSystem fs = cluster.getFileSystem(0);
+    final FSDataOutputStream out = fs
+        .create(Balancer.BALANCER_ID_PATH, false);
+    out.writeBytes(InetAddress.getLocalHost().getHostName());
+    out.hflush();
+    assertTrue("'balancer.id' file doesn't exist!",
+        fs.exists(Balancer.BALANCER_ID_PATH));
 
-      // start second balancer
-      final String[] args = { "-policy", "datanode" };
-      final Tool tool = new Cli();
-      tool.setConf(conf);
-      int exitCode = tool.run(args); // start balancing
-      assertEquals("Exit status code mismatches",
-          ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
+    // start second balancer
+    final String[] args = { "-policy", "datanode" };
+    final Tool tool = new Cli();
+    tool.setConf(conf);
+    int exitCode = tool.run(args); // start balancing
+    assertEquals("Exit status code mismatches",
+        ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
 
-      // Case2: Release lease so that another balancer would be able to
-      // perform balancing.
-      out.close();
-      assertTrue("'balancer.id' file doesn't exist!",
-          fs.exists(Balancer.BALANCER_ID_PATH));
-      exitCode = tool.run(args); // start balancing
-      assertEquals("Exit status code mismatches",
-          ExitStatus.SUCCESS.getExitCode(), exitCode);
-    } finally {
-      cluster.shutdown();
-    }
+    // Case2: Release lease so that another balancer would be able to
+    // perform balancing.
+    out.close();
+    assertTrue("'balancer.id' file doesn't exist!",
+        fs.exists(Balancer.BALANCER_ID_PATH));
+    exitCode = tool.run(args); // start balancing
+    assertEquals("Exit status code mismatches",
+        ExitStatus.SUCCESS.getExitCode(), exitCode);
   }
 
   /** Balancer should not move blocks with size < minBlockSize. */
@@ -1589,102 +1569,97 @@ public class TestBalancer {
         .simulatedCapacities(capacities)
         .build();
     final DistributedFileSystem dfs = cluster.getFileSystem();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+        ClientProtocol.class).getProxy();
 
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
-          ClientProtocol.class).getProxy();
-
-      // fill up the cluster to be 80% full
-      for(int i = 0; i < lengths.length; i++) {
-        final long size = lengths[i];
-        final Path p = new Path("/file" + i + "_size" + size);
-        try(final OutputStream out = dfs.create(p)) {
-          for(int j = 0; j < size; j++) {
-            out.write(j);
-          }
+    // fill up the cluster to be 80% full
+    for(int i = 0; i < lengths.length; i++) {
+      final long size = lengths[i];
+      final Path p = new Path("/file" + i + "_size" + size);
+      try(final OutputStream out = dfs.create(p)) {
+        for(int j = 0; j < size; j++) {
+          out.write(j);
        }
      }
-
-      // start up an empty node with the same capacity
-      cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
-      LOG.info("capacities    = " + Arrays.toString(capacities));
-      LOG.info("totalUsedSpace= " + totalUsed);
-      LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
-      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
-
-      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    }
 
-      { // run Balancer with min-block-size=50
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        final BalancerParameters p = b.build();
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+    LOG.info("capacities    = " + Arrays.toString(capacities));
+    LOG.info("totalUsedSpace= " + totalUsed);
+    LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+    waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
 
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+    { // run Balancer with min-block-size=50
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      final BalancerParameters p = b.build();
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    }
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+    { // run Balancer with empty nodes as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for(int i = capacities.length; i < datanodes.size(); i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
      }
-
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
+
+    { // run Balancer with a filled node as a source node
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      sourceNodes.add(datanodes.get(0).getDisplayName());
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
 
-      { // run Balancer with empty nodes as source nodes
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        for(int i = capacities.length; i < datanodes.size(); i++) {
-          sourceNodes.add(datanodes.get(i).getDisplayName());
-        }
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    { // run Balancer with all filled node as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for(int i = 0; i < capacities.length; i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
      }
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
 
-      { // run Balancer with a filled node as a source node
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        sourceNodes.add(datanodes.get(0).getDisplayName());
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
-      }
-
-      { // run Balancer with all filled node as source nodes
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        for(int i = 0; i < capacities.length; i++) {
-          sourceNodes.add(datanodes.get(i).getDisplayName());
-        }
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-      }
-    } finally {
-      cluster.shutdown();
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
    }
  }
-
+
  /**
   * @param args
   */