diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 33b5fa46d6e..e5f9e8c8061 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -29,10 +29,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -199,7 +201,10 @@ public class Balancer {
       + "\tWhether to run the balancer during an ongoing HDFS upgrade."
       + "This is usually not desired since it will not affect used space "
       + "on over-utilized machines."
-      + "\n\t[-asService]\tRun as a long running service.";
+      + "\n\t[-asService]\tRun as a long running service."
+      + "\n\t[-sortTopNodes]"
+      + "\tSort datanodes based on the utilization so "
+      + "that highly utilized datanodes get scheduled first.";
 
   @VisibleForTesting
   private static volatile boolean serviceRunning = false;
@@ -215,6 +220,7 @@ public class Balancer {
   private final double threshold;
   private final long maxSizeToMove;
   private final long defaultBlockSize;
+  private final boolean sortTopNodes;
 
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -328,6 +334,7 @@ public class Balancer {
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
+    this.sortTopNodes = p.getSortTopNodes();
 
     this.maxSizeToMove = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -374,6 +381,8 @@ public class Balancer {
       policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
+    // Store the capacity % of over utilized nodes for sorting, if needed.
+    Map<Source, Double> overUtilizedPercentage = new HashMap<>();
 
     // create network topology and classify utilization collections:
     //   over-utilized, above-average, below-average and under-utilized.
@@ -383,7 +392,7 @@ public class Balancer {
       final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
-        if (utilization == null) { // datanode does not have such storage type 
+        if (utilization == null) { // datanode does not have such storage type
          continue;
         }
 
@@ -409,6 +418,7 @@ public class Balancer {
           } else {
             overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             overUtilized.add(s);
+            overUtilizedPercentage.put(s, utilization);
           }
           g = s;
         } else {
@@ -424,6 +434,10 @@ public class Balancer {
       }
     }
 
+    if (sortTopNodes) {
+      sortOverUtilized(overUtilizedPercentage);
+    }
+
     logUtilizationCollections();
 
     Preconditions.checkState(dispatcher.getStorageGroupMap().size()
@@ -435,6 +449,21 @@ public class Balancer {
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  private void sortOverUtilized(Map<Source, Double> overUtilizedPercentage) {
+    Preconditions.checkState(overUtilized instanceof List,
+        "Collection overUtilized is not a List.");
+
+    LOG.info("Sorting over-utilized nodes by capacity"
+        + " to bring down top used datanode capacity faster");
+
+    List<Source> list = (List<Source>) overUtilized;
+    list.sort(
+        (Source source1, Source source2) ->
+            (Double.compare(overUtilizedPercentage.get(source2),
+                overUtilizedPercentage.get(source1)))
+    );
+  }
+
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final long max) {
     final double diff = Math.abs(utilizationDiff);
@@ -961,6 +990,10 @@ public class Balancer {
           } else if ("-asService".equalsIgnoreCase(args[i])) {
             b.setRunAsService(true);
             LOG.info("Balancer will run as a long running service");
+          } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
+            b.setSortTopNodes(true);
+            LOG.info("Balancer will sort nodes by"
+                + " capacity usage percentage to prioritize top used nodes");
           } else {
             throw new IllegalArgumentException("args = "
                 + Arrays.toString(args));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index cdca39fe292..e614327d7ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -47,6 +47,8 @@ final class BalancerParameters {
 
   private final boolean runAsService;
 
+  private final boolean sortTopNodes;
+
   static final BalancerParameters DEFAULT = new BalancerParameters();
 
   private BalancerParameters() {
@@ -63,6 +65,7 @@ final class BalancerParameters {
     this.blockpools = builder.blockpools;
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runAsService = builder.runAsService;
+    this.sortTopNodes = builder.sortTopNodes;
   }
 
   BalancingPolicy getBalancingPolicy() {
@@ -101,16 +104,21 @@ final class BalancerParameters {
     return this.runAsService;
   }
 
+  boolean getSortTopNodes() {
+    return this.sortTopNodes;
+  }
+
   @Override
   public String toString() {
     return String.format("%s.%s [%s," + " threshold = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
-        + " #blockpools = %s," + " run during upgrade = %s]",
+        + " #blockpools = %s," + " run during upgrade = %s,"
+        + " sort top nodes = %s]",
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade);
+        runDuringUpgrade, sortTopNodes);
   }
 
   static class Builder {
@@ -125,6 +133,7 @@ final class BalancerParameters {
     private Set<String> blockpools = Collections.<String> emptySet();
     private boolean runDuringUpgrade = false;
     private boolean runAsService = false;
+    private boolean sortTopNodes = false;
 
     Builder() {
     }
@@ -174,6 +183,11 @@ final class BalancerParameters {
       return this;
     }
 
+    Builder setSortTopNodes(boolean shouldSortTopNodes) {
+      this.sortTopNodes = shouldSortTopNodes;
+      return this;
+    }
+
     BalancerParameters build() {
       return new BalancerParameters(this);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index a0f95f70a4d..82d710d790f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.balancer;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -46,6 +47,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -2209,6 +2211,106 @@ public class TestBalancer {
         getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
   }
 
+  @Test(timeout = 60000)
+  public void testBalancerWithSortTopNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
+
+    final long capacity = 1000L;
+    final int diffBetweenNodes = 50;
+
+    // Set up the datanodes with two groups:
+    // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage
+    // 2 under-utilized nodes with 0%, 5% usage
+    // With sortTopNodes option, 100% and 95% used ones will be chosen.
+    final int numOfOverUtilizedDn = 5;
+    final int numOfUnderUtilizedDn = 2;
+    final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn;
+    final long[] capacityArray = new long[totalNumOfDn];
+    Arrays.fill(capacityArray, capacity);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(totalNumOfDn)
+        .simulatedCapacities(capacityArray)
+        .build();
+
+    cluster.setDataNodesDead();
+
+    List<DataNode> dataNodes = cluster.getDataNodes();
+
+    // Create top used nodes
+    for (int i = 0; i < numOfOverUtilizedDn; i++) {
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
+      // Create nodes with: 80%, 85%, 90%, 95%, 100%.
+      int capacityForThisDatanode = (int)capacity
+          - diffBetweenNodes * (numOfOverUtilizedDn - i - 1);
+      createFile(cluster, new Path("test_big" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Create under utilized nodes
+    for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) {
+      int index = i + numOfOverUtilizedDn;
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(index));
+      // Create nodes with: 5%, 0%
+      int capacityForThisDatanode = diffBetweenNodes * i;
+      createFile(cluster,
+          new Path("test_small" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Bring all nodes alive
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    cluster.waitFirstBRCompleted(0, 6000);
+
+    final BalancerParameters p = Balancer.Cli.parse(new String[] {
+        "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+        "-threshold", "1",
+        "-sortTopNodes"
+    });
+
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+
+    // Set max-size-to-move to small number
+    // so only top two nodes will be chosen in one iteration.
+    conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L);
+
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+    List<NameNodeConnector> connectors = NameNodeConnector
+        .newNameNodeConnectors(namenodes,
+            Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
+            BalancerParameters.DEFAULT.getMaxIdleIteration());
+    final Balancer b = new Balancer(connectors.get(0), p, conf);
+    Result balancerResult = b.runOneIteration();
+
+    cluster.triggerDeletionReports();
+    cluster.triggerBlockReports();
+    cluster.triggerHeartbeats();
+
+    DatanodeInfo[] datanodeReport = client
+        .getDatanodeReport(DatanodeReportType.ALL);
+
+    long maxUsage = 0;
+    for (int i = 0; i < totalNumOfDn; i++) {
+      maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed());
+    }
+
+    assertEquals(200, balancerResult.bytesAlreadyMoved);
+    // 100% and 95% used nodes will be balanced, so top used will be 900
+    assertEquals(900, maxUsage);
+  }
+
   /**
    * @param args
    */
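For reference, a minimal sketch of how the new option could be exercised once the patch above is applied. Only setSortTopNodes comes from this patch; the helper class and method names below are hypothetical, and because BalancerParameters and its Builder are package-private the sketch assumes it lives in the org.apache.hadoop.hdfs.server.balancer package.

// Hypothetical usage sketch, not part of the patch. Placed in the same
// package as BalancerParameters since the class and its Builder are
// package-private.
package org.apache.hadoop.hdfs.server.balancer;

final class SortTopNodesUsageSketch {

  // Builds parameters that enable the behaviour added by this patch:
  // over-utilized datanodes are sorted by usage so the fullest ones are
  // scheduled first.
  static BalancerParameters sortTopNodesParameters() {
    return new BalancerParameters.Builder()
        .setSortTopNodes(true)
        .build();
  }
}

The equivalent command-line form, matching the parsing added to Balancer.Cli above, would be "hdfs balancer -threshold 1 -sortTopNodes" (the threshold value is only illustrative).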