HDFS-13783. Add an option to the Balancer to make it run as a long-running service. Contributed by Chen Zhang.

This commit is contained in:
Erik Krogen 2019-07-30 15:42:55 -07:00
parent 7849bdcf70
commit 1f26cc8705
5 changed files with 302 additions and 5 deletions

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.http.HttpConfig;
import java.util.concurrent.TimeUnit;
/**
* This class contains constants for configuration keys and default values
* used in hdfs.
@ -623,7 +625,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins
public static final String DFS_BALANCER_SERVICE_INTERVAL_KEY = "dfs.balancer.service.interval";
public static final long DFS_BALANCER_SERVICE_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5); //5 mins
public static final String DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION = "dfs.balancer.service.retries.on.exception";
public static final int DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT = 5;
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
@ -1639,5 +1644,4 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
@Deprecated
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
}

View File

@ -36,6 +36,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -196,7 +197,14 @@ public class Balancer {
+ "\n\t[-runDuringUpgrade]"
+ "\tWhether to run the balancer during an ongoing HDFS upgrade."
+ "This is usually not desired since it will not affect used space "
+ "on over-utilized machines.";
+ "on over-utilized machines."
+ "\n\t[-asService]\tRun as a long running service.";
@VisibleForTesting
private static volatile boolean serviceRunning = false;
private static volatile int exceptionsSinceLastBalance = 0;
private static volatile int failedTimesSinceLastSuccessfulBalance = 0;
private final Dispatcher dispatcher;
private final NameNodeConnector nnc;
@ -256,6 +264,14 @@ static int getInt(Configuration conf, String key, int defaultValue) {
return v;
}
static int getExceptionsSinceLastBalance() {
return exceptionsSinceLastBalance;
}
static int getFailedTimesSinceLastSuccessfulBalance() {
return failedTimesSinceLastSuccessfulBalance;
}
/**
* Construct a balancer.
* Initialize balancer. It sets the value of the threshold, and
@ -672,8 +688,9 @@ Result runOneIteration() {
* for each namenode,
* execute a {@link Balancer} to work through all datanodes once.
*/
static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException {
static private int doBalance(Collection<URI> namenodes,
final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime =
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
@ -731,6 +748,60 @@ static int run(Collection<URI> namenodes, final BalancerParameters p,
return ExitStatus.SUCCESS.getExitCode();
}
static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException {
if (!p.getRunAsService()) {
return doBalance(namenodes, p, conf);
}
if (!serviceRunning) {
serviceRunning = true;
} else {
LOG.warn("Balancer already running as a long-service!");
return ExitStatus.ALREADY_RUNNING.getExitCode();
}
long scheduleInterval = conf.getTimeDuration(
DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY,
DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
int retryOnException =
conf.getInt(DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION,
DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT);
while (serviceRunning) {
try {
int retCode = doBalance(namenodes, p, conf);
if (retCode < 0) {
LOG.info("Balance failed, error code: " + retCode);
failedTimesSinceLastSuccessfulBalance++;
} else {
LOG.info("Balance succeed!");
failedTimesSinceLastSuccessfulBalance = 0;
}
exceptionsSinceLastBalance = 0;
} catch (Exception e) {
if (++exceptionsSinceLastBalance > retryOnException) {
// The caller will process and log the exception
throw e;
}
LOG.warn(
"Encounter exception while do balance work. Already tried {} times",
exceptionsSinceLastBalance, e);
}
// sleep for next round, will retry for next round when it's interrupted
LOG.info("Finished one round, will wait for {} for next round",
time2Str(scheduleInterval));
Thread.sleep(scheduleInterval);
}
// normal stop
return 0;
}
static void stop() {
serviceRunning = false;
}
private static void checkKeytabAndInit(Configuration conf)
throws IOException {
if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY,
@ -867,6 +938,9 @@ static BalancerParameters parse(String[] args) {
+ "upgrade. Most users will not want to run the balancer "
+ "during an upgrade since it will not affect used space "
+ "on over-utilized machines.");
} else if ("-asService".equalsIgnoreCase(args[i])) {
b.setRunAsService(true);
LOG.info("Balancer will run as a long running service");
} else {
throw new IllegalArgumentException("args = "
+ Arrays.toString(args));

View File

@ -45,6 +45,8 @@ final class BalancerParameters {
*/
private final boolean runDuringUpgrade;
private final boolean runAsService;
static final BalancerParameters DEFAULT = new BalancerParameters();
private BalancerParameters() {
@ -60,6 +62,7 @@ private BalancerParameters(Builder builder) {
this.sourceNodes = builder.sourceNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
this.runAsService = builder.runAsService;
}
BalancingPolicy getBalancingPolicy() {
@ -94,6 +97,10 @@ boolean getRunDuringUpgrade() {
return this.runDuringUpgrade;
}
boolean getRunAsService() {
return this.runAsService;
}
@Override
public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
@ -117,6 +124,7 @@ static class Builder {
private Set<String> sourceNodes = Collections.<String> emptySet();
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
private boolean runAsService = false;
Builder() {
}
@ -161,6 +169,11 @@ Builder setRunDuringUpgrade(boolean run) {
return this;
}
Builder setRunAsService(boolean asService) {
this.runAsService = asService;
return this;
}
BalancerParameters build() {
return new BalancerParameters(this);
}

View File

@ -3654,6 +3654,23 @@
</description>
</property>
<property>
<name>dfs.balancer.service.interval</name>
<value>5m</value>
<description>
The schedule interval of balancer when running as a long service.
</description>
</property>
<property>
<name>dfs.balancer.service.retries.on.exception</name>
<value>5</value>
<description>
When the balancer is executed as a long-running service, it will retry upon encountering an exception. This
configuration determines how many times it will retry before considering the exception to be fatal and quitting.
</description>
</property>
<property>
<name>dfs.block.misreplication.processing.limit</name>
<value>10000</value>

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Tool;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test balancer run as a service.
*/
public class TestBalancerService {
private MiniDFSCluster cluster;
private ClientProtocol client;
private long totalUsedSpace;
// array of racks for original nodes in cluster
private static final String[] TEST_RACKS =
{TestBalancer.RACK0, TestBalancer.RACK1};
// array of capacities for original nodes in cluster
private static final long[] TEST_CAPACITIES =
{TestBalancer.CAPACITY, TestBalancer.CAPACITY};
private static final double USED = 0.3;
static {
TestBalancer.initTestSetup();
}
private void setupCluster(Configuration conf) throws Exception {
MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
Configuration copiedConf = new Configuration(conf);
cluster = new MiniDFSCluster.Builder(copiedConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS)
.simulatedCapacities(TEST_CAPACITIES).build();
HATestUtil.setFailoverConfigurations(cluster, conf);
cluster.waitActive();
cluster.transitionToActive(0);
client = NameNodeProxies
.createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class)
.getProxy();
int numOfDatanodes = TEST_CAPACITIES.length;
long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
// fill up the cluster to be 30% full
totalUsedSpace = (long) (totalCapacity * USED);
TestBalancer.createFile(cluster, TestBalancer.filePath,
totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0);
}
private long addOneDataNode(Configuration conf) throws Exception {
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null,
new String[] {TestBalancer.RACK2},
new long[] {TestBalancer.CAPACITY});
long totalCapacity = cluster.getDataNodes().size() * TestBalancer.CAPACITY;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
return totalCapacity;
}
private Thread newBalancerService(Configuration conf, String[] args) {
return new Thread(new Runnable() {
@Override
public void run() {
Tool cli = new Balancer.Cli();
cli.setConf(conf);
try {
cli.run(args);
} catch (Exception e) {
fail("balancer failed for " + e);
}
}
});
}
/**
* The normal test case. Start with an imbalanced cluster, then balancer
* should balance succeed but not exit, then make the cluster imbalanced and
* wait for balancer to balance it again
*/
@Test(timeout = 60000)
public void testBalancerServiceBalanceTwice() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5,
TimeUnit.SECONDS);
TestBalancer.initConf(conf);
try {
setupCluster(conf);
long totalCapacity = addOneDataNode(conf); // make cluster imbalanced
Thread balancerThread =
newBalancerService(conf, new String[] {"-asService"});
balancerThread.start();
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
cluster.triggerHeartbeats();
cluster.triggerBlockReports();
// add another empty datanode, wait for cluster become balance again
totalCapacity = addOneDataNode(conf);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
Balancer.stop();
balancerThread.join();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout = 60000)
public void testBalancerServiceOnError() throws Exception {
Configuration conf = new HdfsConfiguration();
// retry for every 5 seconds
conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5,
TimeUnit.SECONDS);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
TestBalancer.initConf(conf);
try {
setupCluster(conf);
Thread balancerThread =
newBalancerService(conf, new String[] {"-asService"});
balancerThread.start();
// cluster is out of service for 10+ secs, the balancer service will retry
// for 2+ times
cluster.shutdownNameNode(0);
GenericTestUtils.waitFor(
() -> Balancer.getExceptionsSinceLastBalance() > 0, 1000, 10000);
assertTrue(Balancer.getExceptionsSinceLastBalance() > 0);
cluster.restartNameNode(0);
cluster.transitionToActive(0);
cluster.waitActive();
long totalCapacity = addOneDataNode(conf);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
Balancer.stop();
balancerThread.join();
// reset to 0 once the balancer finished without exception
assertEquals(Balancer.getExceptionsSinceLastBalance(), 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}