diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index e8b49718fed..f643590f759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -688,7 +689,7 @@ public class Balancer { * execute a {@link Balancer} to work through all datanodes once. */ static private int doBalance(Collection namenodes, - final BalancerParameters p, Configuration conf) + Collection nsIds, final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -707,13 +708,12 @@ public class Balancer { System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); List connectors = Collections.emptyList(); - try { - connectors = NameNodeConnector.newNameNodeConnectors(namenodes, - Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, - p.getMaxIdleIteration()); - - boolean done = false; - for(int iteration = 0; !done; iteration++) { + boolean done = false; + for(int iteration = 0; !done; iteration++) { + try { + connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds, + Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, + p.getMaxIdleIteration()); done = true; Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { @@ -741,19 +741,25 @@ public class Balancer { if (!done) { Thread.sleep(sleeptime); } - } - } finally { - for(NameNodeConnector nnc : connectors) { - IOUtils.cleanupWithLogger(LOG, nnc); + } finally { + for(NameNodeConnector nnc : connectors) { + IOUtils.cleanupWithLogger(LOG, nnc); + } } } return ExitStatus.SUCCESS.getExitCode(); } static int run(Collection namenodes, final BalancerParameters p, - Configuration conf) throws IOException, InterruptedException { + Configuration conf) throws IOException, InterruptedException { + return run(namenodes, null, p, conf); + } + + static int run(Collection namenodes, Collection nsIds, + final BalancerParameters p, Configuration conf) + throws IOException, InterruptedException { if (!p.getRunAsService()) { - return doBalance(namenodes, p, conf); + return doBalance(namenodes, nsIds, p, conf); } if (!serviceRunning) { serviceRunning = true; @@ -772,7 +778,7 @@ public class Balancer { while (serviceRunning) { try { - int retCode = doBalance(namenodes, p, conf); + int retCode = doBalance(namenodes, nsIds, p, conf); if (retCode < 0) { LOG.info("Balance failed, error code: " + retCode); failedTimesSinceLastSuccessfulBalance++; @@ -856,7 +862,8 @@ public class Balancer { checkReplicationPolicyCompatibility(conf); final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); - return Balancer.run(namenodes, parse(args), conf); + final Collection nsIds = DFSUtilClient.getNameServiceIds(conf); + return Balancer.run(namenodes, nsIds, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); return ExitStatus.IO_EXCEPTION.getExitCode(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 2844ad5a943..8403f82c269 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,7 +33,12 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.RateLimiter; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -100,6 +106,32 @@ public class NameNodeConnector implements Closeable { return connectors; } + public static List newNameNodeConnectors( + Collection namenodes, Collection nsIds, String name, + Path idPath, Configuration conf, int maxIdleIterations) + throws IOException { + final List connectors = new ArrayList( + namenodes.size()); + Map uriToNsId = new HashMap<>(); + if (nsIds != null) { + for (URI uri : namenodes) { + for (String nsId : nsIds) { + if (uri.getAuthority().equals(nsId)) { + uriToNsId.put(uri, nsId); + } + } + } + } + for (URI uri : namenodes) { + String nsId = uriToNsId.get(uri); + NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath, + null, conf, maxIdleIterations); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); + } + return connectors; + } + @VisibleForTesting public static void setWrite2IdFile(boolean write2IdFile) { NameNodeConnector.write2IdFile = write2IdFile; @@ -114,6 +146,13 @@ public class NameNodeConnector implements Closeable { private final String blockpoolID; private final BalancerProtocols namenode; + /** + * If set balancerShouldRequestStandby true, Balancer will getBlocks from + * Standby NameNode only and it can reduce the performance impact of Active + * NameNode, especially in a busy HA mode cluster. + */ + private boolean balancerShouldRequestStandby; + private NamenodeProtocol standbyNameNode; private final KeyManager keyManager; final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); @@ -149,6 +188,11 @@ public class NameNodeConnector implements Closeable { this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); + this.balancerShouldRequestStandby = conf.getBoolean( + DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY, + DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT); + this.standbyNameNode = null; + this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); @@ -167,6 +211,31 @@ public class NameNodeConnector implements Closeable { } } + public NameNodeConnector(String name, URI nameNodeUri, String nsId, + Path idPath, List targetPaths, + Configuration conf, int maxNotChangedIterations) + throws IOException { + this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations); + if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) { + List namenodes = + HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId); + for (ClientProtocol proxy : namenodes) { + try { + if (proxy.getHAServiceState().equals( + HAServiceProtocol.HAServiceState.STANDBY)) { + this.standbyNameNode = NameNodeProxies.createNonHAProxy( + conf, RPC.getServerAddress(proxy), NamenodeProtocol.class, + UserGroupInformation.getCurrentUser(), false).getProxy(); + break; + } + } catch (Exception e) { + //Ignore the exception while connecting to a namenode. + LOG.debug("Error while connecting to namenode", e); + } + } + } + } + public DistributedFileSystem getDistributedFileSystem() { return fs; } @@ -186,6 +255,22 @@ public class NameNodeConnector implements Closeable { if (getBlocksRateLimiter != null) { getBlocksRateLimiter.acquire(); } + boolean isRequestStandby = true; + try { + if (balancerShouldRequestStandby && standbyNameNode != null) { + return standbyNameNode.getBlocks(datanode, size, minBlockSize); + } else { + isRequestStandby = false; + } + } catch (Exception e) { + LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " + + "will fallback to normal way", e); + isRequestStandby = false; + } finally { + if (isRequestStandby) { + LOG.info("Request #getBlocks to Standby NameNode success."); + } + } return namenode.getBlocks(datanode, size, minBlockSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index c604315fb26..185df1246d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.times; @@ -31,6 +35,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -44,7 +49,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.Test; +import org.slf4j.LoggerFactory; /** * Test balancer with HA NameNodes @@ -106,6 +113,12 @@ public class TestBalancerWithHANameNodes { TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); + boolean isRequestStandby = conf.getBoolean( + DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT); + if (isRequestStandby) { + HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), + cluster.getNameNode(1)); + } // start up an empty node with the same capacity and on the same rack long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity String newNodeRack = TestBalancer.RACK2; // new node's rack @@ -115,13 +128,54 @@ public class TestBalancerWithHANameNodes { TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + Collection nsIds = DFSUtilClient.getNameServiceIds(conf); assertEquals(1, namenodes.size()); - final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT, + conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, BalancerParameters.DEFAULT); } + /** + * Test Balancer request Standby NameNode when enable this feature. + */ + @Test(timeout = 60000) + public void testBalancerRequestSBNWithHA() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true); + conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1); + //conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true); + TestBalancer.initConf(conf); + assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length); + NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1"); + nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); + Configuration copiedConf = new Configuration(conf); + cluster = new MiniDFSCluster.Builder(copiedConf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(TEST_CAPACITIES.length) + .racks(TEST_RACKS) + .simulatedCapacities(TEST_CAPACITIES) + .build(); + // Try capture NameNodeConnector log. + LogCapturer log =LogCapturer.captureLogs( + LoggerFactory.getLogger(NameNodeConnector.class)); + HATestUtil.setFailoverConfigurations(cluster, conf); + try { + cluster.waitActive(); + cluster.transitionToActive(0); + Thread.sleep(500); + client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), + ClientProtocol.class).getProxy(); + doTest(conf); + // Check getBlocks request to Standby NameNode. + assertTrue(log.getOutput().contains( + "Request #getBlocks to Standby NameNode success.")); + } finally { + cluster.shutdown(); + } + } + /** * Test Balancer with ObserverNodes. */