diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index ae77e6c3333..cac43c90e5b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -80,13 +80,14 @@ public static void readFileToSetWithFileInputStream(String type,
         String[] nodes = line.split("[ \t\n\f\r]+");
         if (nodes != null) {
           for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].trim().startsWith("#")) {
+            nodes[i] = nodes[i].trim();
+            if (nodes[i].startsWith("#")) {
               // Everything from now on is a comment
               break;
             }
             if (!nodes[i].isEmpty()) {
-              LOG.info("Adding " + nodes[i] + " to the list of " + type +
-                  " hosts from " + filename);
+              LOG.info("Adding a node \"" + nodes[i] + "\" to the list of "
+                  + type + " hosts from " + filename);
               set.add(nodes[i]);
             }
           }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index 73f9c4f7ccc..153270f8f11 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
@@ -28,7 +27,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -45,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -379,19 +378,6 @@ public static String[] getTrimmedStrings(String str){
     return str.trim().split("\\s*,\\s*");
   }
 
-  /**
-   * Trims all the strings in a Collection and returns a Set.
-   * @param strings
-   * @return
-   */
-  public static Set<String> getTrimmedStrings(Collection<String> strings) {
-    Set<String> trimmedStrings = new HashSet<String>();
-    for (String string: strings) {
-      trimmedStrings.add(string.trim());
-    }
-    return trimmedStrings;
-  }
-
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
   final public static String COMMA_STR = ",";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ce4305c60a8..889025b415c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -463,6 +463,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8435. Support CreateFlag in WebHDFS. (Jakob Homan via cdouglas)
 
