From fa7b9248e415c04bb555772f44fadaf8d9f34974 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Fri, 12 Dec 2014 17:04:33 -0800 Subject: [PATCH] HDFS-7426. Change nntop JMX format to be a JSON blob. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/namenode/FSNamesystem.java | 38 ++- .../namenode/metrics/FSNamesystemMBean.java | 7 + .../server/namenode/top/TopAuditLogger.java | 20 +- .../hdfs/server/namenode/top/TopConf.java | 31 +-- .../namenode/top/metrics/TopMetrics.java | 216 ++++------------- .../top/window/RollingWindowManager.java | 225 ++++++++++++------ .../namenode/TestFSNamesystemMBean.java | 2 + .../server/namenode/TestNameNodeMXBean.java | 116 +++++++++ .../namenode/metrics/TestNameNodeMetrics.java | 59 ----- .../top/window/TestRollingWindowManager.java | 63 ++--- 11 files changed, 419 insertions(+), 360 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index eeedb0d8333..9dfecc1c5b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -455,6 +455,8 @@ Release 2.7.0 - UNRELEASED HDFS-7509. Avoid resolving path multiple times. (jing9) + HDFS-7426. Change nntop JMX format to be a JSON blob. (wang) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b4b897a186b..1ac19fb30b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -120,6 +120,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -241,6 +242,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -281,6 +283,7 @@ import org.apache.hadoop.util.VersionInfo; import org.apache.log4j.Appender; import org.apache.log4j.AsyncAppender; import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; import org.mortbay.util.ajax.JSON; import com.google.common.annotations.VisibleForTesting; @@ -539,6 +542,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, private final FSImage fsImage; + private final TopConf topConf; + private TopMetrics topMetrics; + /** * Notify that loading of this FSDirectory is complete, and * it is imageLoaded for use @@ -842,6 +848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, this.snapshotManager = new SnapshotManager(dir); this.cacheManager = new CacheManager(this, conf, blockManager); this.safeMode = new SafeModeInfo(conf); + this.topConf = new TopConf(conf); this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && auditLoggers.get(0) instanceof DefaultAuditLogger; @@ -952,13 +959,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } // Add audit logger to calculate top users - if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, - DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) { - String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); - TopConf nntopConf = new TopConf(conf); - TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId, - nntopConf.nntopReportingPeriodsMs); - auditLoggers.add(new TopAuditLogger()); + if (topConf.isEnabled) { + topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs); + auditLoggers.add(new TopAuditLogger(topMetrics)); } return Collections.unmodifiableList(auditLoggers); @@ -6013,6 +6016,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return getBlockManager().getDatanodeManager().getNumStaleStorages(); } + @Override // FSNamesystemMBean + public String getTopUserOpCounts() { + if (!topConf.isEnabled) { + return null; + } + + Date now = new Date(); + final List topWindows = + topMetrics.getTopWindows(); + Map topMap = new TreeMap(); + topMap.put("windows", topWindows); + topMap.put("timestamp", DFSUtil.dateToIso8601String(now)); + ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(topMap); + } catch (IOException e) { + LOG.warn("Failed to fetch TopUser metrics", e); + } + return null; + } + /** * Increments, logs and then returns the stamp */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java index 708591b45ea..86f4bd624ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java @@ -164,4 +164,11 @@ public interface FSNamesystemMBean { */ public int getNumStaleStorages(); + /** + * Returns a nested JSON object listing the top users for different RPC + * operations over tracked time windows. + * + * @return JSON string + */ + public String getTopUserOpCounts(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java index 4f26b171c83..49c91536215 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopAuditLogger.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.top; import java.net.InetAddress; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -36,6 +37,14 @@ import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; public class TopAuditLogger implements AuditLogger { public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class); + private final TopMetrics topMetrics; + + public TopAuditLogger(TopMetrics topMetrics) { + Preconditions.checkNotNull(topMetrics, "Cannot init with a null " + + "TopMetrics"); + this.topMetrics = topMetrics; + } + @Override public void initialize(Configuration conf) { } @@ -43,12 +52,11 @@ public class TopAuditLogger implements AuditLogger { @Override public void logAuditEvent(boolean succeeded, String userName, InetAddress addr, String cmd, String src, String dst, FileStatus status) { - - TopMetrics instance = TopMetrics.getInstance(); - if (instance != null) { - instance.report(succeeded, userName, addr, cmd, src, dst, status); - } else { - LOG.error("TopMetrics is not initialized yet!"); + try { + topMetrics.report(succeeded, userName, addr, cmd, src, dst, status); + } catch (Throwable t) { + LOG.error("An error occurred while reflecting the event in top service, " + + "event: (cmd={},userName={})", cmd, userName); } if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java index 0f4ebac5fd1..ba820323b20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/TopConf.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.namenode.top; +import java.util.concurrent.TimeUnit; + +import com.google.common.primitives.Ints; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -27,34 +30,34 @@ import com.google.common.base.Preconditions; */ @InterfaceAudience.Private public final class TopConf { + /** + * Whether TopMetrics are enabled + */ + public final boolean isEnabled; - public static final String TOP_METRICS_REGISTRATION_NAME = "topusers"; - public static final String TOP_METRICS_RECORD_NAME = "topparam"; /** - * A meta command representing the total number of commands + * A meta command representing the total number of calls to all commands */ - public static final String CMD_TOTAL = "total"; - /** - * A meta user representing all users - */ - public static String ALL_USERS = "ALL"; + public static final String ALL_CMDS = "*"; /** * nntop reporting periods in milliseconds */ - public final long[] nntopReportingPeriodsMs; + public final int[] nntopReportingPeriodsMs; public TopConf(Configuration conf) { + isEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, + DFSConfigKeys.NNTOP_ENABLED_DEFAULT); String[] periodsStr = conf.getTrimmedStrings( DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT); - nntopReportingPeriodsMs = new long[periodsStr.length]; + nntopReportingPeriodsMs = new int[periodsStr.length]; for (int i = 0; i < periodsStr.length; i++) { - nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) * - 60L * 1000L; //min to ms + nntopReportingPeriodsMs[i] = Ints.checkedCast( + TimeUnit.MINUTES.toMillis(Integer.parseInt(periodsStr[i]))); } - for (long aPeriodMs: nntopReportingPeriodsMs) { - Preconditions.checkArgument(aPeriodMs >= 60L * 1000L, + for (int aPeriodMs: nntopReportingPeriodsMs) { + Preconditions.checkArgument(aPeriodMs >= TimeUnit.MINUTES.toMillis(1), "minimum reporting period is 1 min!"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java index e8a4e23fdf0..ab553928862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java @@ -17,67 +17,50 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.metrics; -import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; -import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; -import static org.apache.hadoop.metrics2.lib.Interns.info; - import java.net.InetAddress; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; -import org.apache.hadoop.metrics2.MetricsCollector; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow; -/*** - * The interface to the top metrics +/** + * The interface to the top metrics. *

- * The producers use the {@link #report} method to report events and the - * consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the - * current top metrics. The default consumer is JMX but it could be any other - * user interface. + * Metrics are collected by a custom audit logger, {@link org.apache.hadoop + * .hdfs.server.namenode.top.TopAuditLogger}, which calls TopMetrics to + * increment per-operation, per-user counts on every audit log call. These + * counts are used to show the top users by NameNode operation as well as + * across all operations. + *

+ * TopMetrics maintains these counts for a configurable number of time + * intervals, e.g. 1min, 5min, 25min. Each interval is tracked by a + * RollingWindowManager. + *

+ * These metrics are published as a JSON string via {@link org.apache.hadoop + * .hdfs.server .namenode.metrics.FSNamesystemMBean#getTopWindows}. This is + * done by calling {@link org.apache.hadoop.hdfs.server.namenode.top.window + * .RollingWindowManager#snapshot} on each RollingWindowManager. *

* Thread-safe: relies on thread-safety of RollingWindowManager */ @InterfaceAudience.Private -public class TopMetrics implements MetricsSource { +public class TopMetrics { public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class); - enum Singleton { - INSTANCE; - - volatile TopMetrics impl = null; - - synchronized TopMetrics init(Configuration conf, String processName, - String sessionId, long[] reportingPeriods) { - if (impl == null) { - impl = - create(conf, processName, sessionId, reportingPeriods, - DefaultMetricsSystem.instance()); - } - logConf(conf); - return impl; - } - } - private static void logConf(Configuration conf) { LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY + " = " + conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY)); @@ -87,128 +70,35 @@ public class TopMetrics implements MetricsSource { " = " + conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY)); } - /** - * Return only the shortest periods for default - * TODO: make it configurable - */ - final boolean smallestOnlyDefault = true; - - /** - * The smallest of reporting periods - */ - long smallestPeriod = Long.MAX_VALUE; - - /** - * processName and sessionId might later be leveraged later when we aggregate - * report from multiple federated name nodes - */ - final String processName, sessionId; - /** * A map from reporting periods to WindowManager. Thread-safety is provided by * the fact that the mapping is not changed after construction. */ - final Map rollingWindowManagers = - new HashMap(); + final Map rollingWindowManagers = + new HashMap(); - TopMetrics(Configuration conf, String processName, String sessionId, - long[] reportingPeriods) { - this.processName = processName; - this.sessionId = sessionId; + public TopMetrics(Configuration conf, int[] reportingPeriods) { + logConf(conf); for (int i = 0; i < reportingPeriods.length; i++) { - smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]); rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager( conf, reportingPeriods[i])); } } - public static TopMetrics create(Configuration conf, String processName, - String sessionId, long[] reportingPeriods, MetricsSystem ms) { - return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME, - "top metrics of the namenode in a last period of time", new TopMetrics( - conf, processName, sessionId, reportingPeriods)); - } - - public static TopMetrics initSingleton(Configuration conf, - String processName, String sessionId, long[] reportingPeriods) { - return Singleton.INSTANCE.init(conf, processName, sessionId, - reportingPeriods); - } - - public static TopMetrics getInstance() { - TopMetrics topMetrics = Singleton.INSTANCE.impl; - Preconditions.checkArgument(topMetrics != null, - "The TopMetric singleton instance is not initialized." - + " Have you called initSingleton first?"); - return topMetrics; - } - /** - * In testing, the previous initialization should be reset if the entire - * metric system is reinitialized + * Get a list of the current TopWindow statistics, one TopWindow per tracked + * time interval. */ - @VisibleForTesting - public static void reset() { - Singleton.INSTANCE.impl = null; - } - - @Override - public void getMetrics(MetricsCollector collector, boolean all) { - long realTime = Time.monotonicNow(); - getMetrics(smallestOnlyDefault, realTime, collector, all); - } - - public void getMetrics(boolean smallestOnly, long currTime, - MetricsCollector collector, boolean all) { - for (Entry entry : rollingWindowManagers + public List getTopWindows() { + long monoTime = Time.monotonicNow(); + List windows = Lists.newArrayListWithCapacity + (rollingWindowManagers.size()); + for (Entry entry : rollingWindowManagers .entrySet()) { - if (!smallestOnly || smallestPeriod == entry.getKey()) { - getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all); - } + TopWindow window = entry.getValue().snapshot(monoTime); + windows.add(window); } - } - - /** - * Get metrics for a particular recording period and its corresponding - * {@link RollingWindowManager} - *

- * - * @param collector the metric collector - * @param period the reporting period - * @param rollingWindowManager the window manager corresponding to the - * reporting period - * @param all currently ignored - */ - void getMetrics(long currTime, MetricsCollector collector, Long period, - RollingWindowManager rollingWindowManager, boolean all) { - MetricsRecordBuilder rb = - collector.addRecord(createTopMetricsRecordName(period)) - .setContext("namenode").tag(ProcessName, processName) - .tag(SessionId, sessionId); - - MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime); - LOG.debug("calling snapshot, result size is: " + snapshotMetrics.size()); - for (Map.Entry entry : snapshotMetrics.entrySet()) { - String key = entry.getKey(); - Number value = entry.getValue(); - LOG.debug("checking an entry: key: {} value: {}", key, value); - long min = period / 1000L / 60L; //ms -> min - String desc = "top user of name node in the past " + min + " minutes"; - - if (value instanceof Integer) { - rb.addGauge(info(key, desc), (Integer) value); - } else if (value instanceof Long) { - rb.addGauge(info(key, desc), (Long) value); - } else if (value instanceof Float) { - rb.addGauge(info(key, desc), (Float) value); - } else if (value instanceof Double) { - rb.addGauge(info(key, desc), (Double) value); - } else { - LOG.warn("Unsupported metric type: " + value.getClass()); - } - } - LOG.debug("END iterating over metrics, result size is: {}", - snapshotMetrics.size()); + return windows; } /** @@ -216,18 +106,10 @@ public class TopMetrics implements MetricsSource { * log file. This is to be consistent when {@link TopMetrics} is charged with * data read back from log files instead of being invoked directly by the * FsNamesystem - * - * @param succeeded - * @param userName - * @param addr - * @param cmd - * @param src - * @param dst - * @param status */ public void report(boolean succeeded, String userName, InetAddress addr, String cmd, String src, String dst, FileStatus status) { - //currently we nntop makes use of only the username and the command + // currently nntop only makes use of the username and the command report(userName, cmd); } @@ -239,27 +121,11 @@ public class TopMetrics implements MetricsSource { public void report(long currTime, String userName, String cmd) { LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName); userName = UserGroupInformation.trimLoginMethod(userName); - try { - for (RollingWindowManager rollingWindowManager : rollingWindowManagers - .values()) { - rollingWindowManager.recordMetric(currTime, cmd, userName, 1); - rollingWindowManager.recordMetric(currTime, - TopConf.CMD_TOTAL, userName, 1); - } - } catch (Throwable t) { - LOG.error("An error occurred while reflecting the event in top service, " - + "event: (time,cmd,userName)=(" + currTime + "," + cmd + "," - + userName); + for (RollingWindowManager rollingWindowManager : rollingWindowManagers + .values()) { + rollingWindowManager.recordMetric(currTime, cmd, userName, 1); + rollingWindowManager.recordMetric(currTime, + TopConf.ALL_CMDS, userName, 1); } } - - /*** - * - * @param period the reporting period length in ms - * @return - */ - public static String createTopMetricsRecordName(Long period) { - return TopConf.TOP_METRICS_RECORD_NAME + "-" + period; - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java index d818cce26ef..00e708766e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java @@ -17,21 +17,22 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.window; -import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.Stack; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; /** * A class to manage the set of {@link RollingWindow}s. This class is the @@ -46,27 +47,95 @@ public class RollingWindowManager { public static final Logger LOG = LoggerFactory.getLogger( RollingWindowManager.class); - private int windowLenMs; - private int bucketsPerWindow; // e.g., 10 buckets per minute - private int topUsersCnt; // e.g., report top 10 metrics - - /** - * Create a metric name composed of the command and user - * - * @param command the command executed - * @param user the user - * @return a composed metric name - */ - @VisibleForTesting - public static String createMetricName(String command, String user) { - return command + "." + user; - } + private final int windowLenMs; + private final int bucketsPerWindow; // e.g., 10 buckets per minute + private final int topUsersCnt; // e.g., report top 10 metrics static private class RollingWindowMap extends ConcurrentHashMap { private static final long serialVersionUID = -6785807073237052051L; } + /** + * Represents a snapshot of the rolling window. It contains one Op per + * operation in the window, with ranked users for each Op. + */ + public static class TopWindow { + private final int windowMillis; + private final List top; + + public TopWindow(int windowMillis) { + this.windowMillis = windowMillis; + this.top = Lists.newArrayList(); + } + + public void addOp(Op op) { + top.add(op); + } + + public int getWindowLenMs() { + return windowMillis; + } + + public List getOps() { + return top; + } + } + + /** + * Represents an operation within a TopWindow. It contains a ranked + * set of the top users for the operation. + */ + public static class Op { + private final String opType; + private final List topUsers; + private final long totalCount; + + public Op(String opType, long totalCount) { + this.opType = opType; + this.topUsers = Lists.newArrayList(); + this.totalCount = totalCount; + } + + public void addUser(User u) { + topUsers.add(u); + } + + public String getOpType() { + return opType; + } + + public List getTopUsers() { + return topUsers; + } + + public long getTotalCount() { + return totalCount; + } + } + + /** + * Represents a user who called an Op within a TopWindow. Specifies the + * user and the number of times the user called the operation. + */ + public static class User { + private final String user; + private final long count; + + public User(String user, long count) { + this.user = user; + this.count = count; + } + + public String getUser() { + return user; + } + + public long getCount() { + return count; + } + } + /** * A mapping from each reported metric to its {@link RollingWindowMap} that * maintains the set of {@link RollingWindow}s for the users that have @@ -75,8 +144,9 @@ public class RollingWindowManager { public ConcurrentHashMap metricMap = new ConcurrentHashMap(); - public RollingWindowManager(Configuration conf, long reportingPeriodMs) { - windowLenMs = (int) reportingPeriodMs; + public RollingWindowManager(Configuration conf, int reportingPeriodMs) { + + windowLenMs = reportingPeriodMs; bucketsPerWindow = conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT); @@ -112,53 +182,71 @@ public class RollingWindowManager { * Take a snapshot of current top users in the past period. * * @param time the current time - * @return a map between the top metrics and their values. The user is encoded - * in the metric name. Refer to {@link RollingWindowManager#createMetricName} for - * the actual format. + * @return a TopWindow describing the top users for each metric in the + * window. */ - public MetricValueMap snapshot(long time) { - MetricValueMap map = new MetricValueMap(); - Set metricNames = metricMap.keySet(); - LOG.debug("iterating in reported metrics, size={} values={}", - metricNames.size(), metricNames); - for (Map.Entry rwEntry: metricMap.entrySet()) { - String metricName = rwEntry.getKey(); - RollingWindowMap rollingWindows = rwEntry.getValue(); - TopN topN = new TopN(topUsersCnt); - Iterator> iterator = - rollingWindows.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - String userName = entry.getKey(); - RollingWindow aWindow = entry.getValue(); - long windowSum = aWindow.getSum(time); - // do the gc here - if (windowSum == 0) { - LOG.debug("gc window of metric: {} userName: {}", - metricName, userName); - iterator.remove(); - continue; - } - LOG.debug("offer window of metric: {} userName: {} sum: {}", - metricName, userName, windowSum); - topN.offer(new NameValuePair(userName, windowSum)); - } - int n = topN.size(); - LOG.info("topN size for command " + metricName + " is: " + n); - if (n == 0) { + public TopWindow snapshot(long time) { + TopWindow window = new TopWindow(windowLenMs); + if (LOG.isDebugEnabled()) { + Set metricNames = metricMap.keySet(); + LOG.debug("iterating in reported metrics, size={} values={}", + metricNames.size(), metricNames); + } + for (Map.Entry entry : metricMap.entrySet()) { + String metricName = entry.getKey(); + RollingWindowMap rollingWindows = entry.getValue(); + TopN topN = getTopUsersForMetric(time, metricName, rollingWindows); + final int size = topN.size(); + if (size == 0) { continue; } - String allMetricName = - createMetricName(metricName, TopConf.ALL_USERS); - map.put(allMetricName, Long.valueOf(topN.total)); - for (int i = 0; i < n; i++) { - NameValuePair userEntry = topN.poll(); - String userMetricName = - createMetricName(metricName, userEntry.name); - map.put(userMetricName, Long.valueOf(userEntry.value)); + Op op = new Op(metricName, topN.getTotal()); + window.addOp(op); + // Reverse the users from the TopUsers using a stack, + // since we'd like them sorted in descending rather than ascending order + Stack reverse = new Stack(); + for (int i = 0; i < size; i++) { + reverse.push(topN.poll()); + } + for (int i = 0; i < size; i++) { + NameValuePair userEntry = reverse.pop(); + User user = new User(userEntry.name, Long.valueOf(userEntry.value)); + op.addUser(user); } } - return map; + return window; + } + + /** + * Calculates the top N users over a time interval. + * + * @param time the current time + * @param metricName Name of metric + * @return + */ + private TopN getTopUsersForMetric(long time, String metricName, + RollingWindowMap rollingWindows) { + TopN topN = new TopN(topUsersCnt); + Iterator> iterator = + rollingWindows.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + String userName = entry.getKey(); + RollingWindow aWindow = entry.getValue(); + long windowSum = aWindow.getSum(time); + // do the gc here + if (windowSum == 0) { + LOG.debug("gc window of metric: {} userName: {}", + metricName, userName); + iterator.remove(); + continue; + } + LOG.debug("offer window of metric: {} userName: {} sum: {}", + metricName, userName, windowSum); + topN.offer(new NameValuePair(userName, windowSum)); + } + LOG.info("topN size for command {} is: {}", metricName, topN.size()); + return topN; } /** @@ -190,7 +278,8 @@ public class RollingWindowManager { } /** - * A pair of a name and its corresponding value + * A pair of a name and its corresponding value. Defines a custom + * comparator so the TopN PriorityQueue sorts based on the count. */ static private class NameValuePair implements Comparable { String name; @@ -254,12 +343,4 @@ public class RollingWindowManager { return total; } } - - /** - * A mapping from metric names to their absolute values and their percentage - */ - @InterfaceAudience.Private - public static class MetricValueMap extends HashMap { - private static final long serialVersionUID = 8936732010242400171L; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java index 39e1165359d..3703c2dcba4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystemMBean.java @@ -96,6 +96,8 @@ public class TestFSNamesystemMBean { "MaxObjects")); Integer numStaleStorages = (Integer) (mbs.getAttribute( mxbeanNameFsns, "NumStaleStorages")); + String topUsers = + (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts")); // Metrics that belong to "NameNodeInfo". // These are metrics that FSNamesystem registers directly with MBeanServer. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java index 03ade90b415..c649621db73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java @@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.top.TopConf; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator; import org.apache.hadoop.util.VersionInfo; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; import org.mortbay.util.ajax.JSON; @@ -38,10 +41,15 @@ import java.io.File; import java.lang.management.ManagementFactory; import java.net.URI; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -257,4 +265,112 @@ public class TestNameNodeMXBean { } } } + + @Test(timeout=120000) + @SuppressWarnings("unchecked") + public void testTopUsers() throws Exception { + final Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxbeanNameFsns = new ObjectName( + "Hadoop:service=NameNode,name=FSNamesystemState"); + FileSystem fs = cluster.getFileSystem(); + final Path path = new Path("/"); + final int NUM_OPS = 10; + for (int i=0; i< NUM_OPS; i++) { + fs.listStatus(path); + fs.setTimes(path, 0, 1); + } + String topUsers = + (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts")); + ObjectMapper mapper = new ObjectMapper(); + Map map = mapper.readValue(topUsers, Map.class); + assertTrue("Could not find map key timestamp", + map.containsKey("timestamp")); + assertTrue("Could not find map key windows", map.containsKey("windows")); + List>>> windows = + (List>>>) map.get("windows"); + assertEquals("Unexpected num windows", 3, windows.size()); + for (Map>> window : windows) { + final List> ops = window.get("ops"); + assertEquals("Unexpected num ops", 3, ops.size()); + for (Map op: ops) { + final long count = Long.parseLong(op.get("totalCount").toString()); + final String opType = op.get("opType").toString(); + final int expected; + if (opType.equals(TopConf.ALL_CMDS)) { + expected = 2*NUM_OPS; + } else { + expected = NUM_OPS; + } + assertEquals("Unexpected total count", expected, count); + } + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test(timeout=120000) + public void testTopUsersDisabled() throws Exception { + final Configuration conf = new Configuration(); + // Disable nntop + conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, false); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxbeanNameFsns = new ObjectName( + "Hadoop:service=NameNode,name=FSNamesystemState"); + FileSystem fs = cluster.getFileSystem(); + final Path path = new Path("/"); + final int NUM_OPS = 10; + for (int i=0; i< NUM_OPS; i++) { + fs.listStatus(path); + fs.setTimes(path, 0, 1); + } + String topUsers = + (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts")); + assertNull("Did not expect to find TopUserOpCounts bean!", topUsers); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test(timeout=120000) + public void testTopUsersNoPeriods() throws Exception { + final Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true); + conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, ""); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build(); + cluster.waitActive(); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxbeanNameFsns = new ObjectName( + "Hadoop:service=NameNode,name=FSNamesystemState"); + FileSystem fs = cluster.getFileSystem(); + final Path path = new Path("/"); + final int NUM_OPS = 10; + for (int i=0; i< NUM_OPS; i++) { + fs.listStatus(path); + fs.setTimes(path, 0, 1); + } + String topUsers = + (String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts")); + assertNotNull("Expected TopUserOpCounts bean!", topUsers); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index c028a4a31a8..6c378226e68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -47,10 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; -import org.apache.hadoop.hdfs.server.namenode.top.TopConf; -import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -58,7 +55,6 @@ import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -93,11 +89,6 @@ public class TestNameNodeMetrics { CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class)) .getLogger().setLevel(Level.DEBUG); - /** - * need it to test {@link #testTopAuditLogger} - */ - CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY, - TopAuditLogger.class.getName()); } private MiniDFSCluster cluster; @@ -112,7 +103,6 @@ public class TestNameNodeMetrics { @Before public void setUp() throws Exception { - TopMetrics.reset();//reset the static init done by prev test cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build(); cluster.waitActive(); namesystem = cluster.getNamesystem(); @@ -465,53 +455,4 @@ public class TestNameNodeMetrics { assertQuantileGauges("Syncs1s", rb); assertQuantileGauges("BlockReport1s", rb); } - - /** - * Test whether {@link TopMetrics} is registered with metrics system - * @throws Exception - */ - @Test - public void testTopMetrics() throws Exception { - final String testUser = "NNTopTestUser"; - final String testOp = "NNTopTestOp"; - final String metricName = - RollingWindowManager.createMetricName(testOp, testUser); - TopMetrics.getInstance().report(testUser, testOp); - final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME; - MetricsRecordBuilder rb = getMetrics(regName); - assertGauge(metricName, 1L, rb); - } - - /** - * Test whether {@link TopAuditLogger} is registered as an audit logger - * @throws Exception - */ - @Test - public void testTopAuditLogger() throws Exception { - //note: the top audit logger should already be set in conf - //issue one command, any command is fine - FileSystem fs = cluster.getFileSystem(); - long time = System.currentTimeMillis(); - fs.setTimes(new Path("/"), time, time); - //the command should be reflected in the total count of all users - final String testUser = TopConf.ALL_USERS; - final String testOp = TopConf.CMD_TOTAL; - final String metricName = - RollingWindowManager.createMetricName(testOp, testUser); - final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME; - MetricsRecordBuilder rb = getMetrics(regName); - assertGaugeGreaterThan(metricName, 1L, rb); - } - - /** - * Assert a long gauge metric greater than - * @param name of the metric - * @param expected minimum expected value of the metric - * @param rb the record builder mock used to getMetrics - */ - public static void assertGaugeGreaterThan(String name, long expected, - MetricsRecordBuilder rb) { - Assert.assertTrue("Bad value for metric " + name, - expected <= MetricsAsserts.getLongGauge(name, rb)); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java index de2171443b9..494ed08dbea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java @@ -17,16 +17,19 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.window; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow; +import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User; +import static org.junit.Assert.assertEquals; public class TestRollingWindowManager { @@ -61,33 +64,39 @@ public class TestRollingWindowManager { for (int i = 0; i < users.length; i++) manager.recordMetric(time, "close", users[i], i + 1); time++; - MetricValueMap tops = manager.snapshot(time); + TopWindow tops = manager.snapshot(time); - assertEquals("The number of returned top metrics is invalid", - 2 * (N_TOP_USERS + 1), tops.size()); - int userIndex = users.length - 2; - String metricName = RollingWindowManager.createMetricName("open", - users[userIndex]); - boolean includes = tops.containsKey(metricName); - assertTrue("The order of entries in top metrics is wrong", includes); - assertEquals("The reported value by top is different from recorded one", - (userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue()); + assertEquals("Unexpected number of ops", 2, tops.getOps().size()); + for (Op op : tops.getOps()) { + final List topUsers = op.getTopUsers(); + assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size()); + if (op.getOpType() == "open") { + for (int i = 0; i < topUsers.size(); i++) { + User user = topUsers.get(i); + assertEquals("Unexpected count for user " + user.getUser(), + (users.length-i)*2, user.getCount()); + } + // Closed form of sum(range(2,42,2)) + assertEquals("Unexpected total count for op", + (2+(users.length*2))*(users.length/2), + op.getTotalCount()); + } + } // move the window forward not to see the "open" results time += WINDOW_LEN_MS - 2; - // top should not include only "close" results tops = manager.snapshot(time); - assertEquals("The number of returned top metrics is invalid", - N_TOP_USERS + 1, tops.size()); - includes = tops.containsKey(metricName); - assertFalse("After rolling, the top list still includes the stale metrics", - includes); - - metricName = RollingWindowManager.createMetricName("close", - users[userIndex]); - includes = tops.containsKey(metricName); - assertTrue("The order of entries in top metrics is wrong", includes); - assertEquals("The reported value by top is different from recorded one", - (userIndex + 1), ((Long) tops.get(metricName)).longValue()); + assertEquals("Unexpected number of ops", 1, tops.getOps().size()); + final Op op = tops.getOps().get(0); + assertEquals("Should only see close ops", "close", op.getOpType()); + final List topUsers = op.getTopUsers(); + for (int i = 0; i < topUsers.size(); i++) { + User user = topUsers.get(i); + assertEquals("Unexpected count for user " + user.getUser(), + (users.length-i), user.getCount()); + } + // Closed form of sum(range(1,21)) + assertEquals("Unexpected total count for op", + (1 + users.length) * (users.length / 2), op.getTotalCount()); } }