diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 74ae7220705..b132775e303 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -98,6 +98,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; +import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME; import java.io.BufferedWriter; import java.io.ByteArrayInputStream; @@ -976,6 +977,11 @@ private List initAuditLoggers(Configuration conf) { // Add audit logger to calculate top users if (topConf.isEnabled) { topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs); + if (DefaultMetricsSystem.instance().getSource( + TOPMETRICS_METRICS_SOURCE_NAME) == null) { + DefaultMetricsSystem.instance().register(TOPMETRICS_METRICS_SOURCE_NAME, + "Top N operations by user", topMetrics); + } auditLoggers.add(new TopAuditLogger(topMetrics)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java index ab553928862..2719c8857ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java @@ -17,24 +17,32 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.metrics; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow; /** @@ -58,8 +66,11 @@ * Thread-safe: relies on thread-safety of RollingWindowManager */ @InterfaceAudience.Private -public class TopMetrics { +public class TopMetrics implements MetricsSource { public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class); + public static final String TOPMETRICS_METRICS_SOURCE_NAME = + "NNTopUserOpCounts"; + private final boolean isMetricsSourceEnabled; private static void logConf(Configuration conf) { LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY + @@ -83,6 +94,8 @@ public TopMetrics(Configuration conf, int[] reportingPeriods) { rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager( conf, reportingPeriods[i])); } + isMetricsSourceEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, + DFSConfigKeys.NNTOP_ENABLED_DEFAULT); } /** @@ -128,4 +141,44 @@ public void report(long currTime, String userName, String cmd) { TopConf.ALL_CMDS, userName, 1); } } + + /** + * Flatten out the top window metrics into + * {@link org.apache.hadoop.metrics2.MetricsRecord}s for consumption by + * external metrics systems. Each metrics record added corresponds to the + * reporting period a.k.a window length of the configured rolling windows. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + if (!isMetricsSourceEnabled) { + return; + } + + for (final TopWindow window : getTopWindows()) { + MetricsRecordBuilder rb = collector.addRecord(buildOpRecordName(window)) + .setContext("dfs"); + for (final Op op: window.getOps()) { + rb.addCounter(buildOpTotalCountMetricsInfo(op), op.getTotalCount()); + for (User user : op.getTopUsers()) { + rb.addCounter(buildOpRecordMetricsInfo(op, user), user.getCount()); + } + } + } + } + + private String buildOpRecordName(TopWindow window) { + return TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=" + + window.getWindowLenMs(); + } + + private MetricsInfo buildOpTotalCountMetricsInfo(Op op) { + return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType()) + + ".TotalCount", "Total operation count"); + } + + private MetricsInfo buildOpRecordMetricsInfo(Op op, User user) { + return Interns.info("op=" + StringUtils.deleteWhitespace(op.getOpType()) + + ".user=" + user.getUser() + + ".count", "Total operations performed by user"); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java new file mode 100644 index 00000000000..4d3a4f030e2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestTopMetrics.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.top.TopConf; +import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.lib.Interns; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test for MetricsSource part of the {@link TopMetrics} impl. + */ +public class TestTopMetrics { + @Test + public void testPresence() { + Configuration conf = new Configuration(); + TopConf topConf = new TopConf(conf); + TopMetrics topMetrics = new TopMetrics(conf, + topConf.nntopReportingPeriodsMs); + // Dummy command + topMetrics.report("test", "listStatus"); + topMetrics.report("test", "listStatus"); + topMetrics.report("test", "listStatus"); + + MetricsRecordBuilder rb = getMetrics(topMetrics); + MetricsCollector mc = rb.parent(); + + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=60000"); + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=300000"); + verify(mc).addRecord(TOPMETRICS_METRICS_SOURCE_NAME + ".windowMs=1500000"); + + verify(rb, times(3)).addCounter(Interns.info("op=listStatus.TotalCount", + "Total operation count"), 3L); + verify(rb, times(3)).addCounter(Interns.info("op=*.TotalCount", + "Total operation count"), 3L); + + verify(rb, times(3)).addCounter(Interns.info("op=listStatus." + + "user=test.count", "Total operations performed by user"), 3L); + } +}