From f5aa36e194dc2f98205ee366a1daedf81701a39a Mon Sep 17 00:00:00 2001
From: Yiqun Lin
Date: Wed, 28 Mar 2018 11:00:08 +0800
Subject: [PATCH] HDFS-13347. RBF: Cache datanode reports. Contributed by
 Inigo Goiri.

(cherry picked from commit a71656c1c1bf6c680f1382a76ddcac870061f320)
---
 .../federation/metrics/FederationMetrics.java |  2 +-
 .../metrics/NamenodeBeanMetrics.java          | 72 +++++++++++++++++--
 .../hdfs/server/federation/router/Router.java | 13 ++++
 .../router/RouterMetricsService.java          |  9 +++
 .../federation/router/RouterRpcServer.java    | 16 ++---
 .../federation/router/TestRouterRpc.java      | 41 ++++++++++-
 6 files changed, 138 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
index 1a5a8beaf26..f59429ec532 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -422,7 +422,7 @@ public String getNodeUsage() {
     try {
       RouterRpcServer rpcServer = this.router.getRpcServer();
       DatanodeInfo[] live = rpcServer.getDatanodeReport(
-          DatanodeReportType.LIVE, TIME_OUT);
+          DatanodeReportType.LIVE, false, TIME_OUT);
 
       if (live.length > 0) {
         float totalDfsUsed = 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
index 93e9ea05b20..e097037027d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java
@@ -29,11 +29,16 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -41,6 +46,7 @@
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
 import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
 import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
@@ -58,6 +64,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
 /**
  * Expose the Namenode metrics as the Router was one.
 */
@@ -67,6 +77,22 @@ public class NamenodeBeanMetrics
   private static final Logger LOG =
       LoggerFactory.getLogger(NamenodeBeanMetrics.class);
 
+  /** Timeout for the DN report; prevents the page load from hanging too long. */
+  private static final String DN_REPORT_TIME_OUT =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out";
+  /** By default we only wait for 1 second. */
+  private static final long DN_REPORT_TIME_OUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(1);
+
+  /** Time to cache the DN information. */
+  public static final String DN_REPORT_CACHE_EXPIRE =
+      RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire";
+  /** We cache the DN information for 10 seconds by default. */
+  public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+
+
   /** Instance of the Router being monitored. */
   private final Router router;
   /** FSNamesystem bean. */
@@ -78,6 +104,11 @@ public class NamenodeBeanMetrics
   /** NameNodeStatus bean. */
   private ObjectName nnStatusBeanName;
 
+  /** Timeout to get the DN report. */
+  private final long dnReportTimeOut;
+  /** DN type -> full DN report in JSON. */
+  private final LoadingCache<DatanodeReportType, String> dnCache;
+
   public NamenodeBeanMetrics(Router router) {
     this.router = router;
@@ -116,6 +147,23 @@ public NamenodeBeanMetrics(Router router) {
     } catch (NotCompliantMBeanException e) {
       throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
     }
+
+    // Initialize the cache for the DN reports
+    Configuration conf = router.getConfig();
+    this.dnReportTimeOut = conf.getTimeDuration(
+        DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS);
+    long dnCacheExpire = conf.getTimeDuration(
+        DN_REPORT_CACHE_EXPIRE,
+        DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
+        .build(
+            new CacheLoader<DatanodeReportType, String>() {
+              @Override
+              public String load(DatanodeReportType type) throws Exception {
+                return getNodesImpl(type);
+              }
+            });
   }
 
   /**
@@ -284,16 +332,32 @@ public String getDecomNodes() {
   }
 
   /**
-   * Get all the nodes in the federation from a particular type.
-   * TODO this is expensive, we may want to cache it.
+   * Get all the nodes of a particular type in the federation. Getting this
+   * information is expensive, so the result is cached.
    * @param type Type of the datanodes to check.
    * @return JSON with the nodes.
    */
-  private String getNodes(DatanodeReportType type) {
+  private String getNodes(final DatanodeReportType type) {
+    try {
+      return this.dnCache.get(type);
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get the DN storage report for {}", type, e);
+    }
+    // If we cannot get the report, return empty JSON
+    return "{}";
+  }
+
+  /**
+   * Get all the nodes of a particular type, bypassing the report cache.
+   * @param type Type of the datanodes to check.
+   * @return JSON with the nodes.
+   */
+  private String getNodesImpl(final DatanodeReportType type) {
     final Map<String, Map<String, Object>> info = new HashMap<>();
     try {
       RouterRpcServer rpcServer = this.router.getRpcServer();
-      DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
+      DatanodeInfo[] datanodes =
+          rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
       for (DatanodeInfo node : datanodes) {
         Map<String, Object> innerinfo = new HashMap<>();
         innerinfo.put("infoAddr", node.getInfoAddr());
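A note on the caching pattern above: dnCache is a plain Guava LoadingCache, so the first getNodes() call for a report type runs getNodesImpl() through the loader, concurrent callers for the same key wait on that single load, and the entry is recomputed lazily on the first lookup after the expireAfterWrite window. The following self-contained sketch shows the same pattern in isolation; the ReportCacheSketch class and its slowReport() stand-in are illustrative assumptions, not code from this patch:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class ReportCacheSketch {
      /** Report name -> report body, recomputed lazily after expiry. */
      private final LoadingCache<String, String> cache =
          CacheBuilder.newBuilder()
              .expireAfterWrite(10, TimeUnit.SECONDS)
              .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) {
                  // Expensive call; runs at most once per key per window.
                  return slowReport(key);
                }
              });

      /** Illustrative stand-in for an expensive fan-out call. */
      private String slowReport(String key) {
        return "{\"type\":\"" + key + "\"}";
      }

      public String get(String key) {
        // getUnchecked wraps loader failures in UncheckedExecutionException.
        return cache.getUnchecked(key);
      }

      public static void main(String[] args) {
        ReportCacheSketch sketch = new ReportCacheSketch();
        System.out.println(sketch.get("LIVE")); // computed by the loader
        System.out.println(sketch.get("LIVE")); // served from the cache
      }
    }

The patch uses the checked get() variant instead and falls back to an empty JSON object on ExecutionException, which keeps a slow or failing subcluster from breaking the web UI.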
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 38f5d4ff968..df2a4486f9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
@@ -552,6 +553,18 @@ public FederationMetrics getMetrics() {
     return null;
   }
 
+  /**
+   * Get the Namenode metrics.
+   *
+   * @return Namenode metrics.
+   */
+  public NamenodeBeanMetrics getNamenodeMetrics() {
+    if (this.metrics != null) {
+      return this.metrics.getNamenodeMetrics();
+    }
+    return null;
+  }
+
   /**
    * Get the subcluster resolver for files.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
index f4debce3aae..1887ed6bce2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java
@@ -94,6 +94,15 @@ public FederationMetrics getFederationMetrics() {
     return this.federationMetrics;
   }
 
+  /**
+   * Get the Namenode metrics.
+   *
+   * @return Namenode metrics.
+   */
+  public NamenodeBeanMetrics getNamenodeMetrics() {
+    return this.nnMetrics;
+  }
+
   /**
    * Get the JVM metrics for the Router.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index a38e7e8779b..9691eaa69c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1207,18 +1207,20 @@ public long[] getStats() throws IOException {
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
-    return getDatanodeReport(type, 0);
+    return getDatanodeReport(type, true, 0);
   }
 
   /**
    * Get the datanode report with a timeout.
    * @param type Type of the datanode.
+   * @param requireResponse If we require all the namespaces to report.
    * @param timeOutMs Time out for the reply in milliseconds.
    * @return List of datanodes.
    * @throws IOException If it cannot get the report.
    */
   public DatanodeInfo[] getDatanodeReport(
-      DatanodeReportType type, long timeOutMs) throws IOException {
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
     checkOperation(OperationCategory.UNCHECKED);
 
     Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
@@ -1227,8 +1229,8 @@ public DatanodeInfo[] getDatanodeReport(
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
-        rpcClient.invokeConcurrent(
-            nss, method, true, false, timeOutMs, DatanodeInfo[].class);
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
 
     for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
         results.entrySet()) {
       FederationNamespaceInfo ns = entry.getKey();
@@ -1248,9 +1250,7 @@ public DatanodeInfo[] getDatanodeReport(
     }
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
-    DatanodeInfo[] combinedData = new DatanodeInfo[datanodes.size()];
-    combinedData = datanodes.toArray(combinedData);
-    return combinedData;
+    return toArray(datanodes, DatanodeInfo.class);
   }
 
   @Override // ClientProtocol
@@ -2154,7 +2154,7 @@ protected static <T> T[] merge(
    * @param clazz Class of the values.
    * @return Array with the values in set.
    */
-  private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
+  private static <T> T[] toArray(Collection<T> set, Class<T> clazz) {
     @SuppressWarnings("unchecked")
     T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
     combinedData = set.toArray(combinedData);
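Two details in the RouterRpcServer change are worth calling out. First, with requireResponse set to false the concurrent invocation no longer fails when a namespace misses the timeout; judging by how the flag is threaded into invokeConcurrent, the slow namespaces are simply absent from the results map, which lets the metrics path render a partial report. Second, the toArray() helper is widened from Set to Collection so that datanodesMap.values() can be passed directly; Array.newInstance is what produces a correctly typed array at runtime despite erasure. A standalone sketch of that helper follows (the ToArraySketch name and the sample data are illustrative):

    import java.lang.reflect.Array;
    import java.util.Arrays;
    import java.util.Collection;

    public final class ToArraySketch {

      /** Build a typed T[] from any collection using a Class token. */
      static <T> T[] toArray(Collection<T> values, Class<T> clazz) {
        @SuppressWarnings("unchecked")
        T[] combined = (T[]) Array.newInstance(clazz, values.size());
        return values.toArray(combined);
      }

      public static void main(String[] args) {
        Collection<String> datanodes = Arrays.asList("dn-1", "dn-2");
        String[] array = toArray(datanodes, String.class);
        System.out.println(Arrays.toString(array)); // [dn-1, dn-2]
      }
    }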
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index e37d2df3c96..a48475134f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -26,6 +26,7 @@
 import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST_STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -40,6 +41,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -61,16 +63,18 @@
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -127,7 +131,14 @@ public static void globalSetUp() throws Exception {
     cluster.startCluster();
 
     // Start routers with only an RPC service
-    cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
     cluster.startRouters();
 
     // Register and verify all NNs with all routers
@@ -897,4 +908,30 @@ public void testProxyGetFileInfoAcessException() throws IOException {
 
     assertEquals(routerFailure.getClass(), nnFailure.getClass());
   }
+
+  @Test
+  public void testNamenodeMetrics() throws Exception {
+    final NamenodeBeanMetrics metrics =
+        router.getRouter().getNamenodeMetrics();
+    final String jsonString0 = metrics.getLiveNodes();
+
+    // We should have 12 nodes in total
+    JSONObject jsonObject = new JSONObject(jsonString0);
+    assertEquals(12, jsonObject.names().length());
+
+    // We should be caching this information
+    String jsonString1 = metrics.getLiveNodes();
+    assertEquals(jsonString0, jsonString1);
+
+    // We wait until the cached value is updated
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !jsonString0.equals(metrics.getLiveNodes());
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    assertNotEquals(jsonString0, metrics.getLiveNodes());
+  }
 }
\ No newline at end of file
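
For operators, the two new settings resolve to dfs.federation.router.dn-report.time-out and dfs.federation.router.dn-report.cache-expire, since RBFConfigKeys.FEDERATION_ROUTER_PREFIX is "dfs.federation.router.". They can be set in hdfs-site.xml like any other Router key; below is a minimal sketch of setting them programmatically, with illustrative values rather than recommendations:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;

    public class DnReportConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Wait at most 2 seconds for each subcluster's DN report.
        conf.setTimeDuration(
            "dfs.federation.router.dn-report.time-out",
            2, TimeUnit.SECONDS);
        // Serve the cached report for 30 seconds before reloading.
        conf.setTimeDuration(
            "dfs.federation.router.dn-report.cache-expire",
            30, TimeUnit.SECONDS);
        System.out.println(conf.getTimeDuration(
            "dfs.federation.router.dn-report.cache-expire",
            10000, TimeUnit.MILLISECONDS) + " ms");
      }
    }

Note that only DN_REPORT_CACHE_EXPIRE is public in NamenodeBeanMetrics; the timeout key is private, so external code has to spell out the string as above.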