HDFS-13183. Addendum: Standby NameNode process getBlocks request to reduce Active load. Contributed by Xiaoqiao He.
This commit is contained in:
parent
10db97df1c
commit
9b38be43c6
|
@ -708,12 +708,12 @@ public class Balancer {
|
||||||
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
||||||
|
|
||||||
List<NameNodeConnector> connectors = Collections.emptyList();
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||||
boolean done = false;
|
try {
|
||||||
for(int iteration = 0; !done; iteration++) {
|
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
|
||||||
try {
|
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
||||||
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
|
p.getMaxIdleIteration());
|
||||||
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
boolean done = false;
|
||||||
p.getMaxIdleIteration());
|
for(int iteration = 0; !done; iteration++) {
|
||||||
done = true;
|
done = true;
|
||||||
Collections.shuffle(connectors);
|
Collections.shuffle(connectors);
|
||||||
for(NameNodeConnector nnc : connectors) {
|
for(NameNodeConnector nnc : connectors) {
|
||||||
|
@ -741,10 +741,10 @@ public class Balancer {
|
||||||
if (!done) {
|
if (!done) {
|
||||||
Thread.sleep(sleeptime);
|
Thread.sleep(sleeptime);
|
||||||
}
|
}
|
||||||
} finally {
|
}
|
||||||
for(NameNodeConnector nnc : connectors) {
|
} finally {
|
||||||
IOUtils.cleanupWithLogger(LOG, nnc);
|
for(NameNodeConnector nnc : connectors) {
|
||||||
}
|
IOUtils.cleanupWithLogger(LOG, nnc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ExitStatus.SUCCESS.getExitCode();
|
return ExitStatus.SUCCESS.getExitCode();
|
||||||
|
|
|
@ -147,12 +147,13 @@ public class NameNodeConnector implements Closeable {
|
||||||
|
|
||||||
private final BalancerProtocols namenode;
|
private final BalancerProtocols namenode;
|
||||||
/**
|
/**
|
||||||
* If set balancerShouldRequestStandby true, Balancer will getBlocks from
|
* If set requestToStandby true, Balancer will getBlocks from
|
||||||
* Standby NameNode only and it can reduce the performance impact of Active
|
* Standby NameNode only and it can reduce the performance impact of Active
|
||||||
* NameNode, especially in a busy HA mode cluster.
|
* NameNode, especially in a busy HA mode cluster.
|
||||||
*/
|
*/
|
||||||
private boolean balancerShouldRequestStandby;
|
private boolean requestToStandby;
|
||||||
private NamenodeProtocol standbyNameNode;
|
private String nsId;
|
||||||
|
private Configuration config;
|
||||||
private final KeyManager keyManager;
|
private final KeyManager keyManager;
|
||||||
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
@ -188,10 +189,10 @@ public class NameNodeConnector implements Closeable {
|
||||||
|
|
||||||
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
||||||
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
||||||
this.balancerShouldRequestStandby = conf.getBoolean(
|
this.requestToStandby = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
|
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
|
||||||
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
|
DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
|
||||||
this.standbyNameNode = null;
|
this.config = conf;
|
||||||
|
|
||||||
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
|
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
|
||||||
|
|
||||||
|
@ -216,24 +217,7 @@ public class NameNodeConnector implements Closeable {
|
||||||
Configuration conf, int maxNotChangedIterations)
|
Configuration conf, int maxNotChangedIterations)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
|
this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
|
||||||
if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
|
this.nsId = nsId;
|
||||||
List<ClientProtocol> namenodes =
|
|
||||||
HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
|
|
||||||
for (ClientProtocol proxy : namenodes) {
|
|
||||||
try {
|
|
||||||
if (proxy.getHAServiceState().equals(
|
|
||||||
HAServiceProtocol.HAServiceState.STANDBY)) {
|
|
||||||
this.standbyNameNode = NameNodeProxies.createNonHAProxy(
|
|
||||||
conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
|
|
||||||
UserGroupInformation.getCurrentUser(), false).getProxy();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
//Ignore the exception while connecting to a namenode.
|
|
||||||
LOG.debug("Error while connecting to namenode", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public DistributedFileSystem getDistributedFileSystem() {
|
public DistributedFileSystem getDistributedFileSystem() {
|
||||||
|
@ -255,23 +239,43 @@ public class NameNodeConnector implements Closeable {
|
||||||
if (getBlocksRateLimiter != null) {
|
if (getBlocksRateLimiter != null) {
|
||||||
getBlocksRateLimiter.acquire();
|
getBlocksRateLimiter.acquire();
|
||||||
}
|
}
|
||||||
boolean isRequestStandby = true;
|
boolean isRequestStandby = false;
|
||||||
|
NamenodeProtocol nnproxy = null;
|
||||||
try {
|
try {
|
||||||
if (balancerShouldRequestStandby && standbyNameNode != null) {
|
if (requestToStandby && nsId != null
|
||||||
return standbyNameNode.getBlocks(datanode, size, minBlockSize);
|
&& HAUtil.isHAEnabled(config, nsId)) {
|
||||||
|
List<ClientProtocol> namenodes =
|
||||||
|
HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
|
||||||
|
for (ClientProtocol proxy : namenodes) {
|
||||||
|
try {
|
||||||
|
if (proxy.getHAServiceState().equals(
|
||||||
|
HAServiceProtocol.HAServiceState.STANDBY)) {
|
||||||
|
NamenodeProtocol sbn = NameNodeProxies.createNonHAProxy(
|
||||||
|
config, RPC.getServerAddress(proxy), NamenodeProtocol.class,
|
||||||
|
UserGroupInformation.getCurrentUser(), false).getProxy();
|
||||||
|
nnproxy = sbn;
|
||||||
|
isRequestStandby = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Ignore the exception while connecting to a namenode.
|
||||||
|
LOG.debug("Error while connecting to namenode", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nnproxy == null) {
|
||||||
|
LOG.warn("Request #getBlocks to Standby NameNode but meet exception,"
|
||||||
|
+ " will fallback to normal way.");
|
||||||
|
nnproxy = namenode;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
isRequestStandby = false;
|
nnproxy = namenode;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
return nnproxy.getBlocks(datanode, size, minBlockSize);
|
||||||
LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
|
|
||||||
"will fallback to normal way", e);
|
|
||||||
isRequestStandby = false;
|
|
||||||
} finally {
|
} finally {
|
||||||
if (isRequestStandby) {
|
if (isRequestStandby) {
|
||||||
LOG.info("Request #getBlocks to Standby NameNode success.");
|
LOG.info("Request #getBlocks to Standby NameNode success.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return namenode.getBlocks(datanode, size, minBlockSize);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue