From 1f26cc8705b5af12eefedda019e7ab5c261d9bfb Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 30 Jul 2019 15:42:55 -0700 Subject: [PATCH] HDFS-13783. Add an option to the Balancer to make it run as a long-running service. Contributed by Chen Zhang. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +- .../hadoop/hdfs/server/balancer/Balancer.java | 80 +++++++- .../server/balancer/BalancerParameters.java | 13 ++ .../src/main/resources/hdfs-default.xml | 17 ++ .../server/balancer/TestBalancerService.java | 189 ++++++++++++++++++ 5 files changed, 302 insertions(+), 5 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fb83baf8f67..b839e517d36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.http.HttpConfig; +import java.util.concurrent.TimeUnit; + /** * This class contains constants for configuration keys and default values * used in hdfs. @@ -623,7 +625,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time"; public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins - + public static final String DFS_BALANCER_SERVICE_INTERVAL_KEY = "dfs.balancer.service.interval"; + public static final long DFS_BALANCER_SERVICE_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5); //5 mins + public static final String DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION = "dfs.balancer.service.retries.on.exception"; + public static final int DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT = 5; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; @@ -1639,5 +1644,4 @@ public class DFSConfigKeys extends CommonConfigurationKeys { @Deprecated public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index fe187cbc686..684b2d9b51d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -196,7 +197,14 @@ public class Balancer { + "\n\t[-runDuringUpgrade]" + "\tWhether to run the balancer during an ongoing HDFS upgrade." + "This is usually not desired since it will not affect used space " - + "on over-utilized machines."; + + "on over-utilized machines." + + "\n\t[-asService]\tRun as a long running service."; + + @VisibleForTesting + private static volatile boolean serviceRunning = false; + + private static volatile int exceptionsSinceLastBalance = 0; + private static volatile int failedTimesSinceLastSuccessfulBalance = 0; private final Dispatcher dispatcher; private final NameNodeConnector nnc; @@ -256,6 +264,14 @@ static int getInt(Configuration conf, String key, int defaultValue) { return v; } + static int getExceptionsSinceLastBalance() { + return exceptionsSinceLastBalance; + } + + static int getFailedTimesSinceLastSuccessfulBalance() { + return failedTimesSinceLastSuccessfulBalance; + } + /** * Construct a balancer. * Initialize balancer. It sets the value of the threshold, and @@ -672,8 +688,9 @@ Result runOneIteration() { * for each namenode, * execute a {@link Balancer} to work through all datanodes once. */ - static int run(Collection namenodes, final BalancerParameters p, - Configuration conf) throws IOException, InterruptedException { + static private int doBalance(Collection namenodes, + final BalancerParameters p, Configuration conf) + throws IOException, InterruptedException { final long sleeptime = conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, @@ -731,6 +748,60 @@ static int run(Collection namenodes, final BalancerParameters p, return ExitStatus.SUCCESS.getExitCode(); } + static int run(Collection namenodes, final BalancerParameters p, + Configuration conf) throws IOException, InterruptedException { + if (!p.getRunAsService()) { + return doBalance(namenodes, p, conf); + } + if (!serviceRunning) { + serviceRunning = true; + } else { + LOG.warn("Balancer already running as a long-service!"); + return ExitStatus.ALREADY_RUNNING.getExitCode(); + } + + long scheduleInterval = conf.getTimeDuration( + DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, + DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + int retryOnException = + conf.getInt(DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION, + DFSConfigKeys.DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT); + + while (serviceRunning) { + try { + int retCode = doBalance(namenodes, p, conf); + if (retCode < 0) { + LOG.info("Balance failed, error code: " + retCode); + failedTimesSinceLastSuccessfulBalance++; + } else { + LOG.info("Balance succeed!"); + failedTimesSinceLastSuccessfulBalance = 0; + } + exceptionsSinceLastBalance = 0; + } catch (Exception e) { + if (++exceptionsSinceLastBalance > retryOnException) { + // The caller will process and log the exception + throw e; + } + LOG.warn( + "Encounter exception while do balance work. Already tried {} times", + exceptionsSinceLastBalance, e); + } + + // sleep for next round, will retry for next round when it's interrupted + LOG.info("Finished one round, will wait for {} for next round", + time2Str(scheduleInterval)); + Thread.sleep(scheduleInterval); + } + // normal stop + return 0; + } + + static void stop() { + serviceRunning = false; + } + private static void checkKeytabAndInit(Configuration conf) throws IOException { if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY, @@ -867,6 +938,9 @@ static BalancerParameters parse(String[] args) { + "upgrade. Most users will not want to run the balancer " + "during an upgrade since it will not affect used space " + "on over-utilized machines."); + } else if ("-asService".equalsIgnoreCase(args[i])) { + b.setRunAsService(true); + LOG.info("Balancer will run as a long running service"); } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java index 5d5e9b18d7b..cdca39fe292 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -45,6 +45,8 @@ final class BalancerParameters { */ private final boolean runDuringUpgrade; + private final boolean runAsService; + static final BalancerParameters DEFAULT = new BalancerParameters(); private BalancerParameters() { @@ -60,6 +62,7 @@ private BalancerParameters(Builder builder) { this.sourceNodes = builder.sourceNodes; this.blockpools = builder.blockpools; this.runDuringUpgrade = builder.runDuringUpgrade; + this.runAsService = builder.runAsService; } BalancingPolicy getBalancingPolicy() { @@ -94,6 +97,10 @@ boolean getRunDuringUpgrade() { return this.runDuringUpgrade; } + boolean getRunAsService() { + return this.runAsService; + } + @Override public String toString() { return String.format("%s.%s [%s," + " threshold = %s," @@ -117,6 +124,7 @@ static class Builder { private Set sourceNodes = Collections. emptySet(); private Set blockpools = Collections. emptySet(); private boolean runDuringUpgrade = false; + private boolean runAsService = false; Builder() { } @@ -161,6 +169,11 @@ Builder setRunDuringUpgrade(boolean run) { return this; } + Builder setRunAsService(boolean asService) { + this.runAsService = asService; + return this; + } + BalancerParameters build() { return new BalancerParameters(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 890d03475a0..72715037fba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3654,6 +3654,23 @@ + + dfs.balancer.service.interval + 5m + + The schedule interval of balancer when running as a long service. + + + + + dfs.balancer.service.retries.on.exception + 5 + + When the balancer is executed as a long-running service, it will retry upon encountering an exception. This + configuration determines how many times it will retry before considering the exception to be fatal and quitting. + + + dfs.block.misreplication.processing.limit 10000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java new file mode 100644 index 00000000000..43cc75175e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerService.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Tool; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test balancer run as a service. + */ +public class TestBalancerService { + private MiniDFSCluster cluster; + private ClientProtocol client; + private long totalUsedSpace; + + // array of racks for original nodes in cluster + private static final String[] TEST_RACKS = + {TestBalancer.RACK0, TestBalancer.RACK1}; + // array of capacities for original nodes in cluster + private static final long[] TEST_CAPACITIES = + {TestBalancer.CAPACITY, TestBalancer.CAPACITY}; + private static final double USED = 0.3; + + static { + TestBalancer.initTestSetup(); + } + + private void setupCluster(Configuration conf) throws Exception { + MiniDFSNNTopology.NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); + nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); + Configuration copiedConf = new Configuration(conf); + cluster = new MiniDFSCluster.Builder(copiedConf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(TEST_CAPACITIES.length).racks(TEST_RACKS) + .simulatedCapacities(TEST_CAPACITIES).build(); + HATestUtil.setFailoverConfigurations(cluster, conf); + cluster.waitActive(); + cluster.transitionToActive(0); + client = NameNodeProxies + .createProxy(conf, FileSystem.getDefaultUri(conf), ClientProtocol.class) + .getProxy(); + + int numOfDatanodes = TEST_CAPACITIES.length; + long totalCapacity = TestBalancer.sum(TEST_CAPACITIES); + // fill up the cluster to be 30% full + totalUsedSpace = (long) (totalCapacity * USED); + TestBalancer.createFile(cluster, TestBalancer.filePath, + totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); + } + + private long addOneDataNode(Configuration conf) throws Exception { + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, + new String[] {TestBalancer.RACK2}, + new long[] {TestBalancer.CAPACITY}); + long totalCapacity = cluster.getDataNodes().size() * TestBalancer.CAPACITY; + TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, + cluster); + return totalCapacity; + } + + private Thread newBalancerService(Configuration conf, String[] args) { + return new Thread(new Runnable() { + @Override + public void run() { + Tool cli = new Balancer.Cli(); + cli.setConf(conf); + try { + cli.run(args); + } catch (Exception e) { + fail("balancer failed for " + e); + } + } + }); + } + + /** + * The normal test case. Start with an imbalanced cluster, then balancer + * should balance succeed but not exit, then make the cluster imbalanced and + * wait for balancer to balance it again + */ + @Test(timeout = 60000) + public void testBalancerServiceBalanceTwice() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5, + TimeUnit.SECONDS); + TestBalancer.initConf(conf); + try { + setupCluster(conf); + long totalCapacity = addOneDataNode(conf); // make cluster imbalanced + + Thread balancerThread = + newBalancerService(conf, new String[] {"-asService"}); + balancerThread.start(); + + TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster, BalancerParameters.DEFAULT); + cluster.triggerHeartbeats(); + cluster.triggerBlockReports(); + + // add another empty datanode, wait for cluster become balance again + totalCapacity = addOneDataNode(conf); + TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster, BalancerParameters.DEFAULT); + + Balancer.stop(); + balancerThread.join(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test(timeout = 60000) + public void testBalancerServiceOnError() throws Exception { + Configuration conf = new HdfsConfiguration(); + // retry for every 5 seconds + conf.setTimeDuration(DFSConfigKeys.DFS_BALANCER_SERVICE_INTERVAL_KEY, 5, + TimeUnit.SECONDS); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + TestBalancer.initConf(conf); + try { + setupCluster(conf); + + Thread balancerThread = + newBalancerService(conf, new String[] {"-asService"}); + balancerThread.start(); + + // cluster is out of service for 10+ secs, the balancer service will retry + // for 2+ times + cluster.shutdownNameNode(0); + GenericTestUtils.waitFor( + () -> Balancer.getExceptionsSinceLastBalance() > 0, 1000, 10000); + assertTrue(Balancer.getExceptionsSinceLastBalance() > 0); + cluster.restartNameNode(0); + cluster.transitionToActive(0); + cluster.waitActive(); + + long totalCapacity = addOneDataNode(conf); + TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, + cluster, BalancerParameters.DEFAULT); + + Balancer.stop(); + balancerThread.join(); + + // reset to 0 once the balancer finished without exception + assertEquals(Balancer.getExceptionsSinceLastBalance(), 0); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } +}