HDFS-6300. Prevent multiple balancers from running simultaneously (Contributed by Rakesh R)

This commit is contained in:
Vinayakumar B 2015-05-13 17:27:34 +05:30
parent 92c38e41e1
commit 065d8f2a34
3 changed files with 93 additions and 5 deletions

View File

@ -826,6 +826,9 @@ Release 2.7.1 - UNRELEASED
HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
goes for infinite loop (Rushabh S Shah via kihwal) goes for infinite loop (Rushabh S Shah via kihwal)
HDFS-6300. Prevent multiple balancers from running simultaneously
(Rakesh R via vinayakumarb)
Release 2.7.0 - 2015-04-20 Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -219,12 +219,20 @@ public class NameNodeConnector implements Closeable {
*/ */
private OutputStream checkAndMarkRunning() throws IOException { private OutputStream checkAndMarkRunning() throws IOException {
try { try {
final FSDataOutputStream out = fs.create(idPath); if (fs.exists(idPath)) {
if (write2IdFile) { // try appending to it so that it will fail fast if another balancer is
out.writeBytes(InetAddress.getLocalHost().getHostName()); // running.
out.hflush(); IOUtils.closeStream(fs.append(idPath));
fs.delete(idPath, true);
} }
return out; final FSDataOutputStream fsout = fs.create(idPath, false);
// mark balancer idPath to be deleted during filesystem closure
fs.deleteOnExit(idPath);
if (write2IdFile) {
fsout.writeBytes(InetAddress.getLocalHost().getHostName());
fsout.hflush();
}
return fsout;
} catch(RemoteException e) { } catch(RemoteException e) {
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
return null; return null;

View File

@ -28,6 +28,7 @@ import static org.junit.Assume.assumeTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,6 +46,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
@ -1370,6 +1372,81 @@ public class TestBalancer {
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
* Test running many balancer simultaneously.
*
* Case-1: First balancer is running. Now, running second one should get
* "Another balancer is running. Exiting.." IOException and fail immediately
*
* Case-2: When running second balancer 'balancer.id' file exists but the
* lease doesn't exists. Now, the second balancer should run successfully.
*/
@Test(timeout = 100000)
public void testManyBalancerSimultaneously() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
// add an empty node with half of the capacities(4 * CAPACITY) & the same
// rack
long[] capacities = new long[] { 4 * CAPACITY };
String[] racks = new String[] { RACK0 };
long newCapacity = 2 * CAPACITY;
String newRack = RACK0;
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
LOG.info("useTool = " + false);
assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build();
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full
final long totalUsedSpace = totalCapacity * 3 / 10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity });
// Case1: Simulate first balancer by creating 'balancer.id' file. It
// will keep this file until the balancing operation is completed.
FileSystem fs = cluster.getFileSystem(0);
final FSDataOutputStream out = fs
.create(Balancer.BALANCER_ID_PATH, false);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.hflush();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
// start second balancer
final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
int exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
// Case2: Release lease so that another balancer would be able to
// perform balancing.
out.close();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.SUCCESS.getExitCode(), exitCode);
} finally {
cluster.shutdown();
}
}
/** /**
* @param args * @param args
*/ */