From bc9e588a19c0aaf518de8dab719362be4a8d6a54 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 8 Sep 2017 09:37:10 -0700 Subject: [PATCH] HDFS-12335. Federation Metrics. Contributed by Inigo Goiri. (cherry picked from commit 3b19e77752afce87936f5c0d1e6d272fba798d7b) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 + .../federation/metrics/FederationMBean.java | 204 ++++++ .../federation/metrics/FederationMetrics.java | 673 ++++++++++++++++++ .../metrics/FederationRPCMBean.java | 90 +++ .../metrics/FederationRPCMetrics.java | 239 +++++++ .../FederationRPCPerformanceMonitor.java | 211 ++++++ .../metrics/NamenodeBeanMetrics.java | 624 ++++++++++++++++ .../federation/metrics/StateStoreMBean.java | 45 ++ .../federation/metrics/StateStoreMetrics.java | 144 ++++ .../federation/metrics/package-info.java | 27 + .../federation/router/ConnectionManager.java | 23 + .../federation/router/ConnectionPool.java | 23 + .../hdfs/server/federation/router/Router.java | 62 ++ .../federation/router/RouterMetrics.java | 73 ++ .../router/RouterMetricsService.java | 108 +++ .../federation/router/RouterRpcClient.java | 39 +- .../federation/router/RouterRpcMonitor.java | 95 +++ .../federation/router/RouterRpcServer.java | 63 +- .../federation/store/CachedRecordStore.java | 8 + .../federation/store/StateStoreService.java | 42 +- .../store/driver/StateStoreDriver.java | 17 +- .../impl/StateStoreSerializableImpl.java | 6 +- .../driver/impl/StateStoreZooKeeperImpl.java | 26 + .../store/records/MembershipState.java | 2 +- .../federation/store/records/MountTable.java | 23 + .../impl/pb/MembershipStatePBImpl.java | 5 +- .../src/main/resources/hdfs-default.xml | 19 +- .../federation/FederationTestUtils.java | 13 + .../federation/RouterConfigBuilder.java | 13 + .../metrics/TestFederationMetrics.java | 237 ++++++ .../federation/metrics/TestMetricsBase.java | 150 ++++ .../server/federation/router/TestRouter.java | 23 +- .../driver/TestStateStoreDriverBase.java | 69 ++ 33 files changed, 3383 insertions(+), 24 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 629ad00fa62..48521a3b63b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFau import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; @@ -1152,6 +1154,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_ROUTER_PREFIX + "rpc.enable"; public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_METRICS_ENABLE = + FEDERATION_ROUTER_PREFIX + "metrics.enable"; + public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_METRICS_CLASS = + FEDERATION_ROUTER_PREFIX + "metrics.class"; + public static final Class + DFS_ROUTER_METRICS_CLASS_DEFAULT = + FederationRPCPerformanceMonitor.class; + // HDFS Router heartbeat public static final String DFS_ROUTER_HEARTBEAT_ENABLE = FEDERATION_ROUTER_PREFIX + "heartbeat.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java new file mode 100644 index 00000000000..43efb3c053f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the federation statistics. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FederationMBean { + + /** + * Get information about all the namenodes in the federation or null if + * failure. + * @return JSON with all the Namenodes. + */ + String getNamenodes(); + + /** + * Get the latest info for each registered nameservice. + * @return JSON with all the nameservices. + */ + String getNameservices(); + + /** + * Get the mount table for the federated filesystem or null if failure. + * @return JSON with the mount table. + */ + String getMountTable(); + + /** + * Get the total capacity of the federated cluster. + * @return Total capacity of the federated cluster. + */ + long getTotalCapacity(); + + /** + * Get the used capacity of the federated cluster. + * @return Used capacity of the federated cluster. + */ + long getUsedCapacity(); + + /** + * Get the remaining capacity of the federated cluster. + * @return Remaining capacity of the federated cluster. + */ + long getRemainingCapacity(); + + /** + * Get the number of nameservices in the federation. + * @return Number of nameservices in the federation. + */ + int getNumNameservices(); + + /** + * Get the number of namenodes. + * @return Number of namenodes. + */ + int getNumNamenodes(); + + /** + * Get the number of expired namenodes. + * @return Number of expired namenodes. + */ + int getNumExpiredNamenodes(); + + /** + * Get the number of live datanodes. + * @return Number of live datanodes. + */ + int getNumLiveNodes(); + + /** + * Get the number of dead datanodes. + * @return Number of dead datanodes. + */ + int getNumDeadNodes(); + + /** + * Get the number of decommissioning datanodes. + * @return Number of decommissioning datanodes. + */ + int getNumDecommissioningNodes(); + + /** + * Get the number of live decommissioned datanodes. + * @return Number of live decommissioned datanodes. + */ + int getNumDecomLiveNodes(); + + /** + * Get the number of dead decommissioned datanodes. + * @return Number of dead decommissioned datanodes. + */ + int getNumDecomDeadNodes(); + + /** + * Get Max, Median, Min and Standard Deviation of DataNodes usage. + * @return the DataNode usage information, as a JSON string. + */ + String getNodeUsage(); + + /** + * Get the number of blocks in the federation. + * @return Number of blocks in the federation. + */ + long getNumBlocks(); + + /** + * Get the number of missing blocks in the federation. + * @return Number of missing blocks in the federation. + */ + long getNumOfMissingBlocks(); + + /** + * Get the number of pending replication blocks in the federation. + * @return Number of pending replication blocks in the federation. + */ + long getNumOfBlocksPendingReplication(); + + /** + * Get the number of under replicated blocks in the federation. + * @return Number of under replicated blocks in the federation. + */ + long getNumOfBlocksUnderReplicated(); + + /** + * Get the number of pending deletion blocks in the federation. + * @return Number of pending deletion blocks in the federation. + */ + long getNumOfBlocksPendingDeletion(); + + /** + * Get the number of files in the federation. + * @return Number of files in the federation. + */ + long getNumFiles(); + + /** + * When the router started. + * @return Date as a string the router started. + */ + String getRouterStarted(); + + /** + * Get the version of the router. + * @return Version of the router. + */ + String getVersion(); + + /** + * Get the compilation date of the router. + * @return Compilation date of the router. + */ + String getCompiledDate(); + + /** + * Get the compilation info of the router. + * @return Compilation info of the router. + */ + String getCompileInfo(); + + /** + * Get the host and port of the router. + * @return Host and port of the router. + */ + String getHostAndPort(); + + /** + * Get the identifier of the router. + * @return Identifier of the router. + */ + String getRouterId(); + + /** + * Get the host and port of the router. + * @return Host and port of the router. + */ + String getClusterId(); + + /** + * Get the host and port of the router. + * @return Host and port of the router. + */ + String getBlockPoolId(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java new file mode 100644 index 00000000000..1e802564330 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java @@ -0,0 +1,673 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.util.Time.now; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.VersionInfo; +import org.codehaus.jettison.json.JSONObject; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the Router metrics collector. + */ +public class FederationMetrics implements FederationMBean { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationMetrics.class); + + /** Format for a date. */ + private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss"; + + + /** Router interface. */ + private final Router router; + + /** FederationState JMX bean. */ + private ObjectName beanName; + + /** Resolve the namenode for each namespace. */ + private final ActiveNamenodeResolver namenodeResolver; + + /** State store. */ + private final StateStoreService stateStore; + /** Membership state store. */ + private MembershipStore membershipStore; + /** Mount table store. */ + private MountTableStore mountTableStore; + + + public FederationMetrics(Router router) throws IOException { + this.router = router; + + try { + StandardMBean bean = new StandardMBean(this, FederationMBean.class); + this.beanName = MBeans.register("Router", "FederationState", bean); + LOG.info("Registered Router MBean: {}", this.beanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad Router MBean setup", e); + } + + // Resolve namenode for each nameservice + this.namenodeResolver = this.router.getNamenodeResolver(); + + // State store interfaces + this.stateStore = this.router.getStateStore(); + if (this.stateStore == null) { + LOG.error("State store not available"); + } else { + this.membershipStore = stateStore.getRegisteredRecordStore( + MembershipStore.class); + this.mountTableStore = stateStore.getRegisteredRecordStore( + MountTableStore.class); + } + } + + /** + * Unregister the JMX beans. + */ + public void close() { + if (this.beanName != null) { + MBeans.unregister(beanName); + } + } + + @Override + public String getNamenodes() { + final Map> info = new LinkedHashMap<>(); + try { + // Get the values from the store + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + + // Order the namenodes + final List namenodes = response.getNamenodeMemberships(); + if (namenodes == null || namenodes.size() == 0) { + return JSON.toString(info); + } + List namenodesOrder = new ArrayList<>(namenodes); + Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); + + // Dump namenodes information into JSON + for (MembershipState namenode : namenodesOrder) { + Map innerInfo = new HashMap<>(); + Map map = getJson(namenode); + innerInfo.putAll(map); + long dateModified = namenode.getDateModified(); + long lastHeartbeat = getSecondsSince(dateModified); + innerInfo.put("lastHeartbeat", lastHeartbeat); + MembershipStats stats = namenode.getStats(); + long used = stats.getTotalSpace() - stats.getAvailableSpace(); + innerInfo.put("used", used); + info.put(namenode.getNamenodeKey(), + Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error("Enable to fetch json representation of namenodes {}", + e.getMessage()); + return "{}"; + } + return JSON.toString(info); + } + + @Override + public String getNameservices() { + final Map> info = new LinkedHashMap<>(); + try { + final List namenodes = getActiveNamenodeRegistrations(); + List namenodesOrder = new ArrayList<>(namenodes); + Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); + + // Dump namenodes information into JSON + for (MembershipState namenode : namenodesOrder) { + Map innerInfo = new HashMap<>(); + Map map = getJson(namenode); + innerInfo.putAll(map); + long dateModified = namenode.getDateModified(); + long lastHeartbeat = getSecondsSince(dateModified); + innerInfo.put("lastHeartbeat", lastHeartbeat); + MembershipStats stats = namenode.getStats(); + long used = stats.getTotalSpace() - stats.getAvailableSpace(); + innerInfo.put("used", used); + info.put(namenode.getNamenodeKey(), + Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage()); + return "{}"; + } + return JSON.toString(info); + } + + @Override + public String getMountTable() { + final List> info = new LinkedList<>(); + + try { + // Get all the mount points in order + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = + mountTableStore.getMountTableEntries(request); + final List mounts = response.getEntries(); + List orderedMounts = new ArrayList<>(mounts); + Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR); + + // Dump mount table entries information into JSON + for (MountTable entry : orderedMounts) { + // Sumarize destinations + Set nameservices = new LinkedHashSet<>(); + Set paths = new LinkedHashSet<>(); + for (RemoteLocation location : entry.getDestinations()) { + nameservices.add(location.getNameserviceId()); + paths.add(location.getDest()); + } + + Map map = getJson(entry); + // We add some values with a cleaner format + map.put("dateCreated", getDateString(entry.getDateCreated())); + map.put("dateModified", getDateString(entry.getDateModified())); + + Map innerInfo = new HashMap<>(); + innerInfo.putAll(map); + innerInfo.put("nameserviceId", StringUtils.join(",", nameservices)); + innerInfo.put("path", StringUtils.join(",", paths)); + if (nameservices.size() > 1) { + innerInfo.put("order", entry.getDestOrder().toString()); + } else { + innerInfo.put("order", ""); + } + innerInfo.put("readonly", entry.isReadOnly()); + info.add(Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error( + "Cannot generate JSON of mount table from store: {}", e.getMessage()); + return "[]"; + } + return JSON.toString(info); + } + + @Override + public long getTotalCapacity() { + return getNameserviceAggregatedLong(MembershipStats::getTotalSpace); + } + + @Override + public long getRemainingCapacity() { + return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace); + } + + @Override + public long getUsedCapacity() { + return getTotalCapacity() - getRemainingCapacity(); + } + + @Override + public int getNumNameservices() { + try { + Set nss = namenodeResolver.getNamespaces(); + return nss.size(); + } catch (IOException e) { + LOG.error( + "Cannot fetch number of expired registrations from the store: {}", + e.getMessage()); + return 0; + } + } + + @Override + public int getNumNamenodes() { + try { + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + List memberships = response.getNamenodeMemberships(); + return memberships.size(); + } catch (IOException e) { + LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage()); + return 0; + } + } + + @Override + public int getNumExpiredNamenodes() { + try { + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getExpiredNamenodeRegistrations(request); + List expiredMemberships = + response.getNamenodeMemberships(); + return expiredMemberships.size(); + } catch (IOException e) { + LOG.error( + "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage()); + return 0; + } + } + + @Override + public int getNumLiveNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfActiveDatanodes); + } + + @Override + public int getNumDeadNodes() { + return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes); + } + + @Override + public int getNumDecommissioningNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecommissioningDatanodes); + } + + @Override + public int getNumDecomLiveNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecomActiveDatanodes); + } + + @Override + public int getNumDecomDeadNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecomDeadDatanodes); + } + + @Override // NameNodeMXBean + public String getNodeUsage() { + float median = 0; + float max = 0; + float min = 0; + float dev = 0; + + final Map> info = new HashMap<>(); + try { + RouterRpcServer rpcServer = this.router.getRpcServer(); + DatanodeInfo[] live = + rpcServer.getDatanodeReport(DatanodeReportType.LIVE); + + if (live.length > 0) { + float totalDfsUsed = 0; + float[] usages = new float[live.length]; + int i = 0; + for (DatanodeInfo dn : live) { + usages[i++] = dn.getDfsUsedPercent(); + totalDfsUsed += dn.getDfsUsedPercent(); + } + totalDfsUsed /= live.length; + Arrays.sort(usages); + median = usages[usages.length / 2]; + max = usages[usages.length - 1]; + min = usages[0]; + + for (i = 0; i < usages.length; i++) { + dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed); + } + dev = (float) Math.sqrt(dev / usages.length); + } + } catch (IOException e) { + LOG.info("Cannot get the live nodes: {}", e.getMessage()); + } + + final Map innerInfo = new HashMap<>(); + innerInfo.put("min", StringUtils.format("%.2f%%", min)); + innerInfo.put("median", StringUtils.format("%.2f%%", median)); + innerInfo.put("max", StringUtils.format("%.2f%%", max)); + innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev)); + info.put("nodeUsage", innerInfo); + + return JSON.toString(info); + } + + @Override + public long getNumBlocks() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks); + } + + @Override + public long getNumOfMissingBlocks() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing); + } + + @Override + public long getNumOfBlocksPendingReplication() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksPendingReplication); + } + + @Override + public long getNumOfBlocksUnderReplicated() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksUnderReplicated); + } + + @Override + public long getNumOfBlocksPendingDeletion() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksPendingDeletion); + } + + @Override + public long getNumFiles() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles); + } + + @Override + public String getRouterStarted() { + long startTime = this.router.getStartTime(); + return new Date(startTime).toString(); + } + + @Override + public String getVersion() { + return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); + } + + @Override + public String getCompiledDate() { + return VersionInfo.getDate(); + } + + @Override + public String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + + VersionInfo.getBranch(); + } + + @Override + public String getHostAndPort() { + // TODO this should be the HTTP address + return "Unknown"; + } + + @Override + public String getRouterId() { + return this.router.getRouterId(); + } + + @Override + public String getClusterId() { + try { + Collection clusterIds = + getNamespaceInfo(FederationNamespaceInfo::getClusterId); + return clusterIds.toString(); + } catch (IOException e) { + LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage()); + return ""; + } + } + + @Override + public String getBlockPoolId() { + try { + Collection blockpoolIds = + getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId); + return blockpoolIds.toString(); + } catch (IOException e) { + LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage()); + return ""; + } + } + + /** + * Build a set of unique values found in all namespaces. + * + * @param f Method reference of the appropriate FederationNamespaceInfo + * getter function + * @return Set of unique string values found in all discovered namespaces. + * @throws IOException if the query could not be executed. + */ + private Collection getNamespaceInfo( + Function f) throws IOException { + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + return response.getNamespaceInfo().stream() + .map(f) + .collect(Collectors.toSet()); + } + + /** + * Get the aggregated value for a method for all nameservices. + * @param f Method reference + * @return Aggregated integer. + */ + private int getNameserviceAggregatedInt(ToIntFunction f) { + try { + return getActiveNamenodeRegistrations().stream() + .map(MembershipState::getStats) + .collect(Collectors.summingInt(f)); + } catch (IOException e) { + LOG.error("Unable to extract metrics: {}", e.getMessage()); + return 0; + } + } + + /** + * Get the aggregated value for a method for all nameservices. + * @param f Method reference + * @return Aggregated long. + */ + private long getNameserviceAggregatedLong(ToLongFunction f) { + try { + return getActiveNamenodeRegistrations().stream() + .map(MembershipState::getStats) + .collect(Collectors.summingLong(f)); + } catch (IOException e) { + LOG.error("Unable to extract metrics: {}", e.getMessage()); + return 0; + } + } + + /** + * Fetches the most active namenode memberships for all known nameservices. + * The fetched membership may not or may not be active. Excludes expired + * memberships. + * @throws IOException if the query could not be performed. + * @return List of the most active NNs from each known nameservice. + */ + private List getActiveNamenodeRegistrations() + throws IOException { + + List resultList = new ArrayList<>(); + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) { + // Fetch the most recent namenode registration + String nsId = nsInfo.getNameserviceId(); + List nns = + namenodeResolver.getNamenodesForNameserviceId(nsId); + if (nns != null) { + FederationNamenodeContext nn = nns.get(0); + if (nn != null && nn instanceof MembershipState) { + resultList.add((MembershipState) nn); + } + } + } + return resultList; + } + + /** + * Get time as a date string. + * @param time Seconds since 1970. + * @return String representing the date. + */ + private static String getDateString(long time) { + if (time <= 0) { + return "-"; + } + Date date = new Date(time); + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + return sdf.format(date); + } + + /** + * Get the number of seconds passed since a date. + * + * @param timeMs to use as a reference. + * @return Seconds since the date. + */ + private static long getSecondsSince(long timeMs) { + if (timeMs < 0) { + return -1; + } + return (now() - timeMs) / 1000; + } + + /** + * Get JSON for this record. + * + * @return Map representing the data for the JSON representation. + */ + private static Map getJson(BaseRecord record) { + Map json = new HashMap<>(); + Map> fields = getFields(record); + + for (String fieldName : fields.keySet()) { + if (!fieldName.equalsIgnoreCase("proto")) { + try { + Object value = getField(record, fieldName); + if (value instanceof BaseRecord) { + BaseRecord recordField = (BaseRecord) value; + json.putAll(getJson(recordField)); + } else { + json.put(fieldName, value == null ? JSONObject.NULL : value); + } + } catch (Exception e) { + throw new IllegalArgumentException( + "Cannot serialize field " + fieldName + " into JSON"); + } + } + } + return json; + } + + /** + * Returns all serializable fields in the object. + * + * @return Map with the fields. + */ + private static Map> getFields(BaseRecord record) { + Map> getters = new HashMap<>(); + for (Method m : record.getClass().getDeclaredMethods()) { + if (m.getName().startsWith("get")) { + try { + Class type = m.getReturnType(); + char[] c = m.getName().substring(3).toCharArray(); + c[0] = Character.toLowerCase(c[0]); + String key = new String(c); + getters.put(key, type); + } catch (Exception e) { + LOG.error("Cannot execute getter {} on {}", m.getName(), record); + } + } + } + return getters; + } + + /** + * Fetches the value for a field name. + * + * @param fieldName the legacy name of the field. + * @return The field data or null if not found. + */ + private static Object getField(BaseRecord record, String fieldName) { + Object result = null; + Method m = locateGetter(record, fieldName); + if (m != null) { + try { + result = m.invoke(record); + } catch (Exception e) { + LOG.error("Cannot get field {} on {}", fieldName, record); + } + } + return result; + } + + /** + * Finds the appropriate getter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching getter or null if not found. + */ + private static Method locateGetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("get" + fieldName)) { + return m; + } + } + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java new file mode 100644 index 00000000000..00209e96784 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the RPC server. + * TODO use the default RPC MBean. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FederationRPCMBean { + + long getProxyOps(); + + double getProxyAvg(); + + long getProcessingOps(); + + double getProcessingAvg(); + + long getProxyOpFailureCommunicate(); + + long getProxyOpFailureStandby(); + + long getProxyOpNotImplemented(); + + long getRouterFailureStateStoreOps(); + + long getRouterFailureReadOnlyOps(); + + long getRouterFailureLockedOps(); + + long getRouterFailureSafemodeOps(); + + int getRpcServerCallQueue(); + + /** + * Get the number of RPC connections between the clients and the Router. + * @return Number of RPC connections between the clients and the Router. + */ + int getRpcServerNumOpenConnections(); + + /** + * Get the number of RPC connections between the Router and the NNs. + * @return Number of RPC connections between the Router and the NNs. + */ + int getRpcClientNumConnections(); + + /** + * Get the number of active RPC connections between the Router and the NNs. + * @return Number of active RPC connections between the Router and the NNs. + */ + int getRpcClientNumActiveConnections(); + + /** + * Get the number of RPC connections to be created. + * @return Number of RPC connections to be created. + */ + int getRpcClientNumCreatingConnections(); + + /** + * Get the number of connection pools between the Router and a NNs. + * @return Number of connection pools between the Router and a NNs. + */ + int getRpcClientNumConnectionPools(); + + /** + * JSON representation of the RPC connections from the Router to the NNs. + * @return JSON string representation. + */ + String getRpcClientConnections(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java new file mode 100644 index 00000000000..427bca28961 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Implementation of the RPC metrics collector. + */ +@Metrics(name = "RouterRPCActivity", about = "Router RPC Activity", + context = "router") +public class FederationRPCMetrics implements FederationRPCMBean { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + private RouterRpcServer rpcServer; + + @Metric("Time for the router to process an operation internally") + private MutableRate processing; + @Metric("Number of operations the Router processed internally") + private MutableCounterLong processingOp; + @Metric("Time for the Router to proxy an operation to the Namenodes") + private MutableRate proxy; + @Metric("Number of operations the Router proxied to a Namenode") + private MutableCounterLong proxyOp; + + @Metric("Number of operations to fail to reach NN") + private MutableCounterLong proxyOpFailureStandby; + @Metric("Number of operations to hit a standby NN") + private MutableCounterLong proxyOpFailureCommunicate; + @Metric("Number of operations not implemented") + private MutableCounterLong proxyOpNotImplemented; + + @Metric("Failed requests due to State Store unavailable") + private MutableCounterLong routerFailureStateStore; + @Metric("Failed requests due to read only mount point") + private MutableCounterLong routerFailureReadOnly; + @Metric("Failed requests due to locked path") + private MutableCounterLong routerFailureLocked; + @Metric("Failed requests due to safe mode") + private MutableCounterLong routerFailureSafemode; + + public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) { + this.rpcServer = rpcServer; + + registry.tag(SessionId, "RouterRPCSession"); + registry.tag(ProcessName, "Router"); + } + + public static FederationRPCMetrics create(Configuration conf, + RouterRpcServer rpcServer) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(FederationRPCMetrics.class.getName(), + "HDFS Federation RPC Metrics", + new FederationRPCMetrics(conf, rpcServer)); + } + + /** + * Convert nanoseconds to milliseconds. + * @param ns Time in nanoseconds. + * @return Time in milliseconds. + */ + private static double toMs(double ns) { + return ns / 1000000; + } + + /** + * Reset the metrics system. + */ + public static void reset() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(FederationRPCMetrics.class.getName()); + } + + public void incrProxyOpFailureStandby() { + proxyOpFailureStandby.incr(); + } + + @Override + public long getProxyOpFailureStandby() { + return proxyOpFailureStandby.value(); + } + + public void incrProxyOpFailureCommunicate() { + proxyOpFailureCommunicate.incr(); + } + + @Override + public long getProxyOpFailureCommunicate() { + return proxyOpFailureCommunicate.value(); + } + + + public void incrProxyOpNotImplemented() { + proxyOpNotImplemented.incr(); + } + + @Override + public long getProxyOpNotImplemented() { + return proxyOpNotImplemented.value(); + } + + public void incrRouterFailureStateStore() { + routerFailureStateStore.incr(); + } + + @Override + public long getRouterFailureStateStoreOps() { + return routerFailureStateStore.value(); + } + + public void incrRouterFailureSafemode() { + routerFailureSafemode.incr(); + } + + @Override + public long getRouterFailureSafemodeOps() { + return routerFailureSafemode.value(); + } + + public void incrRouterFailureReadOnly() { + routerFailureReadOnly.incr(); + } + + @Override + public long getRouterFailureReadOnlyOps() { + return routerFailureReadOnly.value(); + } + + public void incrRouterFailureLocked() { + routerFailureLocked.incr(); + } + + @Override + public long getRouterFailureLockedOps() { + return routerFailureLocked.value(); + } + + @Override + public int getRpcServerCallQueue() { + return rpcServer.getServer().getCallQueueLen(); + } + + @Override + public int getRpcServerNumOpenConnections() { + return rpcServer.getServer().getNumOpenConnections(); + } + + @Override + public int getRpcClientNumConnections() { + return rpcServer.getRPCClient().getNumConnections(); + } + + @Override + public int getRpcClientNumActiveConnections() { + return rpcServer.getRPCClient().getNumActiveConnections(); + } + + @Override + public int getRpcClientNumCreatingConnections() { + return rpcServer.getRPCClient().getNumCreatingConnections(); + } + + @Override + public int getRpcClientNumConnectionPools() { + return rpcServer.getRPCClient().getNumConnectionPools(); + } + + @Override + public String getRpcClientConnections() { + return rpcServer.getRPCClient().getJSON(); + } + + /** + * Add the time to proxy an operation from the moment the Router sends it to + * the Namenode until it replied. + * @param time Proxy time of an operation in nanoseconds. + */ + public void addProxyTime(long time) { + proxy.add(time); + proxyOp.incr(); + } + + @Override + public double getProxyAvg() { + return toMs(proxy.lastStat().mean()); + } + + @Override + public long getProxyOps() { + return proxyOp.value(); + } + + /** + * Add the time to process a request in the Router from the time we receive + * the call until we send it to the Namenode. + * @param time Process time of an operation in nanoseconds. + */ + public void addProcessingTime(long time) { + processing.add(time); + processingOp.incr(); + } + + @Override + public double getProcessingAvg() { + return toMs(processing.lastStat().mean()); + } + + @Override + public long getProcessingOps() { + return processingOp.value(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java new file mode 100644 index 00000000000..e3a16b55c44 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.metrics2.util.MBeans; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Customizable RPC performance monitor. Receives events from the RPC server + * and aggregates them via JMX. + */ +public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationRPCPerformanceMonitor.class); + + + /** Time for an operation to be received in the Router. */ + private static final ThreadLocal START_TIME = new ThreadLocal<>(); + /** Time for an operation to be send to the Namenode. */ + private static final ThreadLocal PROXY_TIME = new ThreadLocal<>(); + + /** Configuration for the performance monitor. */ + private Configuration conf; + /** RPC server for the Router. */ + private RouterRpcServer server; + /** State Store. */ + private StateStoreService store; + + /** JMX interface to monitor the RPC metrics. */ + private FederationRPCMetrics metrics; + private ObjectName registeredBean; + + /** Thread pool for logging stats. */ + private ExecutorService executor; + + + @Override + public void init(Configuration configuration, RouterRpcServer rpcServer, + StateStoreService stateStore) { + + this.conf = configuration; + this.server = rpcServer; + this.store = stateStore; + + // Create metrics + this.metrics = FederationRPCMetrics.create(conf, server); + + // Create thread pool + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Federation RPC Performance Monitor-%d").build(); + this.executor = Executors.newFixedThreadPool(1, threadFactory); + + // Adding JMX interface + try { + StandardMBean bean = + new StandardMBean(this.metrics, FederationRPCMBean.class); + registeredBean = MBeans.register("Router", "FederationRPC", bean); + LOG.info("Registered FederationRPCMBean: {}", registeredBean); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FederationRPCMBean setup", e); + } + } + + @Override + public void close() { + if (registeredBean != null) { + MBeans.unregister(registeredBean); + registeredBean = null; + } + if (this.executor != null) { + this.executor.shutdown(); + } + } + + /** + * Resets all RPC service performance counters to their defaults. + */ + public void resetPerfCounters() { + if (registeredBean != null) { + MBeans.unregister(registeredBean); + registeredBean = null; + } + if (metrics != null) { + FederationRPCMetrics.reset(); + metrics = null; + } + init(conf, server, store); + } + + @Override + public void startOp() { + START_TIME.set(this.getNow()); + } + + @Override + public long proxyOp() { + PROXY_TIME.set(this.getNow()); + long processingTime = getProcessingTime(); + if (processingTime >= 0) { + metrics.addProcessingTime(processingTime); + } + return Thread.currentThread().getId(); + } + + @Override + public void proxyOpComplete(boolean success) { + if (success) { + long proxyTime = getProxyTime(); + if (proxyTime >= 0) { + metrics.addProxyTime(proxyTime); + } + } + } + + @Override + public void proxyOpFailureStandby() { + metrics.incrProxyOpFailureStandby(); + } + + @Override + public void proxyOpFailureCommunicate() { + metrics.incrProxyOpFailureCommunicate(); + } + + @Override + public void proxyOpNotImplemented() { + metrics.incrProxyOpNotImplemented(); + } + + @Override + public void routerFailureStateStore() { + metrics.incrRouterFailureStateStore(); + } + + @Override + public void routerFailureSafemode() { + metrics.incrRouterFailureSafemode(); + } + + @Override + public void routerFailureReadOnly() { + metrics.incrRouterFailureReadOnly(); + } + + @Override + public void routerFailureLocked() { + metrics.incrRouterFailureLocked(); + } + + /** + * Get current time. + * @return Current time in nanoseconds. + */ + private long getNow() { + return System.nanoTime(); + } + + /** + * Get time between we receiving the operation and sending it to the Namenode. + * @return Processing time in nanoseconds. + */ + private long getProcessingTime() { + if (START_TIME.get() != null && START_TIME.get() > 0 && + PROXY_TIME.get() != null && PROXY_TIME.get() > 0) { + return PROXY_TIME.get() - START_TIME.get(); + } + return -1; + } + + /** + * Get time between now and when the operation was forwarded to the Namenode. + * @return Current proxy time in nanoseconds. + */ + private long getProxyTime() { + if (PROXY_TIME.get() != null && PROXY_TIME.get() > 0) { + return getNow() - PROXY_TIME.get(); + } + return -1; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java new file mode 100644 index 00000000000..23cd67563dc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -0,0 +1,624 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.util.Time.now; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.namenode.NameNodeMXBean; +import org.apache.hadoop.hdfs.server.namenode.NameNodeStatusMXBean; +import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.VersionInfo; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Expose the Namenode metrics as the Router was one. + */ +public class NamenodeBeanMetrics + implements FSNamesystemMBean, NameNodeMXBean, NameNodeStatusMXBean { + + private static final Logger LOG = + LoggerFactory.getLogger(NamenodeBeanMetrics.class); + + private final Router router; + + /** FSNamesystem bean. */ + private ObjectName fsBeanName; + /** FSNamesystemState bean. */ + private ObjectName fsStateBeanName; + /** NameNodeInfo bean. */ + private ObjectName nnInfoBeanName; + /** NameNodeStatus bean. */ + private ObjectName nnStatusBeanName; + + + public NamenodeBeanMetrics(Router router) { + this.router = router; + + try { + // TODO this needs to be done with the Metrics from FSNamesystem + StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class); + this.fsBeanName = MBeans.register("NameNode", "FSNamesystem", bean); + LOG.info("Registered FSNamesystem MBean: {}", this.fsBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FSNamesystem MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class); + this.fsStateBeanName = + MBeans.register("NameNode", "FSNamesystemState", bean); + LOG.info("Registered FSNamesystemState MBean: {}", this.fsStateBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FSNamesystemState MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, NameNodeMXBean.class); + this.nnInfoBeanName = MBeans.register("NameNode", "NameNodeInfo", bean); + LOG.info("Registered NameNodeInfo MBean: {}", this.nnInfoBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad NameNodeInfo MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, NameNodeStatusMXBean.class); + this.nnStatusBeanName = + MBeans.register("NameNode", "NameNodeStatus", bean); + LOG.info("Registered NameNodeStatus MBean: {}", this.nnStatusBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad NameNodeStatus MBean setup", e); + } + } + + /** + * De-register the JMX interfaces. + */ + public void close() { + if (fsStateBeanName != null) { + MBeans.unregister(fsStateBeanName); + fsStateBeanName = null; + } + if (nnInfoBeanName != null) { + MBeans.unregister(nnInfoBeanName); + nnInfoBeanName = null; + } + // Remove the NameNode status bean + if (nnStatusBeanName != null) { + MBeans.unregister(nnStatusBeanName); + nnStatusBeanName = null; + } + } + + private FederationMetrics getFederationMetrics() { + return this.router.getMetrics(); + } + + ///////////////////////////////////////////////////////// + // NameNodeMXBean + ///////////////////////////////////////////////////////// + + @Override + public String getVersion() { + return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); + } + + @Override + public String getSoftwareVersion() { + return VersionInfo.getVersion(); + } + + @Override + public long getUsed() { + return getFederationMetrics().getUsedCapacity(); + } + + @Override + public long getFree() { + return getFederationMetrics().getRemainingCapacity(); + } + + @Override + public long getTotal() { + return getFederationMetrics().getTotalCapacity(); + } + + @Override + public String getSafemode() { + // We assume that the global federated view is never in safe mode + return ""; + } + + @Override + public boolean isUpgradeFinalized() { + // We assume the upgrade is always finalized in a federated biew + return true; + } + + @Override + public RollingUpgradeInfo.Bean getRollingUpgradeStatus() { + return null; + } + + @Override + public long getNonDfsUsedSpace() { + return 0; + } + + @Override + public float getPercentUsed() { + return DFSUtilClient.getPercentUsed(getCapacityUsed(), getCapacityTotal()); + } + + @Override + public float getPercentRemaining() { + return DFSUtilClient.getPercentUsed( + getCapacityRemaining(), getCapacityTotal()); + } + + @Override + public long getCacheUsed() { + return 0; + } + + @Override + public long getCacheCapacity() { + return 0; + } + + @Override + public long getBlockPoolUsedSpace() { + return 0; + } + + @Override + public float getPercentBlockPoolUsed() { + return 0; + } + + @Override + public long getTotalBlocks() { + return getFederationMetrics().getNumBlocks(); + } + + @Override + public long getNumberOfMissingBlocks() { + return getFederationMetrics().getNumOfMissingBlocks(); + } + + @Override + @Deprecated + public long getPendingReplicationBlocks() { + return getFederationMetrics().getNumOfBlocksPendingReplication(); + } + + @Override + public long getPendingReconstructionBlocks() { + return getFederationMetrics().getNumOfBlocksPendingReplication(); + } + + @Override + @Deprecated + public long getUnderReplicatedBlocks() { + return getFederationMetrics().getNumOfBlocksUnderReplicated(); + } + + @Override + public long getLowRedundancyBlocks() { + return getFederationMetrics().getNumOfBlocksUnderReplicated(); + } + + @Override + public long getPendingDeletionBlocks() { + return getFederationMetrics().getNumOfBlocksPendingDeletion(); + } + + @Override + public long getScheduledReplicationBlocks() { + return -1; + } + + @Override + public long getNumberOfMissingBlocksWithReplicationFactorOne() { + return 0; + } + + @Override + public String getCorruptFiles() { + return "N/A"; + } + + @Override + public int getThreads() { + return ManagementFactory.getThreadMXBean().getThreadCount(); + } + + @Override + public String getLiveNodes() { + return this.getNodes(DatanodeReportType.LIVE); + } + + @Override + public String getDeadNodes() { + return this.getNodes(DatanodeReportType.DEAD); + } + + @Override + public String getDecomNodes() { + return this.getNodes(DatanodeReportType.DECOMMISSIONING); + } + + /** + * Get all the nodes in the federation from a particular type. + * TODO this is expensive, we may want to cache it. + * @param type Type of the datanodes to check. + * @return JSON with the nodes. + */ + private String getNodes(DatanodeReportType type) { + final Map> info = new HashMap<>(); + try { + RouterRpcServer rpcServer = this.router.getRpcServer(); + DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type); + for (DatanodeInfo node : datanodes) { + Map innerinfo = new HashMap<>(); + innerinfo.put("infoAddr", node.getInfoAddr()); + innerinfo.put("infoSecureAddr", node.getInfoSecureAddr()); + innerinfo.put("xferaddr", node.getXferAddr()); + innerinfo.put("location", node.getNetworkLocation()); + innerinfo.put("lastContact", getLastContact(node)); + innerinfo.put("usedSpace", node.getDfsUsed()); + innerinfo.put("adminState", node.getAdminState().toString()); + innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed()); + innerinfo.put("capacity", node.getCapacity()); + innerinfo.put("numBlocks", -1); // node.numBlocks() + innerinfo.put("version", (node.getSoftwareVersion() == null ? + "UNKNOWN" : node.getSoftwareVersion())); + innerinfo.put("used", node.getDfsUsed()); + innerinfo.put("remaining", node.getRemaining()); + innerinfo.put("blockScheduled", -1); // node.getBlocksScheduled() + innerinfo.put("blockPoolUsed", node.getBlockPoolUsed()); + innerinfo.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()); + innerinfo.put("volfails", -1); // node.getVolumeFailures() + info.put(node.getHostName() + ":" + node.getXferPort(), + Collections.unmodifiableMap(innerinfo)); + } + } catch (StandbyException e) { + LOG.error("Cannot get {} nodes, Router in safe mode", type); + } catch (IOException e) { + LOG.error("Cannot get " + type + " nodes", e); + } + return JSON.toString(info); + } + + @Override + public String getClusterId() { + try { + return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString(); + } catch (IOException e) { + LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage()); + return ""; + } + } + + @Override + public String getBlockPoolId() { + try { + return + getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString(); + } catch (IOException e) { + LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage()); + return ""; + } + } + + /** + * Build a set of unique values found in all namespaces. + * + * @param f Method reference of the appropriate FederationNamespaceInfo + * getter function + * @return Set of unique string values found in all discovered namespaces. + * @throws IOException if the query could not be executed. + */ + private Collection getNamespaceInfo( + Function f) throws IOException { + StateStoreService stateStore = router.getStateStore(); + MembershipStore membershipStore = + stateStore.getRegisteredRecordStore(MembershipStore.class); + + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + return response.getNamespaceInfo().stream() + .map(f) + .collect(Collectors.toSet()); + } + + @Override + public String getNameDirStatuses() { + return "N/A"; + } + + @Override + public String getNodeUsage() { + return "N/A"; + } + + @Override + public String getNameJournalStatus() { + return "N/A"; + } + + @Override + public String getJournalTransactionInfo() { + return "N/A"; + } + + @Override + public long getNNStartedTimeInMillis() { + return this.router.getStartTime(); + } + + @Override + public String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + + " from " + VersionInfo.getBranch(); + } + + @Override + public int getDistinctVersionCount() { + return 0; + } + + @Override + public Map getDistinctVersions() { + return null; + } + + ///////////////////////////////////////////////////////// + // FSNamesystemMBean + ///////////////////////////////////////////////////////// + + @Override + public String getFSState() { + // We assume is not in safe mode + return "Operational"; + } + + @Override + public long getBlocksTotal() { + return this.getTotalBlocks(); + } + + @Override + public long getCapacityTotal() { + return this.getTotal(); + } + + @Override + public long getCapacityRemaining() { + return this.getFree(); + } + + @Override + public long getCapacityUsed() { + return this.getUsed(); + } + + @Override + public long getFilesTotal() { + return getFederationMetrics().getNumFiles(); + } + + @Override + public int getTotalLoad() { + return -1; + } + + @Override + public int getNumLiveDataNodes() { + return this.router.getMetrics().getNumLiveNodes(); + } + + @Override + public int getNumDeadDataNodes() { + return this.router.getMetrics().getNumDeadNodes(); + } + + @Override + public int getNumStaleDataNodes() { + return -1; + } + + @Override + public int getNumDecomLiveDataNodes() { + return this.router.getMetrics().getNumDecomLiveNodes(); + } + + @Override + public int getNumDecomDeadDataNodes() { + return this.router.getMetrics().getNumDecomDeadNodes(); + } + + @Override + public int getNumDecommissioningDataNodes() { + return this.router.getMetrics().getNumDecommissioningNodes(); + } + + @Override + public int getNumInMaintenanceLiveDataNodes() { + return 0; + } + + @Override + public int getNumInMaintenanceDeadDataNodes() { + return 0; + } + + @Override + public int getNumEnteringMaintenanceDataNodes() { + return 0; + } + + @Override + public int getVolumeFailuresTotal() { + return 0; + } + + @Override + public long getEstimatedCapacityLostTotal() { + return 0; + } + + @Override + public String getSnapshotStats() { + return null; + } + + @Override + public long getMaxObjects() { + return 0; + } + + @Override + public long getBlockDeletionStartTime() { + return -1; + } + + @Override + public int getNumStaleStorages() { + return -1; + } + + @Override + public String getTopUserOpCounts() { + return "N/A"; + } + + @Override + public int getFsLockQueueLength() { + return 0; + } + + @Override + public long getTotalSyncCount() { + return 0; + } + + @Override + public String getTotalSyncTimes() { + return ""; + } + + private long getLastContact(DatanodeInfo node) { + return (now() - node.getLastUpdate()) / 1000; + } + + ///////////////////////////////////////////////////////// + // NameNodeStatusMXBean + ///////////////////////////////////////////////////////// + + @Override + public String getNNRole() { + return NamenodeRole.NAMENODE.toString(); + } + + @Override + public String getState() { + return HAServiceState.ACTIVE.toString(); + } + + @Override + public String getHostAndPort() { + return NetUtils.getHostPortString(router.getRpcServerAddress()); + } + + @Override + public boolean isSecurityEnabled() { + return false; + } + + @Override + public long getLastHATransitionTime() { + return 0; + } + + @Override + public long getBytesWithFutureGenerationStamps() { + return 0; + } + + @Override + public String getSlowPeersReport() { + return "N/A"; + } + + @Override + public String getSlowDisksReport() { + return "N/A"; + } + + @Override + public long getNumberOfSnapshottableDirs() { + return 0; + } + + @Override + public String getEnteringMaintenanceNodes() { + return "N/A"; + } + + @Override + public String getNameDirSize() { + return "N/A"; + } + + @Override + public int getNumEncryptionZones() { + return 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java new file mode 100644 index 00000000000..5e4ccabe608 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the State Store metrics. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface StateStoreMBean { + + long getReadOps(); + + double getReadAvg(); + + long getWriteOps(); + + double getWriteAvg(); + + long getFailureOps(); + + double getFailureAvg(); + + long getRemoveOps(); + + double getRemoveAvg(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java new file mode 100644 index 00000000000..c17eabcae2a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implementations of the JMX interface for the State Store metrics. + */ +@Metrics(name = "StateStoreActivity", about = "Router metrics", + context = "router") +public final class StateStoreMetrics implements StateStoreMBean { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + @Metric("GET transactions") + private MutableRate reads; + @Metric("PUT transactions") + private MutableRate writes; + @Metric("REMOVE transactions") + private MutableRate removes; + @Metric("Failed transactions") + private MutableRate failures; + + private Map cacheSizes; + + private StateStoreMetrics(Configuration conf) { + registry.tag(SessionId, "RouterSession"); + registry.tag(ProcessName, "Router"); + cacheSizes = new HashMap<>(); + } + + public static StateStoreMetrics create(Configuration conf) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(new StateStoreMetrics(conf)); + } + + public void shutdown() { + DefaultMetricsSystem.shutdown(); + reset(); + } + + public void addRead(long latency) { + reads.add(latency); + } + + public long getReadOps() { + return reads.lastStat().numSamples(); + } + + public double getReadAvg() { + return reads.lastStat().mean(); + } + + public void addWrite(long latency) { + writes.add(latency); + } + + public long getWriteOps() { + return writes.lastStat().numSamples(); + } + + public double getWriteAvg() { + return writes.lastStat().mean(); + } + + public void addFailure(long latency) { + failures.add(latency); + } + + public long getFailureOps() { + return failures.lastStat().numSamples(); + } + + public double getFailureAvg() { + return failures.lastStat().mean(); + } + + public void addRemove(long latency) { + removes.add(latency); + } + + public long getRemoveOps() { + return removes.lastStat().numSamples(); + } + + public double getRemoveAvg() { + return removes.lastStat().mean(); + } + + /** + * Set the size of the cache for a State Store interface. + * + * @param name Name of the record to cache. + * @param size Number of records. + */ + public void setCacheSize(String name, int size) { + String counterName = "Cache" + name + "Size"; + MutableGaugeInt counter = cacheSizes.get(counterName); + if (counter == null) { + counter = registry.newGauge(counterName, name, size); + cacheSizes.put(counterName, counter); + } + counter.set(size); + } + + @VisibleForTesting + public void reset() { + reads.resetMinMax(); + writes.resetMinMax(); + removes.resetMinMax(); + failures.resetMinMax(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java new file mode 100644 index 00000000000..c56c823cef6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Report metrics for Router-based Federation. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index d93d49833e4..543d964c245 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,6 +284,27 @@ public class ConnectionManager { return this.creatorQueue.size(); } + /** + * Get a JSON representation of the connection pool. + * + * @return JSON representation of all the connection pools. + */ + public String getJSON() { + final Map info = new TreeMap<>(); + readLock.lock(); + try { + for (Entry entry : + this.pools.entrySet()) { + ConnectionPoolId connectionPoolId = entry.getKey(); + ConnectionPool pool = entry.getValue(); + info.put(connectionPoolId.toString(), pool.getJSON()); + } + } finally { + readLock.unlock(); + } + return JSON.toString(info); + } + /** * Removes stale connections not accessed recently from the pool. This is * invoked periodically. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index f76f621b841..ca113efa785 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -257,6 +260,26 @@ public class ConnectionPool { return this.connectionPoolId.toString(); } + /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + final Map info = new LinkedHashMap<>(); + info.put("active", Integer.toString(getNumActiveConnections())); + info.put("total", Integer.toString(getNumConnections())); + if (LOG.isDebugEnabled()) { + List tmpConnections = this.connections; + for (int i=0; i namenodeHearbeatServices; + /** Router metrics. */ + private RouterMetricsService metrics; + + /** JVM pauses (GC and others). */ + private JvmPauseMonitor pauseMonitor; + /** Usage string for help message. */ private static final String USAGE = "Usage: java Router"; @@ -174,18 +184,46 @@ public class Router extends CompositeService { } } + // Router metrics system + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, + DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { + + DefaultMetricsSystem.initialize("Router"); + + this.metrics = new RouterMetricsService(this); + addService(this.metrics); + + // JVM pause monitor + this.pauseMonitor = new JvmPauseMonitor(); + this.pauseMonitor.init(conf); + } + super.serviceInit(conf); } @Override protected void serviceStart() throws Exception { + if (this.pauseMonitor != null) { + this.pauseMonitor.start(); + JvmMetrics jvmMetrics = this.metrics.getJvmMetrics(); + if (jvmMetrics != null) { + jvmMetrics.setPauseMonitor(pauseMonitor); + } + } + super.serviceStart(); } @Override protected void serviceStop() throws Exception { + // JVM pause monitor + if (this.pauseMonitor != null) { + this.pauseMonitor.stop(); + } + super.serviceStop(); } @@ -418,6 +456,30 @@ public class Router extends CompositeService { return this.stateStore; } + /** + * Get the metrics system for the Router. + * + * @return Router metrics. + */ + public RouterMetrics getRouterMetrics() { + if (this.metrics != null) { + return this.metrics.getRouterMetrics(); + } + return null; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. + */ + public FederationMetrics getMetrics() { + if (this.metrics != null) { + return this.metrics.getFederationMetrics(); + } + return null; + } + /** * Get the subcluster resolver for files. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java new file mode 100644 index 00000000000..851538ad0a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * This class is for maintaining the various Router activity statistics + * and publishing them through the metrics interfaces. + */ +@Metrics(name="RouterActivity", about="Router metrics", context="dfs") +public class RouterMetrics { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + @Metric("Duration in SafeMode at startup in msec") + private MutableGaugeInt safeModeTime; + + private JvmMetrics jvmMetrics = null; + + RouterMetrics( + String processName, String sessionId, final JvmMetrics jvmMetrics) { + this.jvmMetrics = jvmMetrics; + registry.tag(ProcessName, processName).tag(SessionId, sessionId); + } + + public static RouterMetrics create(Configuration conf) { + String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); + String processName = "Router"; + MetricsSystem ms = DefaultMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms); + + return ms.register(new RouterMetrics(processName, sessionId, jm)); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public void shutdown() { + DefaultMetricsSystem.shutdown(); + } + + public void setSafeModeTime(long elapsed) { + safeModeTime.set((int) elapsed); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java new file mode 100644 index 00000000000..f4debce3aae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.AbstractService; + +/** + * Service to manage the metrics of the Router. + */ +public class RouterMetricsService extends AbstractService { + + /** Router for this metrics. */ + private final Router router; + + /** Router metrics. */ + private RouterMetrics routerMetrics; + /** Federation metrics. */ + private FederationMetrics federationMetrics; + /** Namenode mock metrics. */ + private NamenodeBeanMetrics nnMetrics; + + + public RouterMetricsService(final Router router) { + super(RouterMetricsService.class.getName()); + this.router = router; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.routerMetrics = RouterMetrics.create(configuration); + } + + @Override + protected void serviceStart() throws Exception { + // Wrapper for all the FSNamesystem JMX interfaces + this.nnMetrics = new NamenodeBeanMetrics(this.router); + + // Federation MBean JMX interface + this.federationMetrics = new FederationMetrics(this.router); + } + + @Override + protected void serviceStop() throws Exception { + // Remove JMX interfaces + if (this.federationMetrics != null) { + this.federationMetrics.close(); + } + + // Remove Namenode JMX interfaces + if (this.nnMetrics != null) { + this.nnMetrics.close(); + } + + // Shutdown metrics + if (this.routerMetrics != null) { + this.routerMetrics.shutdown(); + } + } + + /** + * Get the metrics system for the Router. + * + * @return Router metrics. + */ + public RouterMetrics getRouterMetrics() { + return this.routerMetrics; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. + */ + public FederationMetrics getFederationMetrics() { + return this.federationMetrics; + } + + /** + * Get the JVM metrics for the Router. + * + * @return JVM metrics. + */ + public JvmMetrics getJvmMetrics() { + if (this.routerMetrics == null) { + return null; + } + return this.routerMetrics.getJvmMetrics(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3a32be103bc..5c33c2e83fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -97,6 +97,8 @@ public class RouterRpcClient { private final ExecutorService executorService; /** Retry policy for router -> NN communication. */ private final RetryPolicy retryPolicy; + /** Optional perf monitor. */ + private final RouterRpcMonitor rpcMonitor; /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -111,8 +113,7 @@ public class RouterRpcClient { * @param monitor Optional performance monitor. */ public RouterRpcClient(Configuration conf, String identifier, - ActiveNamenodeResolver resolver) { - + ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { this.routerId = identifier; this.namenodeResolver = resolver; @@ -125,6 +126,8 @@ public class RouterRpcClient { .build(); this.executorService = Executors.newCachedThreadPool(threadFactory); + this.rpcMonitor = monitor; + int maxFailoverAttempts = conf.getInt( HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); @@ -191,6 +194,15 @@ public class RouterRpcClient { return this.connectionManager.getNumCreatingConnections(); } + /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + return this.connectionManager.getJSON(); + } + /** * Get ClientProtocol proxy client for a NameNode. Each combination of user + * NN must use a unique proxy client. Previously created clients are cached @@ -294,6 +306,9 @@ public class RouterRpcClient { } Object ret = null; + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } boolean failover = false; Map ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { @@ -310,18 +325,31 @@ public class RouterRpcClient { InetSocketAddress address = client.getAddress(); namenodeResolver.updateActiveNamenode(nsId, address); } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } return ret; } catch (IOException ioe) { ioes.put(namenode, ioe); if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureStandby(); + } failover = true; } else if (ioe instanceof RemoteException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } // RemoteException returned by NN throw (RemoteException) ioe; } else { // Other communication error, this is a failure // Communication retries are handled by the retry policy + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(); + this.rpcMonitor.proxyOpComplete(false); + } throw ioe; } } finally { @@ -330,6 +358,9 @@ public class RouterRpcClient { } } } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(false); + } // All namenodes were unavailable or in standby String msg = "No namenode available to invoke " + method.getName() + " " + @@ -746,6 +777,10 @@ public class RouterRpcClient { } } + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + try { List> futures = executorService.invokeAll(callables); Map results = new TreeMap<>(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java new file mode 100644 index 00000000000..d889a56433d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; + +/** + * Metrics and monitoring interface for the router RPC server. Allows pluggable + * diagnostics and monitoring services to be attached. + */ +public interface RouterRpcMonitor { + + /** + * Initialize the monitor. + * @param conf Configuration for the monitor. + * @param server RPC server. + * @param store State Store. + */ + void init( + Configuration conf, RouterRpcServer server, StateStoreService store); + + /** + * Close the monitor. + */ + void close(); + + /** + * Start processing an operation on the Router. + */ + void startOp(); + + /** + * Start proxying an operation to the Namenode. + * @return Id of the thread doing the proxying. + */ + long proxyOp(); + + /** + * Mark a proxy operation as completed. + * @param success If the operation was successful. + */ + void proxyOpComplete(boolean success); + + /** + * Failed to proxy an operation to a Namenode because it was in standby. + */ + void proxyOpFailureStandby(); + + /** + * Failed to proxy an operation to a Namenode because of an unexpected + * exception. + */ + void proxyOpFailureCommunicate(); + + /** + * Failed to proxy an operation because it is not implemented. + */ + void proxyOpNotImplemented(); + + /** + * If the Router cannot contact the State Store in an operation. + */ + void routerFailureStateStore(); + + /** + * If the Router is in safe mode. + */ + void routerFailureSafemode(); + + /** + * If a path is locked. + */ + void routerFailureLocked(); + + /** + * If a path is in a read only mount point. + */ + void routerFailureReadOnly(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index f9b4a5db54d..6aee1eea8bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -120,6 +120,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,6 +160,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** RPC clients to connect to the Namenodes. */ private final RouterRpcClient rpcClient; + /** Monitor metrics for the RPC calls. */ + private final RouterRpcMonitor rpcMonitor; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ private final ActiveNamenodeResolver namenodeResolver; @@ -256,14 +260,28 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { this.rpcAddress = new InetSocketAddress( confRpcAddress.getHostName(), listenAddress.getPort()); + // Create metrics monitor + Class rpcMonitorClass = this.conf.getClass( + DFSConfigKeys.DFS_ROUTER_METRICS_CLASS, + DFSConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, + RouterRpcMonitor.class); + this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); + // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), - this.namenodeResolver); + this.namenodeResolver, this.rpcMonitor); } @Override protected void serviceInit(Configuration configuration) throws Exception { this.conf = configuration; + + if (this.rpcMonitor == null) { + LOG.error("Cannot instantiate Router RPC metrics class"); + } else { + this.rpcMonitor.init(this.conf, this, this.router.getStateStore()); + } + super.serviceInit(configuration); } @@ -281,6 +299,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { if (this.rpcServer != null) { this.rpcServer.stop(); } + if (rpcMonitor != null) { + this.rpcMonitor.close(); + } super.serviceStop(); } @@ -293,6 +314,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { return rpcClient; } + /** + * Get the RPC monitor and metrics. + * + * @return RPC monitor and metrics. + */ + public RouterRpcMonitor getRPCMonitor() { + return rpcMonitor; + } + /** * Allow access to the client RPC server for testing. * @@ -330,6 +360,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { checkOperation(op); if (!supported) { + if (rpcMonitor != null) { + rpcMonitor.proxyOpNotImplemented(); + } String methodName = getMethodName(); throw new UnsupportedOperationException( "Operation \"" + methodName + "\" is not supported"); @@ -345,6 +378,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * client requests. */ private void checkOperation(OperationCategory op) throws StandbyException { + // Log the function we are currently calling. + if (rpcMonitor != null) { + rpcMonitor.startOp(); + } // Log the function we are currently calling. if (LOG.isDebugEnabled()) { String methodName = getMethodName(); @@ -1912,16 +1949,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { */ private List getLocationsForPath( String path, boolean failIfLocked) throws IOException { - // Check the location for this path - final PathLocation location = - this.subclusterResolver.getDestinationForPath(path); - if (location == null) { - throw new IOException("Cannot find locations for " + path + " in " + - this.subclusterResolver); - } + try { + // Check the location for this path + final PathLocation location = + this.subclusterResolver.getDestinationForPath(path); + if (location == null) { + throw new IOException("Cannot find locations for " + path + " in " + + this.subclusterResolver); + } - // Log the access to a path - return location.getDestinations(); + return location.getDestinations(); + } catch (IOException ioe) { + if (this.rpcMonitor != null) { + this.rpcMonitor.routerFailureStateStore(); + } + throw ioe; + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 90a6699604a..fbece88fe6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; @@ -140,6 +141,13 @@ public abstract class CachedRecordStore writeLock.unlock(); } + // Update the metrics for the cache State Store size + StateStoreMetrics metrics = getDriver().getMetrics(); + if (metrics != null) { + String recordName = getRecordClass().getSimpleName(); + metrics.setCacheSize(recordName, this.records.size()); + } + lastUpdate = Time.monotonicNow(); } return true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 3aa3ffd394b..0289ba6c06b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -25,15 +25,23 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -87,6 +95,8 @@ public class StateStoreService extends CompositeService { /** Service to maintain data store connection. */ private StateStoreConnectionMonitorService monitorService; + /** StateStore metrics. */ + private StateStoreMetrics metrics; /** Supported record stores. */ private final Map< @@ -152,6 +162,21 @@ public class StateStoreService extends CompositeService { this.cacheUpdater = new StateStoreCacheUpdateService(this); addService(this.cacheUpdater); + // Create metrics for the State Store + this.metrics = StateStoreMetrics.create(conf); + + // Adding JMX interface + try { + StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class); + ObjectName registeredObject = + MBeans.register("Router", "StateStore", bean); + LOG.info("Registered StateStoreMBean: {}", registeredObject); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad StateStoreMBean setup", e); + } catch (MetricsException e) { + LOG.info("Failed to register State Store bean {}", e.getMessage()); + } + super.serviceInit(this.conf); } @@ -165,6 +190,11 @@ public class StateStoreService extends CompositeService { protected void serviceStop() throws Exception { closeDriver(); + if (metrics != null) { + metrics.shutdown(); + metrics = null; + } + super.serviceStop(); } @@ -228,7 +258,8 @@ public class StateStoreService extends CompositeService { synchronized (this.driver) { if (!isDriverReady()) { String driverName = this.driver.getClass().getSimpleName(); - if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { + if (this.driver.init( + conf, getIdentifier(), getSupportedRecords(), metrics)) { LOG.info("Connection to the State Store driver {} is open and ready", driverName); this.refreshCaches(); @@ -398,4 +429,13 @@ public class StateStoreService extends CompositeService { throw new IOException("Registered cache was not found for " + clazz); } + /** + * Get the metrics for the State Store. + * + * @return State Store metrics. + */ + public StateStoreMetrics getMetrics() { + return metrics; + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index 90111bfce9b..3ebab0bda9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; @@ -46,6 +47,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** Identifier for the driver. */ private String identifier; + /** State Store metrics. */ + private StateStoreMetrics metrics; + /** * Initialize the state store connection. @@ -56,10 +60,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { * @return If initialized and ready, false if failed to initialize driver. */ public boolean init(final Configuration config, final String id, - final Collection> records) { + final Collection> records, + final StateStoreMetrics stateStoreMetrics) { this.conf = config; this.identifier = id; + this.metrics = stateStoreMetrics; if (this.identifier == null) { LOG.warn("The identifier for the State Store connection is not set"); @@ -100,6 +106,15 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { return this.identifier; } + /** + * Get the metrics for the State Store. + * + * @return State Store metrics. + */ + public StateStoreMetrics getMetrics() { + return this.metrics; + } + /** * Prepare the driver to access data storage. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java index e2038faab26..7bc93de84bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -41,8 +42,9 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { @Override public boolean init(final Configuration config, final String id, - final Collection> records) { - boolean ret = super.init(config, id, records); + final Collection> records, + final StateStoreMetrics metrics) { + boolean ret = super.init(config, id, records, metrics); this.serializer = StateStoreSerializer.getSerializer(config); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java index ddcd537040b..97c821e0e07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName; import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; +import static org.apache.hadoop.util.Time.monotonicNow; import java.io.IOException; import java.util.ArrayList; @@ -123,6 +124,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { public QueryResult get(Class clazz, String sub) throws IOException { verifyDriverReady(); + long start = monotonicNow(); List ret = new ArrayList<>(); String znode = getZNodeForClass(clazz); try { @@ -157,11 +159,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } } } catch (Exception e) { + getMetrics().addFailure(monotonicNow() - start); String msg = "Cannot get children for \"" + znode + "\": " + e.getMessage(); LOG.error(msg); throw new IOException(msg); } + long end = monotonicNow(); + getMetrics().addRead(end - start); return new QueryResult(ret, getTime()); } @@ -178,6 +183,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { Class recordClass = record0.getClass(); String znode = getZNodeForClass(recordClass); + long start = monotonicNow(); boolean status = true; for (T record : records) { String primaryKey = getPrimaryKey(record); @@ -187,6 +193,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { status = false; } } + long end = monotonicNow(); + if (status) { + getMetrics().addWrite(end - start); + } else { + getMetrics().addFailure(end - start); + } return status; } @@ -199,12 +211,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } // Read the current data + long start = monotonicNow(); List records = null; try { QueryResult result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); + getMetrics().addFailure(monotonicNow() - start); return 0; } @@ -226,14 +240,20 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } } catch (Exception e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); + getMetrics().addFailure(monotonicNow() - start); } } + long end = monotonicNow(); + if (removed > 0) { + getMetrics().addRemove(end - start); + } return removed; } @Override public boolean removeAll(Class clazz) throws IOException { + long start = monotonicNow(); boolean status = true; String znode = getZNodeForClass(clazz); LOG.info("Deleting all children under {}", znode); @@ -248,6 +268,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { LOG.error("Cannot remove {}: {}", znode, e.getMessage()); status = false; } + long time = monotonicNow() - start; + if (status) { + getMetrics().addRemove(time); + } else { + getMetrics().addFailure(time); + } return status; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java index ab0ff0ab4d8..ac0b22e748d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java @@ -154,7 +154,7 @@ public abstract class MembershipState extends BaseRecord public abstract void setStats(MembershipStats stats); - public abstract MembershipStats getStats() throws IOException; + public abstract MembershipStats getStats(); public abstract void setLastContact(long contact); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index 16f2b8ba20c..0a3f19db105 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records; import java.io.IOException; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -46,6 +47,28 @@ public abstract class MountTable extends BaseRecord { private static final Logger LOG = LoggerFactory.getLogger(MountTable.class); + /** Comparator for paths which considers the /. */ + public static final Comparator PATH_COMPARATOR = + new Comparator() { + @Override + public int compare(String o1, String o2) { + String s1 = o1.replace('/', ' '); + String s2 = o2.replace('/', ' '); + return s1.compareTo(s2); + } + }; + + /** Comparator based on the mount table source. */ + public static final Comparator SOURCE_COMPARATOR = + new Comparator() { + public int compare(MountTable m1, MountTable m2) { + String src1 = m1.getSourcePath(); + String src2 = m2.getSourcePath(); + return PATH_COMPARATOR.compare(src1, src2); + } + }; + + /** * Default constructor for a mount table entry. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java index 805c2afc472..614957b16c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java @@ -288,7 +288,7 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord { } @Override - public MembershipStats getStats() throws IOException { + public MembershipStats getStats() { NamenodeMembershipStatsRecordProto statsProto = this.translator.getProtoOrBuilder().getStats(); MembershipStats stats = @@ -298,7 +298,8 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord { statsPB.setProto(statsProto); return statsPB; } else { - throw new IOException("Cannot get stats for the membership"); + throw new IllegalArgumentException( + "Cannot get stats for the membership"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7f421413767..96f17afe200 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4745,7 +4745,24 @@ - + + dfs.federation.router.metrics.enable + true + + If the metrics in the router are enabled. + + + + + dfs.federation.router.metrics.class + org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor + + Class to monitor the RPC system in the router. It must implement the + RouterRpcMonitor interface. + + + + dfs.federation.router.admin.enable true diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index bbb548caff2..0c017631726 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -26,12 +26,18 @@ import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Date; import java.util.List; import java.util.Random; +import javax.management.JMX; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -135,6 +141,13 @@ public final class FederationTestUtils { return Math.abs(d1.getTime() - d2.getTime()) < precision; } + public static T getBean(String name, Class obj) + throws MalformedObjectNameException { + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName poolName = new ObjectName(name); + return JMX.newMXBeanProxy(mBeanServer, poolName, obj); + } + public static boolean addDirectory(FileSystem context, String path) throws IOException { context.mkdirs(new Path(path), new FsPermission("777")); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index cac5e6b6078..58ca1d182d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -32,6 +32,7 @@ public class RouterConfigBuilder { private boolean enableHeartbeat = false; private boolean enableLocalHeartbeat = false; private boolean enableStateStore = false; + private boolean enableMetrics = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -47,6 +48,7 @@ public class RouterConfigBuilder { this.enableHeartbeat = true; this.enableLocalHeartbeat = true; this.enableStateStore = true; + this.enableMetrics = true; return this; } @@ -75,6 +77,11 @@ public class RouterConfigBuilder { return this; } + public RouterConfigBuilder metrics(boolean enable) { + this.enableMetrics = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } @@ -91,6 +98,10 @@ public class RouterConfigBuilder { return this.stateStore(true); } + public RouterConfigBuilder metrics() { + return this.metrics(true); + } + public Configuration build() { conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, this.enableStateStore); @@ -101,6 +112,8 @@ public class RouterConfigBuilder { this.enableHeartbeat); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, this.enableLocalHeartbeat); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, + this.enableMetrics); return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java new file mode 100644 index 00000000000..d6a194fca05 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import javax.management.MalformedObjectNameException; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +/** + * Test the JMX interface for the {@link Router}. + */ +public class TestFederationMetrics extends TestMetricsBase { + + public static final String FEDERATION_BEAN = + "Hadoop:service=Router,name=FederationState"; + public static final String STATE_STORE_BEAN = + "Hadoop:service=Router,name=StateStore"; + public static final String RPC_BEAN = + "Hadoop:service=Router,name=FederationRPC"; + + @Test + public void testClusterStatsJMX() + throws MalformedObjectNameException, IOException { + + FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class); + validateClusterStatsBean(bean); + } + + @Test + public void testClusterStatsDataSource() throws IOException { + FederationMetrics metrics = getRouter().getMetrics(); + validateClusterStatsBean(metrics); + } + + @Test + public void testMountTableStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getMountTable(); + JSONArray jsonArray = new JSONArray(jsonString); + assertEquals(jsonArray.length(), getMockMountTable().size()); + + int match = 0; + for (int i = 0; i < jsonArray.length(); i++) { + JSONObject json = jsonArray.getJSONObject(i); + String src = json.getString("sourcePath"); + + for (MountTable entry : getMockMountTable()) { + if (entry.getSourcePath().equals(src)) { + assertEquals(entry.getDefaultLocation().getNameserviceId(), + json.getString("nameserviceId")); + assertEquals(entry.getDefaultLocation().getDest(), + json.getString("path")); + assertNotNullAndNotEmpty(json.getString("dateCreated")); + assertNotNullAndNotEmpty(json.getString("dateModified")); + match++; + } + } + } + assertEquals(match, getMockMountTable().size()); + } + + private MembershipState findMockNamenode(String nsId, String nnId) { + + @SuppressWarnings("unchecked") + List namenodes = + ListUtils.union(getActiveMemberships(), getStandbyMemberships()); + for (MembershipState nn : namenodes) { + if (nn.getNamenodeId().equals(nnId) + && nn.getNameserviceId().equals(nsId)) { + return nn; + } + } + return null; + } + + @Test + public void testNamenodeStatsDataSource() throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNamenodes(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator keys = jsonObject.keys(); + int nnsFound = 0; + while (keys.hasNext()) { + // Validate each entry against our mocks + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + assertEquals(json.getString("state"), mockEntry.getState().toString()); + MembershipStats stats = mockEntry.getStats(); + assertEquals(json.getLong("numOfActiveDatanodes"), + stats.getNumOfActiveDatanodes()); + assertEquals(json.getLong("numOfDeadDatanodes"), + stats.getNumOfDeadDatanodes()); + assertEquals(json.getLong("numOfDecommissioningDatanodes"), + stats.getNumOfDecommissioningDatanodes()); + assertEquals(json.getLong("numOfDecomActiveDatanodes"), + stats.getNumOfDecomActiveDatanodes()); + assertEquals(json.getLong("numOfDecomDeadDatanodes"), + stats.getNumOfDecomDeadDatanodes()); + assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks()); + assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress()); + assertEquals(json.getString("webAddress"), mockEntry.getWebAddress()); + nnsFound++; + } + // Validate all memberships are present + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + nnsFound); + } + + @Test + public void testNameserviceStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNameservices(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator keys = jsonObject.keys(); + int nameservicesFound = 0; + while (keys.hasNext()) { + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + // NS should report the active NN + assertEquals(mockEntry.getState().toString(), json.getString("state")); + assertEquals("ACTIVE", json.getString("state")); + + // Stats in the NS should reflect the stats for the most active NN + MembershipStats stats = mockEntry.getStats(); + assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles")); + assertEquals(stats.getTotalSpace(), json.getLong("totalSpace")); + assertEquals(stats.getAvailableSpace(), + json.getLong("availableSpace")); + assertEquals(stats.getNumOfBlocksMissing(), + json.getLong("numOfBlocksMissing")); + assertEquals(stats.getNumOfActiveDatanodes(), + json.getLong("numOfActiveDatanodes")); + assertEquals(stats.getNumOfDeadDatanodes(), + json.getLong("numOfDeadDatanodes")); + assertEquals(stats.getNumOfDecommissioningDatanodes(), + json.getLong("numOfDecommissioningDatanodes")); + assertEquals(stats.getNumOfDecomActiveDatanodes(), + json.getLong("numOfDecomActiveDatanodes")); + assertEquals(stats.getNumOfDecomDeadDatanodes(), + json.getLong("numOfDecomDeadDatanodes")); + nameservicesFound++; + } + assertEquals(getNameservices().size(), nameservicesFound); + } + + private void assertNotNullAndNotEmpty(String field) { + assertNotNull(field); + assertTrue(field.length() > 0); + } + + private void validateClusterStatsBean(FederationMBean bean) + throws IOException { + + // Determine aggregates + long numBlocks = 0; + long numLive = 0; + long numDead = 0; + long numDecom = 0; + long numDecomLive = 0; + long numDecomDead = 0; + long numFiles = 0; + for (MembershipState mock : getActiveMemberships()) { + MembershipStats stats = mock.getStats(); + numBlocks += stats.getNumOfBlocks(); + numLive += stats.getNumOfActiveDatanodes(); + numDead += stats.getNumOfDeadDatanodes(); + numDecom += stats.getNumOfDecommissioningDatanodes(); + numDecomLive += stats.getNumOfDecomActiveDatanodes(); + numDecomDead += stats.getNumOfDecomDeadDatanodes(); + } + + assertEquals(numBlocks, bean.getNumBlocks()); + assertEquals(numLive, bean.getNumLiveNodes()); + assertEquals(numDead, bean.getNumDeadNodes()); + assertEquals(numDecom, bean.getNumDecommissioningNodes()); + assertEquals(numDecomLive, bean.getNumDecomLiveNodes()); + assertEquals(numDecomDead, bean.getNumDecomDeadNodes()); + assertEquals(numFiles, bean.getNumFiles()); + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + bean.getNumNamenodes()); + assertEquals(getNameservices().size(), bean.getNumNameservices()); + assertTrue(bean.getVersion().length() > 0); + assertTrue(bean.getCompiledDate().length() > 0); + assertTrue(bean.getCompileInfo().length() > 0); + assertTrue(bean.getRouterStarted().length() > 0); + assertTrue(bean.getHostAndPort().length() > 0); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java new file mode 100644 index 00000000000..bbcfbe82885 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.After; +import org.junit.Before; + +/** + * Test the basic metrics functionality. + */ +public class TestMetricsBase { + + private StateStoreService stateStore; + private MembershipStore membershipStore; + private Router router; + private Configuration routerConfig; + + private List activeMemberships; + private List standbyMemberships; + private List mockMountTable; + private List nameservices; + + @Before + public void setupBase() throws Exception { + + if (router == null) { + routerConfig = new RouterConfigBuilder() + .stateStore() + .metrics() + .build(); + router = new Router(); + router.init(routerConfig); + router.setRouterId("routerId"); + router.start(); + stateStore = router.getStateStore(); + + membershipStore = + stateStore.getRegisteredRecordStore(MembershipStore.class); + + // Read all data and load all caches + waitStateStore(stateStore, 10000); + createFixtures(); + stateStore.refreshCaches(true); + Thread.sleep(1000); + } + } + + @After + public void tearDownBase() throws IOException { + if (router != null) { + router.stop(); + router.close(); + router = null; + } + } + + private void createFixtures() throws IOException { + // Clear all records + clearAllRecords(stateStore); + + nameservices = new ArrayList<>(); + nameservices.add(NAMESERVICES[0]); + nameservices.add(NAMESERVICES[1]); + + // 2 NNs per NS + activeMemberships = new ArrayList<>(); + standbyMemberships = new ArrayList<>(); + + for (String nameservice : nameservices) { + MembershipState namenode1 = createMockRegistrationForNamenode( + nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE); + NamenodeHeartbeatRequest request1 = + NamenodeHeartbeatRequest.newInstance(namenode1); + assertTrue(membershipStore.namenodeHeartbeat(request1).getResult()); + activeMemberships.add(namenode1); + + MembershipState namenode2 = createMockRegistrationForNamenode( + nameservice, NAMENODES[1], FederationNamenodeServiceState.STANDBY); + NamenodeHeartbeatRequest request2 = + NamenodeHeartbeatRequest.newInstance(namenode2); + assertTrue(membershipStore.namenodeHeartbeat(request2).getResult()); + standbyMemberships.add(namenode2); + } + + // Add 2 mount table memberships + mockMountTable = createMockMountTable(nameservices); + synchronizeRecords(stateStore, mockMountTable, MountTable.class); + } + + protected Router getRouter() { + return router; + } + + protected List getMockMountTable() { + return mockMountTable; + } + + protected List getActiveMemberships() { + return activeMemberships; + } + + protected List getStandbyMemberships() { + return standbyMemberships; + } + + protected List getNameservices() { + return nameservices; + } + + protected StateStoreService getStateStore() { + return stateStore; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index 2074d3d0bac..e22be842e60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.service.Service.STATE; import org.junit.After; import org.junit.AfterClass; @@ -46,15 +48,19 @@ public class TestRouter { public static void create() throws IOException { // Basic configuration without the state store conf = new Configuration(); + // 1 sec cache refresh + conf.setInt(DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1); // Mock resolver classes - conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); - conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, - MockResolver.class.getCanonicalName()); + conf.setClass(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MockResolver.class, FileSubclusterResolver.class); // Bind to any available port conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); // Simulate a co-located NN conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0"); @@ -95,9 +101,18 @@ public class TestRouter { @Test public void testRouterService() throws InterruptedException, IOException { + // Admin only + testRouterStartup(new RouterConfigBuilder(conf).admin().build()); + // Rpc only testRouterStartup(new RouterConfigBuilder(conf).rpc().build()); + // Metrics only + testRouterStartup(new RouterConfigBuilder(conf).metrics().build()); + + // Statestore only + testRouterStartup(new RouterConfigBuilder(conf).stateStore().build()); + // Heartbeat only testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index 65e763bf18a..01fe1499943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -35,6 +35,7 @@ import java.util.Map.Entry; import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; @@ -377,6 +378,74 @@ public class TestStateStoreDriverBase { testFetchErrors(driver, MountTable.class); } + public void testMetrics(StateStoreDriver driver) + throws IOException, IllegalArgumentException, IllegalAccessException { + + MountTable insertRecord = + this.generateFakeRecord(MountTable.class); + + // Put single + StateStoreMetrics metrics = stateStore.getMetrics(); + assertEquals(0, metrics.getWriteOps()); + driver.put(insertRecord, true, false); + assertEquals(1, metrics.getWriteOps()); + + // Put multiple + metrics.reset(); + assertEquals(0, metrics.getWriteOps()); + driver.put(insertRecord, true, false); + assertEquals(1, metrics.getWriteOps()); + + // Get Single + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + + final String querySourcePath = insertRecord.getSourcePath(); + MountTable partial = MountTable.newInstance(); + partial.setSourcePath(querySourcePath); + final Query query = new Query<>(partial); + driver.get(MountTable.class, query); + assertEquals(1, metrics.getReadOps()); + + // GetAll + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + driver.get(MountTable.class); + assertEquals(1, metrics.getReadOps()); + + // GetMultiple + metrics.reset(); + assertEquals(0, metrics.getReadOps()); + driver.getMultiple(MountTable.class, query); + assertEquals(1, metrics.getReadOps()); + + // Insert fails + metrics.reset(); + assertEquals(0, metrics.getFailureOps()); + driver.put(insertRecord, false, true); + assertEquals(1, metrics.getFailureOps()); + + // Remove single + metrics.reset(); + assertEquals(0, metrics.getRemoveOps()); + driver.remove(insertRecord); + assertEquals(1, metrics.getRemoveOps()); + + // Remove multiple + metrics.reset(); + driver.put(insertRecord, true, false); + assertEquals(0, metrics.getRemoveOps()); + driver.remove(MountTable.class, query); + assertEquals(1, metrics.getRemoveOps()); + + // Remove all + metrics.reset(); + driver.put(insertRecord, true, false); + assertEquals(0, metrics.getRemoveOps()); + driver.removeAll(MountTable.class); + assertEquals(1, metrics.getRemoveOps()); + } + /** * Sets the value of a field on the object. *