From 065d8f2a34296b566e7ca541a284f7991212f14c Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Wed, 13 May 2015 17:27:34 +0530 Subject: [PATCH] HDFS-6300. Prevent multiple balancers from running simultaneously (Contributed by Rakesh R) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/balancer/NameNodeConnector.java | 18 +++-- .../hdfs/server/balancer/TestBalancer.java | 77 +++++++++++++++++++ 3 files changed, 93 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 135b50c6426..4fabf97187f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -826,6 +826,9 @@ Release 2.7.1 - UNRELEASED HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor goes for infinite loop (Rushabh S Shah via kihwal) + HDFS-6300. Prevent multiple balancers from running simultaneously + (Rakesh R via vinayakumarb) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index cf5f36f9e4d..2e4f21475fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -219,12 +219,20 @@ public class NameNodeConnector implements Closeable { */ private OutputStream checkAndMarkRunning() throws IOException { try { - final FSDataOutputStream out = fs.create(idPath); - if (write2IdFile) { - out.writeBytes(InetAddress.getLocalHost().getHostName()); - out.hflush(); + if (fs.exists(idPath)) { + // try appending to it so that it will fail fast if another balancer is + // running. + IOUtils.closeStream(fs.append(idPath)); + fs.delete(idPath, true); } - return out; + final FSDataOutputStream fsout = fs.create(idPath, false); + // mark balancer idPath to be deleted during filesystem closure + fs.deleteOnExit(idPath); + if (write2IdFile) { + fsout.writeBytes(InetAddress.getLocalHost().getHostName()); + fsout.hflush(); + } + return fsout; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index edffb822547..e756f0b3a7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -28,6 +28,7 @@ import static org.junit.Assume.assumeTrue; import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.net.InetAddress; import java.net.URI; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -45,6 +46,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; @@ -1370,6 +1372,81 @@ public class TestBalancer { cluster.shutdown(); } } + + /** + * Test running many balancer simultaneously. + * + * Case-1: First balancer is running. Now, running second one should get + * "Another balancer is running. Exiting.." IOException and fail immediately + * + * Case-2: When running second balancer 'balancer.id' file exists but the + * lease doesn't exists. Now, the second balancer should run successfully. + */ + @Test(timeout = 100000) + public void testManyBalancerSimultaneously() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + // add an empty node with half of the capacities(4 * CAPACITY) & the same + // rack + long[] capacities = new long[] { 4 * CAPACITY }; + String[] racks = new String[] { RACK0 }; + long newCapacity = 2 * CAPACITY; + String newRack = RACK0; + LOG.info("capacities = " + long2String(capacities)); + LOG.info("racks = " + Arrays.asList(racks)); + LOG.info("newCapacity= " + newCapacity); + LOG.info("newRack = " + newRack); + LOG.info("useTool = " + false); + assertEquals(capacities.length, racks.length); + int numOfDatanodes = capacities.length; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length) + .racks(racks).simulatedCapacities(capacities).build(); + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + + long totalCapacity = sum(capacities); + + // fill up the cluster to be 30% full + final long totalUsedSpace = totalCapacity * 3 / 10; + createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, + (short) numOfDatanodes, 0); + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, new String[] { newRack }, + new long[] { newCapacity }); + + // Case1: Simulate first balancer by creating 'balancer.id' file. It + // will keep this file until the balancing operation is completed. + FileSystem fs = cluster.getFileSystem(0); + final FSDataOutputStream out = fs + .create(Balancer.BALANCER_ID_PATH, false); + out.writeBytes(InetAddress.getLocalHost().getHostName()); + out.hflush(); + assertTrue("'balancer.id' file doesn't exist!", + fs.exists(Balancer.BALANCER_ID_PATH)); + + // start second balancer + final String[] args = { "-policy", "datanode" }; + final Tool tool = new Cli(); + tool.setConf(conf); + int exitCode = tool.run(args); // start balancing + assertEquals("Exit status code mismatches", + ExitStatus.IO_EXCEPTION.getExitCode(), exitCode); + + // Case2: Release lease so that another balancer would be able to + // perform balancing. + out.close(); + assertTrue("'balancer.id' file doesn't exist!", + fs.exists(Balancer.BALANCER_ID_PATH)); + exitCode = tool.run(args); // start balancing + assertEquals("Exit status code mismatches", + ExitStatus.SUCCESS.getExitCode(), exitCode); + } finally { + cluster.shutdown(); + } + } + /** * @param args */