HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows. Contributed by Xiaoyu Yao.

(cherry picked from commit 095ac83402)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
This commit is contained in:
cnauroth 2015-11-03 10:51:21 -08:00
parent 3190446df9
commit 9d11d2a8f2
2 changed files with 264 additions and 286 deletions

View File

@ -1376,6 +1376,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9313. Possible NullPointerException in BlockManager if no excess HDFS-9313. Possible NullPointerException in BlockManager if no excess
replica can be chosen. (mingma) replica can be chosen. (mingma)
HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows.
(Xiaoyu Yao via cnauroth)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
/** /**
@ -104,6 +105,14 @@ public class TestBalancer {
final static Path filePath = new Path(fileName); final static Path filePath = new Path(fileName);
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
@After
public void shutdown() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
ClientProtocol client; ClientProtocol client;
static final long TIMEOUT = 40000L; //msec static final long TIMEOUT = 40000L; //msec
@ -348,44 +357,38 @@ public class TestBalancer {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build(); .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
try { cluster.waitActive();
cluster.waitActive(); client = NameNodeProxies.createProxy(conf,
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
for (int i = 0; i < favoredNodes.length; i++) {
// DFSClient will attempt reverse lookup. In case it resolves
// "127.0.0.1" to "localhost", we manually specify the hostname.
int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
favoredNodes[i] = new InetSocketAddress(hosts[i], port);
}
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024, // fill up the cluster to be 80% full
totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE, long totalCapacity = sum(capacities);
(short) numOfDatanodes, 0, false, favoredNodes); long totalUsedSpace = totalCapacity * 8 / 10;
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
// start up an empty node with the same capacity for (int i = 0; i < favoredNodes.length; i++) {
cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 }, // DFSClient will attempt reverse lookup. In case it resolves
new long[] { CAPACITY }); // "127.0.0.1" to "localhost", we manually specify the hostname.
int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
totalCapacity += CAPACITY; favoredNodes[i] = new InetSocketAddress(hosts[i], port);
// run balancer and validate results
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
} finally {
cluster.shutdown();
} }
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
(short) numOfDatanodes, 0, false, favoredNodes);
// start up an empty node with the same capacity
cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
new long[] { CAPACITY });
totalCapacity += CAPACITY;
// run balancer and validate results
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
} }
/** /**
@ -569,7 +572,7 @@ public class TestBalancer {
private void doTest(Configuration conf, long[] capacities, private void doTest(Configuration conf, long[] capacities,
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile) throws Exception { boolean useTool, boolean useFile) throws Exception {
LOG.info("capacities = " + long2String(capacities)); LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks)); LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity); LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack); LOG.info("newRack = " + newRack);
@ -587,7 +590,7 @@ public class TestBalancer {
ClientProtocol.class).getProxy(); ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities); long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full // fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity*3/10; long totalUsedSpace = totalCapacity*3/10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
@ -811,7 +814,7 @@ public class TestBalancer {
/** one-node cluster test*/ /** one-node cluster test*/
private void oneNodeTest(Configuration conf, boolean useTool) throws Exception { private void oneNodeTest(Configuration conf, boolean useTool) throws Exception {
// add an empty node with half of the CAPACITY & the same rack // add an empty node with half of the CAPACITY & the same rack
doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2,
RACK0, useTool); RACK0, useTool);
} }
@ -865,31 +868,27 @@ public class TestBalancer {
.racks(racks) .racks(racks)
.simulatedCapacities(capacities) .simulatedCapacities(capacities)
.build(); .build();
try { cluster.waitActive();
cluster.waitActive(); client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
ClientProtocol.class).getProxy();
for(int i = 0; i < 3; i++) { for(int i = 0; i < 3; i++) {
cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
}
cluster.startDataNodes(conf, 1, true, null,
new String[]{RACK0}, null,new long[]{CAPACITY});
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
BalancerParameters.Builder pBuilder =
new BalancerParameters.Builder();
pBuilder.setExcludedNodes(datanodes);
pBuilder.setRunDuringUpgrade(false);
final int r = Balancer.run(namenodes, pBuilder.build(), conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally {
cluster.shutdown();
} }
cluster.startDataNodes(conf, 1, true, null,
new String[]{RACK0}, null,new long[]{CAPACITY});
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
BalancerParameters.Builder pBuilder =
new BalancerParameters.Builder();
pBuilder.setExcludedNodes(datanodes);
pBuilder.setRunDuringUpgrade(false);
final int r = Balancer.run(namenodes, pBuilder.build(), conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} }
/** /**
@ -1322,47 +1321,44 @@ public class TestBalancer {
.storageTypes(new StorageType[] { RAM_DISK, DEFAULT }) .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
.build(); .build();
try { cluster.waitActive();
cluster.waitActive(); // Create few files on RAM_DISK
// Create few files on RAM_DISK final String METHOD_NAME = GenericTestUtils.getMethodName();
final String METHOD_NAME = GenericTestUtils.getMethodName(); final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient(); DFSClient client = fs.getClient();
DFSTestUtil.createFile(fs, path1, true, DFSTestUtil.createFile(fs, path1, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE, DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
DFSTestUtil.createFile(fs, path2, true, DFSTestUtil.createFile(fs, path2, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE, DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true); DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
// Sleep for a short time to allow the lazy writer thread to do its job // Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * 1000); Thread.sleep(6 * 1000);
// Add another fresh DN with the same type/capacity without files on RAM_DISK // Add another fresh DN with the same type/capacity without files on RAM_DISK
StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}}; StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}}; long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null, diskStorageLimit}};
null, null, storageCapacities, null, false, false, false, null); cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
null, null, storageCapacities, null, false, false, false, null);
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run Balancer // Run Balancer
final BalancerParameters p = BalancerParameters.DEFAULT; final BalancerParameters p = BalancerParameters.DEFAULT;
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
// Validate no RAM_DISK block should be moved // Validate no RAM_DISK block should be moved
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
// Verify files are still on RAM_DISK // Verify files are still on RAM_DISK
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK); DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK); DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
} finally {
cluster.shutdown();
}
} }
/** /**
@ -1386,51 +1382,45 @@ public class TestBalancer {
.storageTypes(new StorageType[] { DEFAULT }) .storageTypes(new StorageType[] { DEFAULT })
.storagesPerDatanode(1) .storagesPerDatanode(1)
.build(); .build();
cluster.waitActive();
// Create a file on the single DN
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
try { DistributedFileSystem fs = cluster.getFileSystem();
cluster.waitActive(); DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
// Create a file on the single DN (short) 1, SEED);
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
DistributedFileSystem fs = cluster.getFileSystem(); // Add another DN with the same capacity, cluster is now unbalanced
DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE, cluster.startDataNodes(conf, 1, true, null, null);
(short) 1, SEED); cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Add another DN with the same capacity, cluster is now unbalanced // Run balancer
cluster.startDataNodes(conf, 1, true, null, null); final BalancerParameters p = BalancerParameters.DEFAULT;
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run balancer fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
final BalancerParameters p = BalancerParameters.DEFAULT; fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); // Rolling upgrade should abort the balancer
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); Balancer.run(namenodes, p, conf));
// Rolling upgrade should abort the balancer // Should work with the -runDuringUpgrade flag.
assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(), BalancerParameters.Builder b =
Balancer.run(namenodes, p, conf)); new BalancerParameters.Builder();
b.setRunDuringUpgrade(true);
final BalancerParameters runDuringUpgrade = b.build();
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf));
// Should work with the -runDuringUpgrade flag. // Finalize the rolling upgrade
BalancerParameters.Builder b = fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
new BalancerParameters.Builder();
b.setRunDuringUpgrade(true);
final BalancerParameters runDuringUpgrade = b.build();
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf));
// Finalize the rolling upgrade // Should also work after finalization.
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE); assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, p, conf));
// Should also work after finalization.
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, p, conf));
} finally {
cluster.shutdown();
}
} }
/** /**
@ -1452,7 +1442,7 @@ public class TestBalancer {
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
int numOfDatanodes =2; int numOfDatanodes =2;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2) .numDataNodes(2)
.racks(new String[]{"/default/rack0", "/default/rack0"}) .racks(new String[]{"/default/rack0", "/default/rack0"})
.storagesPerDatanode(2) .storagesPerDatanode(2)
@ -1463,39 +1453,33 @@ public class TestBalancer {
{100 * blockSize, 20 * blockSize}, {100 * blockSize, 20 * blockSize},
{20 * blockSize, 100 * blockSize}}) {20 * blockSize, 100 * blockSize}})
.build(); .build();
cluster.waitActive();
try { //set "/bar" directory with ONE_SSD storage policy.
cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem();
Path barDir = new Path("/bar");
fs.mkdir(barDir,new FsPermission((short)777));
fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
//set "/bar" directory with ONE_SSD storage policy. // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
DistributedFileSystem fs = cluster.getFileSystem(); // and (DN0,SSD) and (DN1,DISK) are about 15% full.
Path barDir = new Path("/bar"); long fileLen = 30 * blockSize;
fs.mkdir(barDir,new FsPermission((short)777)); // fooFile has ONE_SSD policy. So
fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME); // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
// (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
Path fooFile = new Path(barDir, "foo");
createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
// update space info
cluster.triggerHeartbeats();
// Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full, BalancerParameters p = BalancerParameters.DEFAULT;
// and (DN0,SSD) and (DN1,DISK) are about 15% full. Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
long fileLen = 30 * blockSize; final int r = Balancer.run(namenodes, p, conf);
// fooFile has ONE_SSD policy. So
// (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
// (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
Path fooFile = new Path(barDir, "foo");
createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
// update space info
cluster.triggerHeartbeats();
BalancerParameters p = BalancerParameters.DEFAULT; // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); // already has one. Otherwise DN1 will have 2 replicas.
final int r = Balancer.run(namenodes, p, conf); // For same reason, no replicas were moved.
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
// already has one. Otherwise DN1 will have 2 replicas.
// For same reason, no replicas were moved.
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
} finally {
cluster.shutdown();
}
} }
/** /**
@ -1526,50 +1510,46 @@ public class TestBalancer {
int numOfDatanodes = capacities.length; int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build(); .racks(racks).simulatedCapacities(capacities).build();
try { cluster.waitActive();
cluster.waitActive(); client = NameNodeProxies.createProxy(conf,
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities); long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full // fill up the cluster to be 30% full
final long totalUsedSpace = totalCapacity * 3 / 10; final long totalUsedSpace = totalCapacity * 3 / 10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0); (short) numOfDatanodes, 0);
// start up an empty node with the same capacity and on the same rack // start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity }); new long[] { newCapacity });
// Case1: Simulate first balancer by creating 'balancer.id' file. It // Case1: Simulate first balancer by creating 'balancer.id' file. It
// will keep this file until the balancing operation is completed. // will keep this file until the balancing operation is completed.
FileSystem fs = cluster.getFileSystem(0); FileSystem fs = cluster.getFileSystem(0);
final FSDataOutputStream out = fs final FSDataOutputStream out = fs
.create(Balancer.BALANCER_ID_PATH, false); .create(Balancer.BALANCER_ID_PATH, false);
out.writeBytes(InetAddress.getLocalHost().getHostName()); out.writeBytes(InetAddress.getLocalHost().getHostName());
out.hflush(); out.hflush();
assertTrue("'balancer.id' file doesn't exist!", assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH)); fs.exists(Balancer.BALANCER_ID_PATH));
// start second balancer // start second balancer
final String[] args = { "-policy", "datanode" }; final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli(); final Tool tool = new Cli();
tool.setConf(conf); tool.setConf(conf);
int exitCode = tool.run(args); // start balancing int exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches", assertEquals("Exit status code mismatches",
ExitStatus.IO_EXCEPTION.getExitCode(), exitCode); ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
// Case2: Release lease so that another balancer would be able to // Case2: Release lease so that another balancer would be able to
// perform balancing. // perform balancing.
out.close(); out.close();
assertTrue("'balancer.id' file doesn't exist!", assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH)); fs.exists(Balancer.BALANCER_ID_PATH));
exitCode = tool.run(args); // start balancing exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches", assertEquals("Exit status code mismatches",
ExitStatus.SUCCESS.getExitCode(), exitCode); ExitStatus.SUCCESS.getExitCode(), exitCode);
} finally {
cluster.shutdown();
}
} }
/** Balancer should not move blocks with size < minBlockSize. */ /** Balancer should not move blocks with size < minBlockSize. */
@ -1589,102 +1569,97 @@ public class TestBalancer {
.simulatedCapacities(capacities) .simulatedCapacities(capacities)
.build(); .build();
final DistributedFileSystem dfs = cluster.getFileSystem(); final DistributedFileSystem dfs = cluster.getFileSystem();
cluster.waitActive();
client = NameNodeProxies.createProxy(conf, dfs.getUri(),
ClientProtocol.class).getProxy();
try { // fill up the cluster to be 80% full
cluster.waitActive(); for(int i = 0; i < lengths.length; i++) {
client = NameNodeProxies.createProxy(conf, dfs.getUri(), final long size = lengths[i];
ClientProtocol.class).getProxy(); final Path p = new Path("/file" + i + "_size" + size);
try(final OutputStream out = dfs.create(p)) {
// fill up the cluster to be 80% full for(int j = 0; j < size; j++) {
for(int i = 0; i < lengths.length; i++) { out.write(j);
final long size = lengths[i];
final Path p = new Path("/file" + i + "_size" + size);
try(final OutputStream out = dfs.create(p)) {
for(int j = 0; j < size; j++) {
out.write(j);
}
} }
} }
}
// start up an empty node with the same capacity
cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
LOG.info("capacities = " + Arrays.toString(capacities));
LOG.info("totalUsedSpace= " + totalUsed);
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
{ // run Balancer with min-block-size=50 // start up an empty node with the same capacity
BalancerParameters.Builder b = cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
new BalancerParameters.Builder(); LOG.info("capacities = " + Arrays.toString(capacities));
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); LOG.info("totalUsedSpace= " + totalUsed);
b.setThreshold(1); LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
final BalancerParameters p = b.build(); waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); { // run Balancer with min-block-size=50
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
{ // run Balancer with empty nodes as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for(int i = capacities.length; i < datanodes.size(); i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
} }
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with a filled node as a source node
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
sourceNodes.add(datanodes.get(0).getDisplayName());
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with empty nodes as source nodes { // run Balancer with all filled node as source nodes
final Set<String> sourceNodes = new HashSet<>(); final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes(); final List<DataNode> datanodes = cluster.getDataNodes();
for(int i = capacities.length; i < datanodes.size(); i++) { for(int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName()); sourceNodes.add(datanodes.get(i).getDisplayName());
}
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
} }
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
{ // run Balancer with a filled node as a source node conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final Set<String> sourceNodes = new HashSet<>(); final int r = Balancer.run(namenodes, p, conf);
final List<DataNode> datanodes = cluster.getDataNodes(); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
sourceNodes.add(datanodes.get(0).getDisplayName());
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with all filled node as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for(int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
} finally {
cluster.shutdown();
} }
} }
/** /**
* @param args * @param args
*/ */