+    HDFS-8826. In Balancer, add an option to specify the source node list
+    so that balancer only selects blocks to move from those nodes. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 16945df1a80..9d3ddd4ede4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -27,6 +27,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -189,6 +191,7 @@ public class Balancer {
   private final Dispatcher dispatcher;
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final Set<String> sourceNodes;
   private final boolean runDuringUpgrade;
   private final double threshold;
   private final long maxSizeToMove;
@@ -261,11 +264,12 @@ static int getInt(Configuration conf, String key, int defaultValue) {
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
 
     this.nnc = theblockpool;
-    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
-        p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+    this.dispatcher = new Dispatcher(theblockpool, p.includedNodes,
+        p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.sourceNodes = p.sourceNodes;
     this.runDuringUpgrade = p.runDuringUpgrade;
 
     this.maxSizeToMove = getLong(conf,
@@ -319,14 +323,23 @@ private long init(List<DatanodeStorageReport> reports) {
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type
           continue;
         }
 
+        final double average = policy.getAvgUtilization(t);
+        if (utilization >= average && !isSource) {
+          LOG.info(dn + "[" + t + "] has utilization=" + utilization
+              + " >= average=" + average
+              + " but it is not specified as a source; skipping it.");
+          continue;
+        }
+
+        final double utilizationDiff = utilization - average;
         final long capacity = getCapacity(r, t);
-        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
         final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
         final long maxSize2Move = computeMaxSize2Move(capacity,
             getRemaining(r, t), utilizationDiff, maxSizeToMove);
@@ -624,6 +637,9 @@ static int run(Collection<URI> namenodes, final Parameters p,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
     LOG.info("namenodes  = " + namenodes);
     LOG.info("parameters = " + p);
+    LOG.info("included nodes = " + p.includedNodes);
+    LOG.info("excluded nodes = " + p.excludedNodes);
+    LOG.info("source nodes = " + p.sourceNodes);
     System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
@@ -687,29 +703,35 @@ static class Parameters {
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        Collections.<String> emptySet(),
         false);
 
     final BalancingPolicy policy;
     final double threshold;
     final int maxIdleIteration;
-    // exclude the nodes in this set from balancing operations
-    Set<String> nodesToBeExcluded;
-    //include only these nodes in balancing operations
-    Set<String> nodesToBeIncluded;
+    /** Exclude the nodes in this set. */
+    final Set<String> excludedNodes;
+    /** If empty, include any node; otherwise, include only these nodes. */
+    final Set<String> includedNodes;
+    /** If empty, any node can be a source;
+     *  otherwise, use only these nodes as source nodes.
+     */
+    final Set<String> sourceNodes;
     /**
      * Whether to run the balancer during upgrade.
      */
     final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
-        boolean runDuringUpgrade) {
+        Set<String> excludedNodes, Set<String> includedNodes,
+        Set<String> sourceNodes, boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
-      this.nodesToBeExcluded = nodesToBeExcluded;
-      this.nodesToBeIncluded = nodesToBeIncluded;
+      this.excludedNodes = excludedNodes;
+      this.includedNodes = includedNodes;
+      this.sourceNodes = sourceNodes;
       this.runDuringUpgrade = runDuringUpgrade;
     }
 
@@ -717,13 +739,14 @@ Collections.<String> emptySet(), Collections.<String> emptySet(),
     public String toString() {
       return String.format("%s.%s [%s,"
           + " threshold = %s,"
-          + " max idle iteration = %s, "
-          + "number of nodes to be excluded = %s,"
-          + " number of nodes to be included = %s,"
+          + " max idle iteration = %s,"
+          + " #excluded nodes = %s,"
+          + " #included nodes = %s,"
+          + " #source nodes = %s,"
          + " run during upgrade = %s]",
          Balancer.class.getSimpleName(), getClass().getSimpleName(),
          policy, threshold, maxIdleIteration,
-          nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+          excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
          runDuringUpgrade);
     }
   }
@@ -764,8 +787,9 @@ static Parameters parse(String[] args) {
       BalancingPolicy policy = Parameters.DEFAULT.policy;
       double threshold = Parameters.DEFAULT.threshold;
       int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
-      Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
-      Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+      Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
+      Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
+      Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
       boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
@@ -797,29 +821,14 @@ static Parameters parse(String[] args) {
              throw e;
            }
          } else if ("-exclude".equalsIgnoreCase(args[i])) {
-            checkArgument(++i < args.length,
-                "List of nodes to exclude | -f is missing: args = "
-                    + Arrays.toString(args));
-            if ("-f".equalsIgnoreCase(args[i])) {
-              checkArgument(++i < args.length,
-                  "File containing nodes to exclude is not specified: args = "
-                      + Arrays.toString(args));
-              nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
-            } else {
-              nodesTobeExcluded = Util.parseHostList(args[i]);
-            }
+            excludedNodes = new HashSet<>();
+            i = processHostList(args, i, "exclude", excludedNodes);
          } else if ("-include".equalsIgnoreCase(args[i])) {
-            checkArgument(++i < args.length,
-                "List of nodes to include | -f is missing: args = "
-                    + Arrays.toString(args));
-            if ("-f".equalsIgnoreCase(args[i])) {
-              checkArgument(++i < args.length,
-                  "File containing nodes to include is not specified: args = "
-                      + Arrays.toString(args));
-              nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
-            } else {
-              nodesTobeIncluded = Util.parseHostList(args[i]);
-            }
+            includedNodes = new HashSet<>();
+            i = processHostList(args, i, "include", includedNodes);
+          } else if ("-source".equalsIgnoreCase(args[i])) {
+            sourceNodes = new HashSet<>();
+            i = processHostList(args, i, "source", sourceNodes);
          } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
            checkArgument(++i < args.length,
                "idleiterations value is missing: args = " + Arrays
@@ -837,7 +846,7 @@ static Parameters parse(String[] args) {
                + Arrays.toString(args));
          }
        }
-        checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+        checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(),
            "-exclude and -include options cannot be specified together.");
      } catch(RuntimeException e) {
        printUsage(System.err);
@@ -846,7 +855,31 @@ static Parameters parse(String[] args) {
      }
 
      return new Parameters(policy, threshold, maxIdleIteration,
-          nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
+          excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+    }
+
+    private static int processHostList(String[] args, int i, String type,
+        Set<String> nodes) {
+      Preconditions.checkArgument(++i < args.length,
+          "List of %s nodes | -f is missing: args=%s",
+          type, Arrays.toString(args));
+      if ("-f".equalsIgnoreCase(args[i])) {
+        Preconditions.checkArgument(++i < args.length,
+            "File containing %s nodes is not specified: args=%s",
+            type, Arrays.toString(args));
+
+        final String filename = args[i];
+        try {
+          HostsFileReader.readFileToSet(type, filename, nodes);
+        } catch (IOException e) {
+          throw new IllegalArgumentException(
+              "Failed to read " + type + " node list from file: " + filename);
+        }
+      } else {
+        final String[] addresses = StringUtils.getTrimmedStrings(args[i]);
+        nodes.addAll(Arrays.asList(addresses));
+      }
+      return i;
    }
 
    private static void printUsage(PrintStream out) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index b06219c9a01..bd497b8c70f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -28,7 +28,6 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -72,7 +71,6 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 
@@ -798,7 +796,11 @@ private void dispatchBlocks() {
        if (shouldFetchMoreBlocks()) {
          // fetch new blocks
          try {
-            blocksToReceive -= getBlockList();
+            final long received = getBlockList();
+            if (received == 0) {
+              return;
+            }
+            blocksToReceive -= received;
            continue;
          } catch (IOException e) {
            LOG.warn("Exception while getting block list", e);
@@ -927,8 +929,11 @@ private boolean shouldIgnore(DatanodeInfo dn) {
 
      if (decommissioned || decommissioning || excluded || notIncluded) {
        if (LOG.isTraceEnabled()) {
-          LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
-              + decommissioning + ", " + excluded + ", " + notIncluded);
+          LOG.trace("Excluding datanode " + dn
+              + ": decommissioned=" + decommissioned
+              + ", decommissioning=" + decommissioning
+              + ", excluded=" + excluded
+              + ", notIncluded=" + notIncluded);
        }
        return true;
      }
@@ -1215,31 +1220,5 @@ private static boolean isIn(Set<String> nodes, String host, int port) {
      }
      return (nodes.contains(host) || nodes.contains(host + ":" + port));
    }
-
-    /**
-     * Parse a comma separated string to obtain set of host names
-     *
-     * @return set of host names
-     */
-    static Set<String> parseHostList(String string) {
-      String[] addrs = StringUtils.getTrimmedStrings(string);
-      return new HashSet<String>(Arrays.asList(addrs));
-    }
-
-    /**
-     * Read set of host names from a file
-     *
-     * @return set of host names
-     */
-    static Set<String> getHostListFromFile(String fileName, String type) {
-      Set<String> nodes = new HashSet<String>();
-      try {
-        HostsFileReader.readFileToSet(type, fileName, nodes);
-        return StringUtils.getTrimmedStrings(nodes);
-      } catch (IOException e) {
-        throw new IllegalArgumentException(
-            "Failed to read host list from file: " + fileName);
-      }
-    }
  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5e1b45b337d..5a8c9f84f9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -33,6 +33,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -397,11 +398,11 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
    long timeout = TIMEOUT;
    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
        : Time.monotonicNow() + timeout;
-    if (!p.nodesToBeIncluded.isEmpty()) {
-      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    if (!p.includedNodes.isEmpty()) {
+      totalCapacity = p.includedNodes.size() * CAPACITY;
    }
-    if (!p.nodesToBeExcluded.isEmpty()) {
-      totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    if (!p.excludedNodes.isEmpty()) {
+      totalCapacity -= p.excludedNodes.size() * CAPACITY;
    }
    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
    boolean balanced;
@@ -414,12 +415,12 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
      for (DatanodeInfo datanode : datanodeReport) {
        double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity();
-        if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
+        if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) {
          assertTrue(nodeUtilization == 0);
          actualExcludedNodeCount++;
          continue;
        }
-        if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
+        if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) {
          assertTrue(nodeUtilization == 0);
          actualExcludedNodeCount++;
          continue;
        }
@@ -642,6 +643,7 @@ private void doTest(Configuration conf, long[] capacities,
        Balancer.Parameters.DEFAULT.threshold,
        Balancer.Parameters.DEFAULT.maxIdleIteration,
        nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+        Balancer.Parameters.DEFAULT.sourceNodes,
        false);
  }
 
@@ -754,36 +756,36 @@ private void runBalancerCli(Configuration conf,
    args.add("datanode");
 
    File excludeHostsFile = null;
-    if (!p.nodesToBeExcluded.isEmpty()) {
+    if (!p.excludedNodes.isEmpty()) {
      args.add("-exclude");
      if (useFile) {
        excludeHostsFile = new File ("exclude-hosts-file");
        PrintWriter pw = new PrintWriter(excludeHostsFile);
-        for (String host: p.nodesToBeExcluded) {
+        for (String host: p.excludedNodes) {
          pw.write( host + "\n");
        }
        pw.close();
        args.add("-f");
        args.add("exclude-hosts-file");
      } else {
-        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+        args.add(StringUtils.join(p.excludedNodes, ','));
      }
    }
 
    File includeHostsFile = null;
-    if (!p.nodesToBeIncluded.isEmpty()) {
+    if (!p.includedNodes.isEmpty()) {
      args.add("-include");
      if (useFile) {
        includeHostsFile = new File ("include-hosts-file");
        PrintWriter pw = new PrintWriter(includeHostsFile);
-        for (String host: p.nodesToBeIncluded){
+        for (String host: p.includedNodes){
          pw.write( host + "\n");
        }
        pw.close();
        args.add("-f");
        args.add("include-hosts-file");
      } else {
-        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+        args.add(StringUtils.join(p.includedNodes, ','));
      }
    }
 
@@ -881,7 +883,8 @@ public void testUnknownDatanode() throws Exception {
          Balancer.Parameters.DEFAULT.policy,
          Balancer.Parameters.DEFAULT.threshold,
          Balancer.Parameters.DEFAULT.maxIdleIteration,
-          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+          datanodes, Balancer.Parameters.DEFAULT.includedNodes,
+          Balancer.Parameters.DEFAULT.sourceNodes,
          false);
      final int r = Balancer.run(namenodes, p, conf);
      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@@ -1094,7 +1097,7 @@ public void testBalancerWithExcludeList() throws Exception {
    excludeHosts.add( "datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+        excludeHosts, Parameters.DEFAULT.includedNodes), false, false);
  }
 
  /**
@@ -1124,7 +1127,7 @@ public void testBalancerCliWithExcludeList() throws Exception {
    excludeHosts.add( "datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
-        Parameters.DEFAULT.nodesToBeIncluded), true, false);
+        Parameters.DEFAULT.includedNodes), true, false);
  }
 
  /**
@@ -1154,7 +1157,7 @@ public void testBalancerCliWithExcludeListInAFile() throws Exception {
    excludeHosts.add( "datanodeZ");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+        excludeHosts, Parameters.DEFAULT.includedNodes), true, true);
  }
 
  /**
@@ -1183,7 +1186,7 @@ public void testBalancerWithIncludeList() throws Exception {
    includeHosts.add( "datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), false, false);
  }
 
  /**
@@ -1212,7 +1215,7 @@ public void testBalancerCliWithIncludeList() throws Exception {
    includeHosts.add( "datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, false);
  }
 
  /**
@@ -1241,7 +1244,7 @@ public void testBalancerCliWithIncludeListInAFile() throws Exception {
    includeHosts.add( "datanodeY");
    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
-        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+        Parameters.DEFAULT.excludedNodes, includeHosts), true, true);
  }
 
  /**
@@ -1381,8 +1384,9 @@ public void testBalancerDuringUpgrade() throws Exception {
        new Balancer.Parameters(Parameters.DEFAULT.policy,
            Parameters.DEFAULT.threshold,
            Parameters.DEFAULT.maxIdleIteration,
-            Parameters.DEFAULT.nodesToBeExcluded,
-            Parameters.DEFAULT.nodesToBeIncluded,
+            Parameters.DEFAULT.excludedNodes,
+            Parameters.DEFAULT.includedNodes,
+            Parameters.DEFAULT.sourceNodes,
            true);
    assertEquals(ExitStatus.SUCCESS.getExitCode(),
        Balancer.run(namenodes, runDuringUpgrade, conf));
@@ -1538,6 +1542,116 @@ public void testManyBalancerSimultaneously() throws Exception {
    }
  }
 
+  /** Balancer should not move blocks with size < minBlockSize. */
+  @Test(timeout=60000)
+  public void testMinBlockSizeAndSourceNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    final short replication = 3;
+    final long[] lengths = {10, 10, 10, 10};
+    final long[] capacities = new long[replication];
+    final long totalUsed = capacities.length * sum(lengths);
+    Arrays.fill(capacities, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .simulatedCapacities(capacities)
+        .build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+          ClientProtocol.class).getProxy();
+
+      // create a few small files so that the initial datanodes have blocks
+      for(int i = 0; i < lengths.length; i++) {
+        final long size = lengths[i];
+        final Path p = new Path("/file" + i + "_size" + size);
+        try(final OutputStream out = dfs.create(p)) {
+          for(int j = 0; j < size; j++) {
+            out.write(j);
+          }
+        }
+      }
+
+      // start up empty datanodes with the same capacities
+      cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+      LOG.info("capacities    = " + Arrays.toString(capacities));
+      LOG.info("totalUsedSpace= " + totalUsed);
+      LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+
+      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      { // run Balancer with min-block-size=50
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            Collections.<String> emptySet(), false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      }
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+      { // run Balancer with empty nodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = capacities.length; i < datanodes.size(); i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with a filled node as a source node
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        sourceNodes.add(datanodes.get(0).getDisplayName());
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with all filled datanodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = 0; i < capacities.length; i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
  /**
   * @param args
   */