HDFS-13183. Addendum: Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.

This commit is contained in:
Ayush Saxena 2020-05-28 13:00:04 +05:30
parent 586d9427e3
commit 6b7040a1cb
2 changed files with 46 additions and 42 deletions

View File

@ -708,12 +708,12 @@ public class Balancer {
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
boolean done = false;
for(int iteration = 0; !done; iteration++) {
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
p.getMaxIdleIteration());
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
@ -741,12 +741,12 @@ public class Balancer {
if (!done) {
Thread.sleep(sleeptime);
}
}
} finally {
for(NameNodeConnector nnc : connectors) {
IOUtils.cleanupWithLogger(LOG, nnc);
}
}
}
return ExitStatus.SUCCESS.getExitCode();
}

View File

@ -147,12 +147,13 @@ public class NameNodeConnector implements Closeable {
private final BalancerProtocols namenode;
/**
* If set balancerShouldRequestStandby true, Balancer will getBlocks from
* If set requestToStandby true, Balancer will getBlocks from
* Standby NameNode only and it can reduce the performance impact of Active
* NameNode, especially in a busy HA mode cluster.
*/
private boolean balancerShouldRequestStandby;
private NamenodeProtocol standbyNameNode;
private boolean requestToStandby;
private String nsId;
private Configuration config;
private final KeyManager keyManager;
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
@ -188,10 +189,10 @@ public class NameNodeConnector implements Closeable {
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
this.balancerShouldRequestStandby = conf.getBoolean(
this.requestToStandby = conf.getBoolean(
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
this.standbyNameNode = null;
this.config = conf;
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
@ -216,24 +217,7 @@ public class NameNodeConnector implements Closeable {
Configuration conf, int maxNotChangedIterations)
throws IOException {
this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
for (ClientProtocol proxy : namenodes) {
try {
if (proxy.getHAServiceState().equals(
HAServiceProtocol.HAServiceState.STANDBY)) {
this.standbyNameNode = NameNodeProxies.createNonHAProxy(
conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
UserGroupInformation.getCurrentUser(), false).getProxy();
break;
}
} catch (Exception e) {
//Ignore the exception while connecting to a namenode.
LOG.debug("Error while connecting to namenode", e);
}
}
}
this.nsId = nsId;
}
public DistributedFileSystem getDistributedFileSystem() {
@ -255,23 +239,43 @@ public class NameNodeConnector implements Closeable {
if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
boolean isRequestStandby = true;
boolean isRequestStandby = false;
NamenodeProtocol nnproxy = null;
try {
if (balancerShouldRequestStandby && standbyNameNode != null) {
return standbyNameNode.getBlocks(datanode, size, minBlockSize);
} else {
isRequestStandby = false;
if (requestToStandby && nsId != null
&& HAUtil.isHAEnabled(config, nsId)) {
List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
for (ClientProtocol proxy : namenodes) {
try {
if (proxy.getHAServiceState().equals(
HAServiceProtocol.HAServiceState.STANDBY)) {
NamenodeProtocol sbn = NameNodeProxies.createNonHAProxy(
config, RPC.getServerAddress(proxy), NamenodeProtocol.class,
UserGroupInformation.getCurrentUser(), false).getProxy();
nnproxy = sbn;
isRequestStandby = true;
break;
}
} catch (Exception e) {
LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
"will fallback to normal way", e);
isRequestStandby = false;
// Ignore the exception while connecting to a namenode.
LOG.debug("Error while connecting to namenode", e);
}
}
if (nnproxy == null) {
LOG.warn("Request #getBlocks to Standby NameNode but meet exception,"
+ " will fallback to normal way.");
nnproxy = namenode;
}
} else {
nnproxy = namenode;
}
return nnproxy.getBlocks(datanode, size, minBlockSize);
} finally {
if (isRequestStandby) {
LOG.info("Request #getBlocks to Standby NameNode success.");
}
}
return namenode.getBlocks(datanode, size, minBlockSize);
}
/**