HDFS-16088. Standby NameNode process getLiveDatanodeStorageReport request (#3140)

This commit is contained in:
litao 2021-07-08 14:10:45 +08:00 committed by GitHub
parent 7581413156
commit b4c2647d0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 192 additions and 30 deletions

View File

@ -259,37 +259,19 @@ public class NameNodeConnector implements Closeable {
getBlocksRateLimiter.acquire();
}
boolean isRequestStandby = false;
NamenodeProtocol nnproxy = null;
NamenodeProtocol nnProxy = null;
try {
if (requestToStandby && nsId != null
&& HAUtil.isHAEnabled(config, nsId)) {
List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
for (ClientProtocol proxy : namenodes) {
try {
if (proxy.getHAServiceState().equals(
HAServiceProtocol.HAServiceState.STANDBY)) {
NamenodeProtocol sbn = NameNodeProxies.createNonHAProxy(
ProxyPair proxyPair = getProxy();
isRequestStandby = proxyPair.isRequestStandby;
ClientProtocol proxy = proxyPair.clientProtocol;
if (isRequestStandby) {
nnProxy = NameNodeProxies.createNonHAProxy(
config, RPC.getServerAddress(proxy), NamenodeProtocol.class,
UserGroupInformation.getCurrentUser(), false).getProxy();
nnproxy = sbn;
isRequestStandby = true;
break;
}
} catch (Exception e) {
// Ignore the exception while connecting to a namenode.
LOG.debug("Error while connecting to namenode", e);
}
}
if (nnproxy == null) {
LOG.warn("Request #getBlocks to Standby NameNode but meet exception,"
+ " will fallback to normal way.");
nnproxy = namenode;
}
} else {
nnproxy = namenode;
nnProxy = namenode;
}
return nnproxy.getBlocks(datanode, size, minBlockSize, timeInterval);
return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
} finally {
if (isRequestStandby) {
LOG.info("Request #getBlocks to Standby NameNode success.");
@ -314,7 +296,54 @@ public class NameNodeConnector implements Closeable {
/** @return live datanode storage reports. */
/**
 * Get the storage reports of all live datanodes.
 * When standby reads are enabled ({@code requestToStandby}) the request is
 * routed to a standby NameNode via {@link #getProxy()} to offload the
 * active NameNode; otherwise it goes to the regular namenode proxy.
 *
 * Note: the original span interleaved the pre-change one-line body with the
 * new implementation (a diff artifact leaving unreachable code); this is the
 * reconstructed post-change method.
 *
 * @return live datanode storage reports.
 * @throws IOException on RPC failure while fetching the report.
 */
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
    throws IOException {
  boolean isRequestStandby = false;
  try {
    ProxyPair proxyPair = getProxy();
    isRequestStandby = proxyPair.isRequestStandby;
    ClientProtocol proxy = proxyPair.clientProtocol;
    return proxy.getDatanodeStorageReport(DatanodeReportType.LIVE);
  } finally {
    // Tests assert on this exact message to verify the standby served us.
    if (isRequestStandby) {
      LOG.info("Request #getLiveDatanodeStorageReport to Standby " +
          "NameNode success.");
    }
  }
}
/**
* get the proxy.
* @return ProxyPair(clientProtocol and isRequestStandby)
* @throws IOException
*/
/**
 * Select the NameNode proxy to use for read requests.
 * If requests to standby NameNodes are allowed and HA is enabled for the
 * nameservice, probe each NameNode of the nameservice and return the first
 * one reporting STANDBY state; if none is reachable in STANDBY state, or
 * standby reads are not applicable, fall back to the regular proxy.
 *
 * @return a ProxyPair holding the chosen ClientProtocol proxy and a flag
 *         recording whether it targets a standby NameNode.
 * @throws IOException if the per-NameNode proxies cannot be created.
 */
private ProxyPair getProxy() throws IOException {
  // Guard clause: standby reads not enabled / not an HA nameservice.
  if (!(requestToStandby && nsId != null && HAUtil.isHAEnabled(config, nsId))) {
    return new ProxyPair(namenode, false);
  }
  List<ClientProtocol> candidates =
      HAUtil.getProxiesForAllNameNodesInNameservice(config, nsId);
  for (ClientProtocol candidate : candidates) {
    try {
      if (candidate.getHAServiceState().equals(
          HAServiceProtocol.HAServiceState.STANDBY)) {
        // First reachable standby wins.
        return new ProxyPair(candidate, true);
      }
    } catch (Exception e) {
      // Ignore the exception while connecting to a namenode.
      LOG.debug("Error while connecting to namenode", e);
    }
  }
  // No standby reachable: fall back to the normal (active) proxy.
  LOG.warn("Request to Standby" +
      " NameNode but meet exception, will fallback to normal way.");
  return new ProxyPair(namenode, false);
}
/** @return the key manager */
@ -432,4 +461,14 @@ public class NameNodeConnector implements Closeable {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
+ ", bpid=" + blockpoolID + "]";
}
/**
 * Immutable holder pairing a NameNode RPC proxy with a flag recording
 * whether that proxy points at a standby NameNode. Returned by
 * {@code getProxy()} so callers can both issue the RPC and log when a
 * standby actually served the request. Fields are read directly by the
 * enclosing class (no accessors by design).
 */
private static class ProxyPair {
private final ClientProtocol clientProtocol;
private final boolean isRequestStandby;
ProxyPair(ClientProtocol clientProtocol, boolean isRequestStandby) {
this.clientProtocol = clientProtocol;
this.isRequestStandby = isRequestStandby;
}
}
}

View File

@ -34,6 +34,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -49,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@ -171,6 +174,8 @@ public class TestBalancerWithHANameNodes {
// Check getBlocks request to Standby NameNode.
assertTrue(log.getOutput().contains(
"Request #getBlocks to Standby NameNode success."));
assertTrue(log.getOutput().contains(
"Request #getLiveDatanodeStorageReport to Standby NameNode success"));
} finally {
cluster.shutdown();
}
@ -236,4 +241,122 @@ public class TestBalancerWithHANameNodes {
}
}
}
/**
 * Comparing the results of getLiveDatanodeStorageReport()
 * from the active and standby NameNodes,
 * the results should be the same.
 */
@Test(timeout = 60000)
public void testGetLiveDatanodeStorageReport() throws Exception {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
assertEquals(TEST_CAPACITIES.length, TEST_RACKS.length);
NNConf nn1Conf = new MiniDFSNNTopology.NNConf("nn1");
nn1Conf.setIpcPort(HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
Configuration copiedConf = new Configuration(conf);
// Capture the NameNodeConnector log so we can assert below which
// NameNode (active vs. standby) actually served each request.
LogCapturer log =LogCapturer.captureLogs(
LoggerFactory.getLogger(NameNodeConnector.class));
// We need to compare datanode info from the ANN and the SNN, so the
// heartbeat interval is set large enough that no heartbeat fires (and
// changes the reports) for the duration of this test method.
copiedConf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 60000);
cluster = new MiniDFSCluster.Builder(copiedConf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(TEST_CAPACITIES.length)
.racks(TEST_RACKS)
.simulatedCapacities(TEST_CAPACITIES)
.build();
HATestUtil.setFailoverConfigurations(cluster, conf);
try {
cluster.waitActive();
cluster.transitionToActive(0);
URI namenode = (URI) DFSUtil.getInternalNsRpcUris(conf)
.toArray()[0];
String nsId = DFSUtilClient.getNameServiceIds(conf)
.toArray()[0].toString();
// Request to active namenode: standby reads are still disabled, so the
// standby-success log line must NOT appear.
NameNodeConnector nncActive = new NameNodeConnector(
"nncActive", namenode,
nsId, new Path("/test"),
null, conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
DatanodeStorageReport[] datanodeStorageReportFromAnn =
nncActive.getLiveDatanodeStorageReport();
assertTrue(!log.getOutput().contains(
"Request #getLiveDatanodeStorageReport to Standby NameNode success"));
nncActive.close();
// Request to standby namenode: enabling stale reads should route the
// report request to the SNN, which logs the success message.
conf.setBoolean(DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
true);
NameNodeConnector nncStandby = new NameNodeConnector(
"nncStandby", namenode,
nsId, new Path("/test"),
null, conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
DatanodeStorageReport[] datanodeStorageReportFromSnn =
nncStandby.getLiveDatanodeStorageReport();
assertTrue(log.getOutput().contains(
"Request #getLiveDatanodeStorageReport to Standby NameNode success"));
nncStandby.close();
// Assert datanode info: the per-datanode textual report must match
// between the ANN and the SNN for both datanodes.
assertEquals(
datanodeStorageReportFromAnn[0].getDatanodeInfo()
.getDatanodeReport(),
datanodeStorageReportFromSnn[0].getDatanodeInfo()
.getDatanodeReport());
assertEquals(
datanodeStorageReportFromAnn[1].getDatanodeInfo()
.getDatanodeReport(),
datanodeStorageReportFromSnn[1].getDatanodeInfo()
.getDatanodeReport());
// Assert all fields of each datanode's first storage report agree
// between the ANN and SNN views.
for (int i = 0; i < TEST_CAPACITIES.length; i++) {
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getStorage().toString(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getStorage().toString());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getCapacity(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getCapacity());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getBlockPoolUsed(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getBlockPoolUsed());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getDfsUsed(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getDfsUsed());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getRemaining(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getRemaining());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getMount(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getMount());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.getNonDfsUsed(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.getNonDfsUsed());
assertEquals(
datanodeStorageReportFromAnn[i].getStorageReports()[0]
.isFailed(),
datanodeStorageReportFromSnn[i].getStorageReports()[0]
.isFailed());
}
} finally {
cluster.shutdown();
}
}
}