diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 25386991c78..2b5853788ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -102,6 +102,9 @@ Release 2.7.4 - UNRELEASED HDFS-11648. Lazy construct the IIP pathname. (daryn via kihwal, backported by zhz) + HDFS-8549. Abort the balancer if an upgrade is in progress. (wang) + Backport HDFS-11808 by Akira Ajisaka. + OPTIMIZATIONS HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 4fb6f0e74bd..d5befee31d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -183,10 +183,17 @@ public class Balancer { + "\n\t[-include [-f | ]]" + "\tIncludes only the specified datanodes." + "\n\t[-idleiterations ]" - + "\tNumber of consecutive idle iterations (-1 for Infinite) before exit."; - + + "\tNumber of consecutive idle iterations (-1 for Infinite) before " + + "exit." + + "\n\t[-runDuringUpgrade]" + + "\tWhether to run the balancer during an ongoing HDFS upgrade." + + "This is usually not desired since it will not affect used space " + + "on over-utilized machines."; + private final Dispatcher dispatcher; + private final NameNodeConnector nnc; private final BalancingPolicy policy; + private final boolean runDuringUpgrade; private final double threshold; private final long maxSizeToMove; @@ -262,6 +269,7 @@ public class Balancer { DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT); + this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, @@ -272,6 +280,7 @@ public class Balancer { this.maxSizeToMove = getLong(conf, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT); + this.runDuringUpgrade = p.runDuringUpgrade; } private static long getCapacity(DatanodeStorageReport report, StorageType t) { @@ -333,7 +342,7 @@ public class Balancer { if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { - overLoadedBytes += precentage2bytes(thresholdDiff, capacity); + overLoadedBytes += percentage2bytes(thresholdDiff, capacity); overUtilized.add(s); } g = s; @@ -342,7 +351,7 @@ public class Balancer { if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { - underLoadedBytes += precentage2bytes(thresholdDiff, capacity); + underLoadedBytes += percentage2bytes(thresholdDiff, capacity); underUtilized.add(g); } } @@ -364,17 +373,17 @@ public class Balancer { private static long computeMaxSize2Move(final long capacity, final long remaining, final double utilizationDiff, final double threshold, final long max) { final double diff = Math.min(threshold, Math.abs(utilizationDiff)); - long maxSizeToMove = precentage2bytes(diff, capacity); + long maxSizeToMove = percentage2bytes(diff, capacity); if (utilizationDiff < 0) { maxSizeToMove = Math.min(remaining, maxSizeToMove); } return Math.min(max, maxSizeToMove); } - private static long precentage2bytes(double precentage, long capacity) { - Preconditions.checkArgument(precentage >= 0, - "precentage = " + precentage + " < 0"); - return (long)(precentage * capacity / 100.0); + private static long percentage2bytes(double percentage, long capacity) { + Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0", + percentage); + return (long)(percentage * capacity / 100.0); } /* log the over utilized & under utilized nodes */ @@ -565,7 +574,13 @@ public class Balancer { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); } - + + // Should not run the balancer during an unfinalized upgrade, since moved + // blocks are not deleted on the source datanode. + if (!runDuringUpgrade && nnc.isUpgrading()) { + return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1); + } + /* Decide all the nodes that will participate in the block move and * the number of bytes that need to be moved from one node to another * in this iteration. Maximum bytes to be moved per node is @@ -700,7 +715,8 @@ public class Balancer { static final Parameters DEFAULT = new Parameters( BalancingPolicy.Node.INSTANCE, 10.0, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet()); + Collections. emptySet(), Collections. emptySet(), + false); final BalancingPolicy policy; final double threshold; @@ -709,23 +725,34 @@ public class Balancer { Set nodesToBeExcluded; //include only these nodes in balancing operations Set nodesToBeIncluded; + /** + * Whether to run the balancer during upgrade. + */ + final boolean runDuringUpgrade; Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, - Set nodesToBeExcluded, Set nodesToBeIncluded) { + Set nodesToBeExcluded, Set nodesToBeIncluded, + boolean runDuringUpgrade) { this.policy = policy; this.threshold = threshold; this.maxIdleIteration = maxIdleIteration; this.nodesToBeExcluded = nodesToBeExcluded; this.nodesToBeIncluded = nodesToBeIncluded; + this.runDuringUpgrade = runDuringUpgrade; } @Override public String toString() { - return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() - + "[" + policy + ", threshold=" + threshold + - ", max idle iteration = " + maxIdleIteration + - ", number of nodes to be excluded = "+ nodesToBeExcluded.size() + - ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; + return String.format("%s.%s [%s," + + " threshold = %s," + + " max idle iteration = %s, " + + "number of nodes to be excluded = %s," + + " number of nodes to be included = %s," + + " run during upgrade = %s]", + Balancer.class.getSimpleName(), getClass().getSimpleName(), + policy, threshold, maxIdleIteration, + nodesToBeExcluded.size(), nodesToBeIncluded.size(), + runDuringUpgrade); } } @@ -767,6 +794,7 @@ public class Balancer { int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; Set nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; Set nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; + boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; if (args != null) { try { @@ -822,9 +850,16 @@ public class Balancer { } } else if ("-idleiterations".equalsIgnoreCase(args[i])) { checkArgument(++i < args.length, - "idleiterations value is missing: args = " + Arrays.toString(args)); + "idleiterations value is missing: args = " + Arrays + .toString(args)); maxIdleIteration = Integer.parseInt(args[i]); LOG.info("Using a idleiterations of " + maxIdleIteration); + } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) { + runDuringUpgrade = true; + LOG.info("Will run the balancer even during an ongoing HDFS " + + "upgrade. Most users will not want to run the balancer " + + "during an upgrade since it will not affect used space " + + "on over-utilized machines."); } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); @@ -838,7 +873,8 @@ public class Balancer { } } - return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded); + return new Parameters(policy, threshold, maxIdleIteration, + nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade); } private static void printUsage(PrintStream out) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java index e36258ffca7..6bf298640f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java @@ -29,7 +29,8 @@ public enum ExitStatus { NO_MOVE_PROGRESS(-3), IO_EXCEPTION(-4), ILLEGAL_ARGUMENTS(-5), - INTERRUPTED(-6); + INTERRUPTED(-6), + UNFINALIZED_UPGRADE(-7); private final int code; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 30c6fab5b79..00418415aff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -44,7 +44,9 @@ import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -164,6 +166,20 @@ public class NameNodeConnector implements Closeable { return namenode.getBlocks(datanode, size); } + /** + * @return true if an upgrade is in progress, false if not. + * @throws IOException + */ + public boolean isUpgrading() throws IOException { + // fsimage upgrade + final boolean isUpgrade = !namenode.isUpgradeFinalized(); + // rolling upgrade + RollingUpgradeInfo info = fs.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.QUERY); + final boolean isRollingUpgrade = (info != null && !info.isFinalized()); + return (isUpgrade || isRollingUpgrade); + } + /** @return live datanode storage reports. */ public DatanodeStorageReport[] getLiveDatanodeStorageReport() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 398ad5a18f4..a884ecbce77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -740,7 +740,8 @@ public class TestBalancer { Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.maxIdleIteration, - nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); + nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), + false); } int expectedExcludedNodes = 0; @@ -986,7 +987,8 @@ public class TestBalancer { Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.maxIdleIteration, - datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); + datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded, + false); final int r = Balancer.run(namenodes, p, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { @@ -1417,12 +1419,7 @@ public class TestBalancer { Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); // Run Balancer - Balancer.Parameters p = new Balancer.Parameters( - Parameters.DEFAULT.policy, - Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - Parameters.DEFAULT.nodesToBeExcluded, - Parameters.DEFAULT.nodesToBeIncluded); + final Balancer.Parameters p = Parameters.DEFAULT; final int r = Balancer.run(namenodes, p, conf); // Validate no RAM_DISK block should be moved @@ -1437,7 +1434,76 @@ public class TestBalancer { } /** + * Check that the balancer exits when there is an unfinalized upgrade. + */ + @Test(timeout=300000) + public void testBalancerDuringUpgrade() throws Exception { + final int SEED = 0xFADED; + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L); + final int BLOCK_SIZE = 1024*1024; + cluster = new MiniDFSCluster + .Builder(conf) + .numDataNodes(1) + .storageCapacities(new long[] { BLOCK_SIZE * 10 }) + .storageTypes(new StorageType[] { DEFAULT }) + .storagesPerDatanode(1) + .build(); + + try { + cluster.waitActive(); + // Create a file on the single DN + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE, + (short) 1, SEED); + + // Add another DN with the same capacity, cluster is now unbalanced + cluster.startDataNodes(conf, 1, true, null, null); + cluster.triggerHeartbeats(); + Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + + // Run balancer + final Balancer.Parameters p = Parameters.DEFAULT; + + fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); + fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); + fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + + // Rolling upgrade should abort the balancer + assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(), + Balancer.run(namenodes, p, conf)); + + // Should work with the -runDuringUpgrade flag. + final Balancer.Parameters runDuringUpgrade = + new Balancer.Parameters(Parameters.DEFAULT.policy, + Parameters.DEFAULT.threshold, + Parameters.DEFAULT.maxIdleIteration, + Parameters.DEFAULT.nodesToBeExcluded, + Parameters.DEFAULT.nodesToBeIncluded, + true); + assertEquals(ExitStatus.SUCCESS.getExitCode(), + Balancer.run(namenodes, runDuringUpgrade, conf)); + + // Finalize the rolling upgrade + fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE); + + // Should also work after finalization. + assertEquals(ExitStatus.SUCCESS.getExitCode(), + Balancer.run(namenodes, p, conf)); + + } finally { + cluster.shutdown(); + } + } + + /** * Test special case. Two replicas belong to same block should not in same node. * We have 2 nodes. * We have a block in (DN0,SSD) and (DN1,DISK).