HDFS-14904. Add Option to let Balancer prefer highly utilized nodes in each iteration (#2483). Contributed by Leon Gao.
Parent: 60201cbf69
Commit: 6ff2409b31
org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -29,10 +29,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -199,7 +201,10 @@ public class Balancer {
       + "\tWhether to run the balancer during an ongoing HDFS upgrade."
       + "This is usually not desired since it will not affect used space "
       + "on over-utilized machines."
-      + "\n\t[-asService]\tRun as a long running service.";
+      + "\n\t[-asService]\tRun as a long running service."
+      + "\n\t[-sortTopNodes]"
+      + "\tSort datanodes based on the utilization so "
+      + "that highly utilized datanodes get scheduled first.";
 
   @VisibleForTesting
   private static volatile boolean serviceRunning = false;
@@ -215,6 +220,7 @@ public class Balancer {
   private final double threshold;
   private final long maxSizeToMove;
   private final long defaultBlockSize;
+  private final boolean sortTopNodes;
 
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -328,6 +334,7 @@ public class Balancer {
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
+    this.sortTopNodes = p.getSortTopNodes();
 
     this.maxSizeToMove = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -374,6 +381,8 @@ public class Balancer {
       policy.accumulateSpaces(r);
     }
     policy.initAvgUtilization();
+    // Store the capacity % of over utilized nodes for sorting, if needed.
+    Map<Source, Double> overUtilizedPercentage = new HashMap<>();
 
     // create network topology and classify utilization collections:
     // over-utilized, above-average, below-average and under-utilized.
@@ -409,6 +418,7 @@ public class Balancer {
         } else {
           overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
           overUtilized.add(s);
+          overUtilizedPercentage.put(s, utilization);
         }
         g = s;
       } else {
@@ -424,6 +434,10 @@ public class Balancer {
       }
     }
 
+    if (sortTopNodes) {
+      sortOverUtilized(overUtilizedPercentage);
+    }
+
     logUtilizationCollections();
 
     Preconditions.checkState(dispatcher.getStorageGroupMap().size()
@@ -435,6 +449,21 @@ public class Balancer {
     return Math.max(overLoadedBytes, underLoadedBytes);
   }
 
+  private void sortOverUtilized(Map<Source, Double> overUtilizedPercentage) {
+    Preconditions.checkState(overUtilized instanceof List,
+        "Collection overUtilized is not a List.");
+
+    LOG.info("Sorting over-utilized nodes by capacity" +
+        " to bring down top used datanode capacity faster");
+
+    List<Source> list = (List<Source>) overUtilized;
+    list.sort(
+        (Source source1, Source source2) ->
+            (Double.compare(overUtilizedPercentage.get(source2),
+                overUtilizedPercentage.get(source1)))
+    );
+  }
+
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final long max) {
     final double diff = Math.abs(utilizationDiff);
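The comparator in sortOverUtilized above orders sources by descending utilization, so the most-used datanodes are dispatched first. For illustration only, here is a minimal, self-contained sketch of the same comparator pattern, using plain strings and a hypothetical utilization map instead of the Balancer's Source objects:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortByUtilizationSketch {
  public static void main(String[] args) {
    // Hypothetical datanode -> used-capacity-percentage map.
    Map<String, Double> utilization = new HashMap<>();
    utilization.put("dn1", 80.0);
    utilization.put("dn2", 100.0);
    utilization.put("dn3", 95.0);

    List<String> overUtilized = new ArrayList<>(utilization.keySet());
    // Same pattern as sortOverUtilized(): compare(b, a) yields descending order.
    overUtilized.sort((a, b) ->
        Double.compare(utilization.get(b), utilization.get(a)));

    System.out.println(overUtilized); // prints [dn2, dn3, dn1]
  }
}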
@@ -961,6 +990,10 @@ public class Balancer {
         } else if ("-asService".equalsIgnoreCase(args[i])) {
           b.setRunAsService(true);
           LOG.info("Balancer will run as a long running service");
+        } else if ("-sortTopNodes".equalsIgnoreCase(args[i])) {
+          b.setSortTopNodes(true);
+          LOG.info("Balancer will sort nodes by" +
+              " capacity usage percentage to prioritize top used nodes");
         } else {
           throw new IllegalArgumentException("args = "
               + Arrays.toString(args));
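With the flag wired into the argument parsing above, an operator would append it to the usual balancer invocation, for example `hdfs balancer -threshold 5 -sortTopNodes`. Inside the balancer package the same parsing path can also be exercised directly; a small sketch mirroring what the new test does (Cli and BalancerParameters are accessed from org.apache.hadoop.hdfs.server.balancer, as in TestBalancer):

// Sketch only: assumes it lives in the balancer package, like the test.
BalancerParameters p = Balancer.Cli.parse(new String[] {
    "-policy", BalancingPolicy.Node.INSTANCE.getName(),
    "-threshold", "1",
    "-sortTopNodes"
});
assert p.getSortTopNodes(); // the option is carried through to the parameters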
org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java

@@ -47,6 +47,8 @@ final class BalancerParameters {
 
   private final boolean runAsService;
 
+  private final boolean sortTopNodes;
+
   static final BalancerParameters DEFAULT = new BalancerParameters();
 
   private BalancerParameters() {
@@ -63,6 +65,7 @@ final class BalancerParameters {
     this.blockpools = builder.blockpools;
     this.runDuringUpgrade = builder.runDuringUpgrade;
     this.runAsService = builder.runAsService;
+    this.sortTopNodes = builder.sortTopNodes;
   }
 
   BalancingPolicy getBalancingPolicy() {
@@ -101,16 +104,21 @@ final class BalancerParameters {
     return this.runAsService;
   }
 
+  boolean getSortTopNodes() {
+    return this.sortTopNodes;
+  }
+
   @Override
   public String toString() {
     return String.format("%s.%s [%s," + " threshold = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
-        + " #blockpools = %s," + " run during upgrade = %s]",
+        + " #blockpools = %s," + " run during upgrade = %s]"
+        + " sort top nodes = %s",
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
         includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade);
+        runDuringUpgrade, sortTopNodes);
   }
 
   static class Builder {
@@ -125,6 +133,7 @@ final class BalancerParameters {
     private Set<String> blockpools = Collections.<String> emptySet();
     private boolean runDuringUpgrade = false;
    private boolean runAsService = false;
+    private boolean sortTopNodes = false;
 
     Builder() {
     }
@@ -174,6 +183,11 @@ final class BalancerParameters {
       return this;
     }
 
+    Builder setSortTopNodes(boolean shouldSortTopNodes) {
+      this.sortTopNodes = shouldSortTopNodes;
+      return this;
+    }
+
     BalancerParameters build() {
       return new BalancerParameters(this);
     }
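For callers that build parameters programmatically rather than through the CLI, the new option composes with the existing builder chain. An illustrative sketch under the assumption that the snippet lives in the balancer package (BalancerParameters and its Builder are package-private):

// Illustrative only: assumes org.apache.hadoop.hdfs.server.balancer package.
BalancerParameters params = new BalancerParameters.Builder()
    .setSortTopNodes(true)   // schedule the most-utilized datanodes first
    .build();

// The default leaves the option off, matching "private boolean sortTopNodes = false".
assert !BalancerParameters.DEFAULT.getSortTopNodes();
assert params.getSortTopNodes();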
org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.balancer;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -46,6 +47,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER
 import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.AfterClass;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -2209,6 +2211,106 @@ public class TestBalancer {
         getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
   }
 
+  @Test(timeout = 60000)
+  public void testBalancerWithSortTopNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000);
+
+    final long capacity = 1000L;
+    final int diffBetweenNodes = 50;
+
+    // Set up the datanodes with two groups:
+    // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage
+    // 2 under-utilizaed nodes with 0%, 5% usage
+    // With sortTopNodes option, 100% and 95% used ones will be chosen.
+    final int numOfOverUtilizedDn = 5;
+    final int numOfUnderUtilizedDn = 2;
+    final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn;
+    final long[] capacityArray = new long[totalNumOfDn];
+    Arrays.fill(capacityArray, capacity);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(totalNumOfDn)
+        .simulatedCapacities(capacityArray)
+        .build();
+
+    cluster.setDataNodesDead();
+
+    List<DataNode> dataNodes = cluster.getDataNodes();
+
+    // Create top used nodes
+    for (int i = 0; i < numOfOverUtilizedDn; i++) {
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(i));
+      // Create nodes with: 80%, 85%, 90%, 95%, 100%.
+      int capacityForThisDatanode = (int)capacity
+          - diffBetweenNodes * (numOfOverUtilizedDn - i - 1);
+      createFile(cluster, new Path("test_big" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Create under utilized nodes
+    for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) {
+      int index = i + numOfOverUtilizedDn;
+      // Bring one node alive
+      DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index));
+      DataNodeTestUtils.triggerBlockReport(dataNodes.get(index));
+      // Create nodes with: 5%, 0%
+      int capacityForThisDatanode = diffBetweenNodes * i;
+      createFile(cluster,
+          new Path("test_small" + i),
+          capacityForThisDatanode, (short) 1, 0);
+      cluster.setDataNodesDead();
+    }
+
+    // Bring all nodes alive
+    cluster.triggerHeartbeats();
+    cluster.triggerBlockReports();
+    cluster.waitFirstBRCompleted(0, 6000);
+
+    final BalancerParameters p = Balancer.Cli.parse(new String[] {
+        "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+        "-threshold", "1",
+        "-sortTopNodes"
+    });
+
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
+
+    // Set max-size-to-move to small number
+    // so only top two nodes will be chosen in one iteration.
+    conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L);
+
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+    List<NameNodeConnector> connectors = NameNodeConnector
+        .newNameNodeConnectors(namenodes,
+            Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
+            BalancerParameters.DEFAULT.getMaxIdleIteration());
+    final Balancer b = new Balancer(connectors.get(0), p, conf);
+    Result balancerResult = b.runOneIteration();
+
+    cluster.triggerDeletionReports();
+    cluster.triggerBlockReports();
+    cluster.triggerHeartbeats();
+
+    DatanodeInfo[] datanodeReport = client
+        .getDatanodeReport(DatanodeReportType.ALL);
+
+    long maxUsage = 0;
+    for (int i = 0; i < totalNumOfDn; i++) {
+      maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed());
+    }
+
+    assertEquals(200, balancerResult.bytesAlreadyMoved);
+    // 100% and 95% used nodes will be balanced, so top used will be 900
+    assertEquals(900, maxUsage);
+  }
+
   /**
    * @param args
    */