HDFS-13183. Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit a3f44dacc1)
This commit is contained in:
He Xiaoqiao 2020-05-18 07:08:32 -07:00 committed by Wei-Chiu Chuang
parent 3915d1afc7
commit eb045ea056
3 changed files with 163 additions and 17 deletions

View File

@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@ -688,7 +689,7 @@ public class Balancer {
* execute a {@link Balancer} to work through all datanodes once.
*/
static private int doBalance(Collection<URI> namenodes,
final BalancerParameters p, Configuration conf)
Collection<String> nsIds, final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
final long sleeptime =
conf.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -707,13 +708,12 @@ public class Balancer {
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
p.getMaxIdleIteration());
boolean done = false;
for(int iteration = 0; !done; iteration++) {
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
p.getMaxIdleIteration());
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
@ -741,19 +741,25 @@ public class Balancer {
if (!done) {
Thread.sleep(sleeptime);
}
}
} finally {
for(NameNodeConnector nnc : connectors) {
IOUtils.cleanupWithLogger(LOG, nnc);
}
}
}
return ExitStatus.SUCCESS.getExitCode();
}
static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException {
return run(namenodes, null, p, conf);
}
static int run(Collection<URI> namenodes, Collection<String> nsIds,
final BalancerParameters p, Configuration conf)
throws IOException, InterruptedException {
if (!p.getRunAsService()) {
return doBalance(namenodes, p, conf);
return doBalance(namenodes, nsIds, p, conf);
}
if (!serviceRunning) {
serviceRunning = true;
@ -772,7 +778,7 @@ public class Balancer {
while (serviceRunning) {
try {
int retCode = doBalance(namenodes, p, conf);
int retCode = doBalance(namenodes, nsIds, p, conf);
if (retCode < 0) {
LOG.info("Balance failed, error code: " + retCode);
failedTimesSinceLastSuccessfulBalance++;
@ -856,7 +862,8 @@ public class Balancer {
checkReplicationPolicyCompatibility(conf);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
return Balancer.run(namenodes, parse(args), conf);
final Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
return Balancer.run(namenodes, nsIds, parse(args), conf);
} catch (IOException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION.getExitCode();

View File

@ -25,6 +25,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@ -32,7 +33,12 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -100,6 +106,32 @@ public class NameNodeConnector implements Closeable {
return connectors;
}
public static List<NameNodeConnector> newNameNodeConnectors(
Collection<URI> namenodes, Collection<String> nsIds, String name,
Path idPath, Configuration conf, int maxIdleIterations)
throws IOException {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size());
Map<URI, String> uriToNsId = new HashMap<>();
if (nsIds != null) {
for (URI uri : namenodes) {
for (String nsId : nsIds) {
if (uri.getAuthority().equals(nsId)) {
uriToNsId.put(uri, nsId);
}
}
}
}
for (URI uri : namenodes) {
String nsId = uriToNsId.get(uri);
NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
null, conf, maxIdleIterations);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
}
return connectors;
}
@VisibleForTesting
public static void setWrite2IdFile(boolean write2IdFile) {
NameNodeConnector.write2IdFile = write2IdFile;
@ -114,6 +146,13 @@ public class NameNodeConnector implements Closeable {
private final String blockpoolID;
private final BalancerProtocols namenode;
/**
* If set balancerShouldRequestStandby true, Balancer will getBlocks from
* Standby NameNode only and it can reduce the performance impact of Active
* NameNode, especially in a busy HA mode cluster.
*/
private boolean balancerShouldRequestStandby;
private NamenodeProtocol standbyNameNode;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@ -149,6 +188,11 @@ public class NameNodeConnector implements Closeable {
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
this.balancerShouldRequestStandby = conf.getBoolean(
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
this.standbyNameNode = null;
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest();
@ -167,6 +211,31 @@ public class NameNodeConnector implements Closeable {
}
}
public NameNodeConnector(String name, URI nameNodeUri, String nsId,
Path idPath, List<Path> targetPaths,
Configuration conf, int maxNotChangedIterations)
throws IOException {
this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
for (ClientProtocol proxy : namenodes) {
try {
if (proxy.getHAServiceState().equals(
HAServiceProtocol.HAServiceState.STANDBY)) {
this.standbyNameNode = NameNodeProxies.createNonHAProxy(
conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
UserGroupInformation.getCurrentUser(), false).getProxy();
break;
}
} catch (Exception e) {
//Ignore the exception while connecting to a namenode.
LOG.debug("Error while connecting to namenode", e);
}
}
}
}
public DistributedFileSystem getDistributedFileSystem() {
return fs;
}
@ -186,6 +255,22 @@ public class NameNodeConnector implements Closeable {
if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
boolean isRequestStandby = true;
try {
if (balancerShouldRequestStandby && standbyNameNode != null) {
return standbyNameNode.getBlocks(datanode, size, minBlockSize);
} else {
isRequestStandby = false;
}
} catch (Exception e) {
LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
"will fallback to normal way", e);
isRequestStandby = false;
} finally {
if (isRequestStandby) {
LOG.info("Request #getBlocks to Standby NameNode success.");
}
}
return namenode.getBlocks(datanode, size, minBlockSize);
}

View File

@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.times;
@ -31,6 +35,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -44,7 +49,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.slf4j.LoggerFactory;
/**
* Test balancer with HA NameNodes
@ -106,6 +113,12 @@ public class TestBalancerWithHANameNodes {
TestBalancer.createFile(cluster, TestBalancer.filePath, totalUsedSpace
/ numOfDatanodes, (short) numOfDatanodes, 0);
boolean isRequestStandby = conf.getBoolean(
DFS_HA_ALLOW_STALE_READ_KEY, DFS_HA_ALLOW_STALE_READ_DEFAULT);
if (isRequestStandby) {
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
}
// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
@ -115,13 +128,54 @@ public class TestBalancerWithHANameNodes {
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
assertEquals(1, namenodes.size());
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
final int r = Balancer.run(namenodes, nsIds, BalancerParameters.DEFAULT,
conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
}
/**
* Test Balancer request Standby NameNode when enable this feature.
*/
@Test(timeout = 60000)
public void testBalancerRequestSBNWithHA() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFS_HA_ALLOW_STALE_READ_KEY, true);
conf.setLong(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
//conf.setBoolean(DFS_HA_BALANCER_REQUEST_STANDBY_KEY, true);
TestBalancer.initConf(conf);
assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
Configuration copiedConf = new Configuration(conf);
cluster = new MiniDFSCluster.Builder(copiedConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(TEST_CAPACITIES.length)
.racks(TEST_RACKS)
.simulatedCapacities(TEST_CAPACITIES)
.build();
// Try capture NameNodeConnector log.
LogCapturer log =LogCapturer.captureLogs(
LoggerFactory.getLogger(NameNodeConnector.class));
HATestUtil.setFailoverConfigurations(cluster, conf);
try {
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
ClientProtocol.class).getProxy();
doTest(conf);
// Check getBlocks request to Standby NameNode.
assertTrue(log.getOutput().contains(
"Request #getBlocks to Standby NameNode success."));
} finally {
cluster.shutdown();
}
}
/**
* Test Balancer with ObserverNodes.
*/