HDFS-7426. Change nntop JMX format to be a JSON blob.
This commit is contained in:
parent
e5a6925199
commit
fa7b9248e4
|
@ -455,6 +455,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7509. Avoid resolving path multiple times. (jing9)
|
HDFS-7509. Avoid resolving path multiple times. (jing9)
|
||||||
|
|
||||||
|
HDFS-7426. Change nntop JMX format to be a JSON blob. (wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
|
|
@ -120,6 +120,7 @@ import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
@ -241,6 +242,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
@ -281,6 +283,7 @@ import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.log4j.Appender;
|
import org.apache.log4j.Appender;
|
||||||
import org.apache.log4j.AsyncAppender;
|
import org.apache.log4j.AsyncAppender;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -539,6 +542,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
|
|
||||||
private final FSImage fsImage;
|
private final FSImage fsImage;
|
||||||
|
|
||||||
|
private final TopConf topConf;
|
||||||
|
private TopMetrics topMetrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify that loading of this FSDirectory is complete, and
|
* Notify that loading of this FSDirectory is complete, and
|
||||||
* it is ready for use
|
* it is ready for use
|
||||||
|
@ -842,6 +848,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
this.snapshotManager = new SnapshotManager(dir);
|
this.snapshotManager = new SnapshotManager(dir);
|
||||||
this.cacheManager = new CacheManager(this, conf, blockManager);
|
this.cacheManager = new CacheManager(this, conf, blockManager);
|
||||||
this.safeMode = new SafeModeInfo(conf);
|
this.safeMode = new SafeModeInfo(conf);
|
||||||
|
this.topConf = new TopConf(conf);
|
||||||
this.auditLoggers = initAuditLoggers(conf);
|
this.auditLoggers = initAuditLoggers(conf);
|
||||||
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
|
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
|
||||||
auditLoggers.get(0) instanceof DefaultAuditLogger;
|
auditLoggers.get(0) instanceof DefaultAuditLogger;
|
||||||
|
@ -952,13 +959,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add audit logger to calculate top users
|
// Add audit logger to calculate top users
|
||||||
if (conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
|
if (topConf.isEnabled) {
|
||||||
DFSConfigKeys.NNTOP_ENABLED_DEFAULT)) {
|
topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
|
||||||
String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
|
auditLoggers.add(new TopAuditLogger(topMetrics));
|
||||||
TopConf nntopConf = new TopConf(conf);
|
|
||||||
TopMetrics.initSingleton(conf, NamenodeRole.NAMENODE.name(), sessionId,
|
|
||||||
nntopConf.nntopReportingPeriodsMs);
|
|
||||||
auditLoggers.add(new TopAuditLogger());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Collections.unmodifiableList(auditLoggers);
|
return Collections.unmodifiableList(auditLoggers);
|
||||||
|
@ -6013,6 +6016,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return getBlockManager().getDatanodeManager().getNumStaleStorages();
|
return getBlockManager().getDatanodeManager().getNumStaleStorages();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // FSNamesystemMBean
|
||||||
|
public String getTopUserOpCounts() {
|
||||||
|
if (!topConf.isEnabled) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Date now = new Date();
|
||||||
|
final List<RollingWindowManager.TopWindow> topWindows =
|
||||||
|
topMetrics.getTopWindows();
|
||||||
|
Map<String, Object> topMap = new TreeMap<String, Object>();
|
||||||
|
topMap.put("windows", topWindows);
|
||||||
|
topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
try {
|
||||||
|
return mapper.writeValueAsString(topMap);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to fetch TopUser metrics", e);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increments, logs and then returns the stamp
|
* Increments, logs and then returns the stamp
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -164,4 +164,11 @@ public interface FSNamesystemMBean {
|
||||||
*/
|
*/
|
||||||
public int getNumStaleStorages();
|
public int getNumStaleStorages();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a nested JSON object listing the top users for different RPC
|
||||||
|
* operations over tracked time windows.
|
||||||
|
*
|
||||||
|
* @return JSON string
|
||||||
|
*/
|
||||||
|
public String getTopUserOpCounts();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.top;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -36,6 +37,14 @@ import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
||||||
public class TopAuditLogger implements AuditLogger {
|
public class TopAuditLogger implements AuditLogger {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
|
public static final Logger LOG = LoggerFactory.getLogger(TopAuditLogger.class);
|
||||||
|
|
||||||
|
private final TopMetrics topMetrics;
|
||||||
|
|
||||||
|
public TopAuditLogger(TopMetrics topMetrics) {
|
||||||
|
Preconditions.checkNotNull(topMetrics, "Cannot init with a null " +
|
||||||
|
"TopMetrics");
|
||||||
|
this.topMetrics = topMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf) {
|
public void initialize(Configuration conf) {
|
||||||
}
|
}
|
||||||
|
@ -43,12 +52,11 @@ public class TopAuditLogger implements AuditLogger {
|
||||||
@Override
|
@Override
|
||||||
public void logAuditEvent(boolean succeeded, String userName,
|
public void logAuditEvent(boolean succeeded, String userName,
|
||||||
InetAddress addr, String cmd, String src, String dst, FileStatus status) {
|
InetAddress addr, String cmd, String src, String dst, FileStatus status) {
|
||||||
|
try {
|
||||||
TopMetrics instance = TopMetrics.getInstance();
|
topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
|
||||||
if (instance != null) {
|
} catch (Throwable t) {
|
||||||
instance.report(succeeded, userName, addr, cmd, src, dst, status);
|
LOG.error("An error occurred while reflecting the event in top service, "
|
||||||
} else {
|
+ "event: (cmd={},userName={})", cmd, userName);
|
||||||
LOG.error("TopMetrics is not initialized yet!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top;
|
package org.apache.hadoop.hdfs.server.namenode.top;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -27,34 +30,34 @@ import com.google.common.base.Preconditions;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class TopConf {
|
public final class TopConf {
|
||||||
|
/**
|
||||||
|
* Whether TopMetrics are enabled
|
||||||
|
*/
|
||||||
|
public final boolean isEnabled;
|
||||||
|
|
||||||
public static final String TOP_METRICS_REGISTRATION_NAME = "topusers";
|
|
||||||
public static final String TOP_METRICS_RECORD_NAME = "topparam";
|
|
||||||
/**
|
/**
|
||||||
* A meta command representing the total number of commands
|
* A meta command representing the total number of calls to all commands
|
||||||
*/
|
*/
|
||||||
public static final String CMD_TOTAL = "total";
|
public static final String ALL_CMDS = "*";
|
||||||
/**
|
|
||||||
* A meta user representing all users
|
|
||||||
*/
|
|
||||||
public static String ALL_USERS = "ALL";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* nntop reporting periods in milliseconds
|
* nntop reporting periods in milliseconds
|
||||||
*/
|
*/
|
||||||
public final long[] nntopReportingPeriodsMs;
|
public final int[] nntopReportingPeriodsMs;
|
||||||
|
|
||||||
public TopConf(Configuration conf) {
|
public TopConf(Configuration conf) {
|
||||||
|
isEnabled = conf.getBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY,
|
||||||
|
DFSConfigKeys.NNTOP_ENABLED_DEFAULT);
|
||||||
String[] periodsStr = conf.getTrimmedStrings(
|
String[] periodsStr = conf.getTrimmedStrings(
|
||||||
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
|
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY,
|
||||||
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
|
DFSConfigKeys.NNTOP_WINDOWS_MINUTES_DEFAULT);
|
||||||
nntopReportingPeriodsMs = new long[periodsStr.length];
|
nntopReportingPeriodsMs = new int[periodsStr.length];
|
||||||
for (int i = 0; i < periodsStr.length; i++) {
|
for (int i = 0; i < periodsStr.length; i++) {
|
||||||
nntopReportingPeriodsMs[i] = Integer.parseInt(periodsStr[i]) *
|
nntopReportingPeriodsMs[i] = Ints.checkedCast(
|
||||||
60L * 1000L; //min to ms
|
TimeUnit.MINUTES.toMillis(Integer.parseInt(periodsStr[i])));
|
||||||
}
|
}
|
||||||
for (long aPeriodMs: nntopReportingPeriodsMs) {
|
for (int aPeriodMs: nntopReportingPeriodsMs) {
|
||||||
Preconditions.checkArgument(aPeriodMs >= 60L * 1000L,
|
Preconditions.checkArgument(aPeriodMs >= TimeUnit.MINUTES.toMillis(1),
|
||||||
"minimum reporting period is 1 min!");
|
"minimum reporting period is 1 min!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,67 +17,50 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top.metrics;
|
package org.apache.hadoop.hdfs.server.namenode.top.metrics;
|
||||||
|
|
||||||
import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
|
|
||||||
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
|
||||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsSource;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
|
|
||||||
|
|
||||||
/***
|
/**
|
||||||
* The interface to the top metrics
|
* The interface to the top metrics.
|
||||||
* <p/>
|
* <p/>
|
||||||
* The producers use the {@link #report} method to report events and the
|
* Metrics are collected by a custom audit logger, {@link org.apache.hadoop
|
||||||
* consumers use {@link #getMetrics(MetricsCollector, boolean)} to retrieve the
|
* .hdfs.server.namenode.top.TopAuditLogger}, which calls TopMetrics to
|
||||||
* current top metrics. The default consumer is JMX but it could be any other
|
* increment per-operation, per-user counts on every audit log call. These
|
||||||
* user interface.
|
* counts are used to show the top users by NameNode operation as well as
|
||||||
|
* across all operations.
|
||||||
|
* <p/>
|
||||||
|
* TopMetrics maintains these counts for a configurable number of time
|
||||||
|
* intervals, e.g. 1min, 5min, 25min. Each interval is tracked by a
|
||||||
|
* RollingWindowManager.
|
||||||
|
* <p/>
|
||||||
|
* These metrics are published as a JSON string via {@link org.apache.hadoop
|
||||||
|
* .hdfs.server.namenode.metrics.FSNamesystemMBean#getTopUserOpCounts}. This is
|
||||||
|
* done by calling {@link org.apache.hadoop.hdfs.server.namenode.top.window
|
||||||
|
* .RollingWindowManager#snapshot} on each RollingWindowManager.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Thread-safe: relies on thread-safety of RollingWindowManager
|
* Thread-safe: relies on thread-safety of RollingWindowManager
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class TopMetrics implements MetricsSource {
|
public class TopMetrics {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
|
public static final Logger LOG = LoggerFactory.getLogger(TopMetrics.class);
|
||||||
|
|
||||||
enum Singleton {
|
|
||||||
INSTANCE;
|
|
||||||
|
|
||||||
volatile TopMetrics impl = null;
|
|
||||||
|
|
||||||
synchronized TopMetrics init(Configuration conf, String processName,
|
|
||||||
String sessionId, long[] reportingPeriods) {
|
|
||||||
if (impl == null) {
|
|
||||||
impl =
|
|
||||||
create(conf, processName, sessionId, reportingPeriods,
|
|
||||||
DefaultMetricsSystem.instance());
|
|
||||||
}
|
|
||||||
logConf(conf);
|
|
||||||
return impl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void logConf(Configuration conf) {
|
private static void logConf(Configuration conf) {
|
||||||
LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
|
LOG.info("NNTop conf: " + DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY +
|
||||||
" = " + conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
|
" = " + conf.get(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY));
|
||||||
|
@ -87,128 +70,35 @@ public class TopMetrics implements MetricsSource {
|
||||||
" = " + conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY));
|
" = " + conf.get(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return only the shortest periods for default
|
|
||||||
* TODO: make it configurable
|
|
||||||
*/
|
|
||||||
final boolean smallestOnlyDefault = true;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The smallest of reporting periods
|
|
||||||
*/
|
|
||||||
long smallestPeriod = Long.MAX_VALUE;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* processName and sessionId might later be leveraged later when we aggregate
|
|
||||||
* report from multiple federated name nodes
|
|
||||||
*/
|
|
||||||
final String processName, sessionId;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A map from reporting periods to WindowManager. Thread-safety is provided by
|
* A map from reporting periods to WindowManager. Thread-safety is provided by
|
||||||
* the fact that the mapping is not changed after construction.
|
* the fact that the mapping is not changed after construction.
|
||||||
*/
|
*/
|
||||||
final Map<Long, RollingWindowManager> rollingWindowManagers =
|
final Map<Integer, RollingWindowManager> rollingWindowManagers =
|
||||||
new HashMap<Long, RollingWindowManager>();
|
new HashMap<Integer, RollingWindowManager>();
|
||||||
|
|
||||||
TopMetrics(Configuration conf, String processName, String sessionId,
|
public TopMetrics(Configuration conf, int[] reportingPeriods) {
|
||||||
long[] reportingPeriods) {
|
logConf(conf);
|
||||||
this.processName = processName;
|
|
||||||
this.sessionId = sessionId;
|
|
||||||
for (int i = 0; i < reportingPeriods.length; i++) {
|
for (int i = 0; i < reportingPeriods.length; i++) {
|
||||||
smallestPeriod = Math.min(smallestPeriod, reportingPeriods[i]);
|
|
||||||
rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
|
rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
|
||||||
conf, reportingPeriods[i]));
|
conf, reportingPeriods[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TopMetrics create(Configuration conf, String processName,
|
|
||||||
String sessionId, long[] reportingPeriods, MetricsSystem ms) {
|
|
||||||
return ms.register(TopConf.TOP_METRICS_REGISTRATION_NAME,
|
|
||||||
"top metrics of the namenode in a last period of time", new TopMetrics(
|
|
||||||
conf, processName, sessionId, reportingPeriods));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static TopMetrics initSingleton(Configuration conf,
|
|
||||||
String processName, String sessionId, long[] reportingPeriods) {
|
|
||||||
return Singleton.INSTANCE.init(conf, processName, sessionId,
|
|
||||||
reportingPeriods);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static TopMetrics getInstance() {
|
|
||||||
TopMetrics topMetrics = Singleton.INSTANCE.impl;
|
|
||||||
Preconditions.checkArgument(topMetrics != null,
|
|
||||||
"The TopMetric singleton instance is not initialized."
|
|
||||||
+ " Have you called initSingleton first?");
|
|
||||||
return topMetrics;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In testing, the previous initialization should be reset if the entire
|
* Get a list of the current TopWindow statistics, one TopWindow per tracked
|
||||||
* metric system is reinitialized
|
* time interval.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
public List<TopWindow> getTopWindows() {
|
||||||
public static void reset() {
|
long monoTime = Time.monotonicNow();
|
||||||
Singleton.INSTANCE.impl = null;
|
List<TopWindow> windows = Lists.newArrayListWithCapacity
|
||||||
}
|
(rollingWindowManagers.size());
|
||||||
|
for (Entry<Integer, RollingWindowManager> entry : rollingWindowManagers
|
||||||
@Override
|
|
||||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
|
||||||
long realTime = Time.monotonicNow();
|
|
||||||
getMetrics(smallestOnlyDefault, realTime, collector, all);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void getMetrics(boolean smallestOnly, long currTime,
|
|
||||||
MetricsCollector collector, boolean all) {
|
|
||||||
for (Entry<Long, RollingWindowManager> entry : rollingWindowManagers
|
|
||||||
.entrySet()) {
|
.entrySet()) {
|
||||||
if (!smallestOnly || smallestPeriod == entry.getKey()) {
|
TopWindow window = entry.getValue().snapshot(monoTime);
|
||||||
getMetrics(currTime, collector, entry.getKey(), entry.getValue(), all);
|
windows.add(window);
|
||||||
}
|
}
|
||||||
}
|
return windows;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get metrics for a particular recording period and its corresponding
|
|
||||||
* {@link RollingWindowManager}
|
|
||||||
* <p/>
|
|
||||||
*
|
|
||||||
* @param collector the metric collector
|
|
||||||
* @param period the reporting period
|
|
||||||
* @param rollingWindowManager the window manager corresponding to the
|
|
||||||
* reporting period
|
|
||||||
* @param all currently ignored
|
|
||||||
*/
|
|
||||||
void getMetrics(long currTime, MetricsCollector collector, Long period,
|
|
||||||
RollingWindowManager rollingWindowManager, boolean all) {
|
|
||||||
MetricsRecordBuilder rb =
|
|
||||||
collector.addRecord(createTopMetricsRecordName(period))
|
|
||||||
.setContext("namenode").tag(ProcessName, processName)
|
|
||||||
.tag(SessionId, sessionId);
|
|
||||||
|
|
||||||
MetricValueMap snapshotMetrics = rollingWindowManager.snapshot(currTime);
|
|
||||||
LOG.debug("calling snapshot, result size is: " + snapshotMetrics.size());
|
|
||||||
for (Map.Entry<String, Number> entry : snapshotMetrics.entrySet()) {
|
|
||||||
String key = entry.getKey();
|
|
||||||
Number value = entry.getValue();
|
|
||||||
LOG.debug("checking an entry: key: {} value: {}", key, value);
|
|
||||||
long min = period / 1000L / 60L; //ms -> min
|
|
||||||
String desc = "top user of name node in the past " + min + " minutes";
|
|
||||||
|
|
||||||
if (value instanceof Integer) {
|
|
||||||
rb.addGauge(info(key, desc), (Integer) value);
|
|
||||||
} else if (value instanceof Long) {
|
|
||||||
rb.addGauge(info(key, desc), (Long) value);
|
|
||||||
} else if (value instanceof Float) {
|
|
||||||
rb.addGauge(info(key, desc), (Float) value);
|
|
||||||
} else if (value instanceof Double) {
|
|
||||||
rb.addGauge(info(key, desc), (Double) value);
|
|
||||||
} else {
|
|
||||||
LOG.warn("Unsupported metric type: " + value.getClass());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.debug("END iterating over metrics, result size is: {}",
|
|
||||||
snapshotMetrics.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -216,18 +106,10 @@ public class TopMetrics implements MetricsSource {
|
||||||
* log file. This is to be consistent when {@link TopMetrics} is charged with
|
* log file. This is to be consistent when {@link TopMetrics} is charged with
|
||||||
* data read back from log files instead of being invoked directly by the
|
* data read back from log files instead of being invoked directly by the
|
||||||
* FsNamesystem
|
* FsNamesystem
|
||||||
*
|
|
||||||
* @param succeeded
|
|
||||||
* @param userName
|
|
||||||
* @param addr
|
|
||||||
* @param cmd
|
|
||||||
* @param src
|
|
||||||
* @param dst
|
|
||||||
* @param status
|
|
||||||
*/
|
*/
|
||||||
public void report(boolean succeeded, String userName, InetAddress addr,
|
public void report(boolean succeeded, String userName, InetAddress addr,
|
||||||
String cmd, String src, String dst, FileStatus status) {
|
String cmd, String src, String dst, FileStatus status) {
|
||||||
//currently we nntop makes use of only the username and the command
|
// currently nntop only makes use of the username and the command
|
||||||
report(userName, cmd);
|
report(userName, cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,27 +121,11 @@ public class TopMetrics implements MetricsSource {
|
||||||
public void report(long currTime, String userName, String cmd) {
|
public void report(long currTime, String userName, String cmd) {
|
||||||
LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
|
LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
|
||||||
userName = UserGroupInformation.trimLoginMethod(userName);
|
userName = UserGroupInformation.trimLoginMethod(userName);
|
||||||
try {
|
|
||||||
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
|
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
|
||||||
.values()) {
|
.values()) {
|
||||||
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
|
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
|
||||||
rollingWindowManager.recordMetric(currTime,
|
rollingWindowManager.recordMetric(currTime,
|
||||||
TopConf.CMD_TOTAL, userName, 1);
|
TopConf.ALL_CMDS, userName, 1);
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOG.error("An error occurred while reflecting the event in top service, "
|
|
||||||
+ "event: (time,cmd,userName)=(" + currTime + "," + cmd + ","
|
|
||||||
+ userName);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/***
|
|
||||||
*
|
|
||||||
* @param period the reporting period length in ms
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
public static String createTopMetricsRecordName(Long period) {
|
|
||||||
return TopConf.TOP_METRICS_RECORD_NAME + "-" + period;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,21 +17,22 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Stack;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class to manage the set of {@link RollingWindow}s. This class is the
|
* A class to manage the set of {@link RollingWindow}s. This class is the
|
||||||
|
@ -46,27 +47,95 @@ public class RollingWindowManager {
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(
|
public static final Logger LOG = LoggerFactory.getLogger(
|
||||||
RollingWindowManager.class);
|
RollingWindowManager.class);
|
||||||
|
|
||||||
private int windowLenMs;
|
private final int windowLenMs;
|
||||||
private int bucketsPerWindow; // e.g., 10 buckets per minute
|
private final int bucketsPerWindow; // e.g., 10 buckets per minute
|
||||||
private int topUsersCnt; // e.g., report top 10 metrics
|
private final int topUsersCnt; // e.g., report top 10 metrics
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a metric name composed of the command and user
|
|
||||||
*
|
|
||||||
* @param command the command executed
|
|
||||||
* @param user the user
|
|
||||||
* @return a composed metric name
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
public static String createMetricName(String command, String user) {
|
|
||||||
return command + "." + user;
|
|
||||||
}
|
|
||||||
|
|
||||||
static private class RollingWindowMap extends
|
static private class RollingWindowMap extends
|
||||||
ConcurrentHashMap<String, RollingWindow> {
|
ConcurrentHashMap<String, RollingWindow> {
|
||||||
private static final long serialVersionUID = -6785807073237052051L;
|
private static final long serialVersionUID = -6785807073237052051L;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a snapshot of the rolling window. It contains one Op per
|
||||||
|
* operation in the window, with ranked users for each Op.
|
||||||
|
*/
|
||||||
|
public static class TopWindow {
|
||||||
|
private final int windowMillis;
|
||||||
|
private final List<Op> top;
|
||||||
|
|
||||||
|
public TopWindow(int windowMillis) {
|
||||||
|
this.windowMillis = windowMillis;
|
||||||
|
this.top = Lists.newArrayList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addOp(Op op) {
|
||||||
|
top.add(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getWindowLenMs() {
|
||||||
|
return windowMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Op> getOps() {
|
||||||
|
return top;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents an operation within a TopWindow. It contains a ranked
|
||||||
|
* set of the top users for the operation.
|
||||||
|
*/
|
||||||
|
public static class Op {
|
||||||
|
private final String opType;
|
||||||
|
private final List<User> topUsers;
|
||||||
|
private final long totalCount;
|
||||||
|
|
||||||
|
public Op(String opType, long totalCount) {
|
||||||
|
this.opType = opType;
|
||||||
|
this.topUsers = Lists.newArrayList();
|
||||||
|
this.totalCount = totalCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addUser(User u) {
|
||||||
|
topUsers.add(u);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOpType() {
|
||||||
|
return opType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<User> getTopUsers() {
|
||||||
|
return topUsers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalCount() {
|
||||||
|
return totalCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a user who called an Op within a TopWindow. Specifies the
|
||||||
|
* user and the number of times the user called the operation.
|
||||||
|
*/
|
||||||
|
public static class User {
|
||||||
|
private final String user;
|
||||||
|
private final long count;
|
||||||
|
|
||||||
|
public User(String user, long count) {
|
||||||
|
this.user = user;
|
||||||
|
this.count = count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUser() {
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mapping from each reported metric to its {@link RollingWindowMap} that
|
* A mapping from each reported metric to its {@link RollingWindowMap} that
|
||||||
* maintains the set of {@link RollingWindow}s for the users that have
|
* maintains the set of {@link RollingWindow}s for the users that have
|
||||||
|
@ -75,8 +144,9 @@ public class RollingWindowManager {
|
||||||
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
|
public ConcurrentHashMap<String, RollingWindowMap> metricMap =
|
||||||
new ConcurrentHashMap<String, RollingWindowMap>();
|
new ConcurrentHashMap<String, RollingWindowMap>();
|
||||||
|
|
||||||
public RollingWindowManager(Configuration conf, long reportingPeriodMs) {
|
public RollingWindowManager(Configuration conf, int reportingPeriodMs) {
|
||||||
windowLenMs = (int) reportingPeriodMs;
|
|
||||||
|
windowLenMs = reportingPeriodMs;
|
||||||
bucketsPerWindow =
|
bucketsPerWindow =
|
||||||
conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
|
conf.getInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY,
|
||||||
DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
|
DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_DEFAULT);
|
||||||
|
@ -112,18 +182,50 @@ public class RollingWindowManager {
|
||||||
* Take a snapshot of current top users in the past period.
|
* Take a snapshot of current top users in the past period.
|
||||||
*
|
*
|
||||||
* @param time the current time
|
* @param time the current time
|
||||||
* @return a map between the top metrics and their values. The user is encoded
|
* @return a TopWindow describing the top users for each metric in the
|
||||||
* in the metric name. Refer to {@link RollingWindowManager#createMetricName} for
|
* window.
|
||||||
* the actual format.
|
|
||||||
*/
|
*/
|
||||||
public MetricValueMap snapshot(long time) {
|
public TopWindow snapshot(long time) {
|
||||||
MetricValueMap map = new MetricValueMap();
|
TopWindow window = new TopWindow(windowLenMs);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
Set<String> metricNames = metricMap.keySet();
|
Set<String> metricNames = metricMap.keySet();
|
||||||
LOG.debug("iterating in reported metrics, size={} values={}",
|
LOG.debug("iterating in reported metrics, size={} values={}",
|
||||||
metricNames.size(), metricNames);
|
metricNames.size(), metricNames);
|
||||||
for (Map.Entry<String,RollingWindowMap> rwEntry: metricMap.entrySet()) {
|
}
|
||||||
String metricName = rwEntry.getKey();
|
for (Map.Entry<String, RollingWindowMap> entry : metricMap.entrySet()) {
|
||||||
RollingWindowMap rollingWindows = rwEntry.getValue();
|
String metricName = entry.getKey();
|
||||||
|
RollingWindowMap rollingWindows = entry.getValue();
|
||||||
|
TopN topN = getTopUsersForMetric(time, metricName, rollingWindows);
|
||||||
|
final int size = topN.size();
|
||||||
|
if (size == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Op op = new Op(metricName, topN.getTotal());
|
||||||
|
window.addOp(op);
|
||||||
|
// Reverse the users from the TopUsers using a stack,
|
||||||
|
// since we'd like them sorted in descending rather than ascending order
|
||||||
|
Stack<NameValuePair> reverse = new Stack<NameValuePair>();
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
reverse.push(topN.poll());
|
||||||
|
}
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
NameValuePair userEntry = reverse.pop();
|
||||||
|
User user = new User(userEntry.name, Long.valueOf(userEntry.value));
|
||||||
|
op.addUser(user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return window;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the top N users over a time interval.
|
||||||
|
*
|
||||||
|
* @param time the current time
|
||||||
|
* @param metricName Name of metric
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private TopN getTopUsersForMetric(long time, String metricName,
|
||||||
|
RollingWindowMap rollingWindows) {
|
||||||
TopN topN = new TopN(topUsersCnt);
|
TopN topN = new TopN(topUsersCnt);
|
||||||
Iterator<Map.Entry<String, RollingWindow>> iterator =
|
Iterator<Map.Entry<String, RollingWindow>> iterator =
|
||||||
rollingWindows.entrySet().iterator();
|
rollingWindows.entrySet().iterator();
|
||||||
|
@ -143,22 +245,8 @@ public class RollingWindowManager {
|
||||||
metricName, userName, windowSum);
|
metricName, userName, windowSum);
|
||||||
topN.offer(new NameValuePair(userName, windowSum));
|
topN.offer(new NameValuePair(userName, windowSum));
|
||||||
}
|
}
|
||||||
int n = topN.size();
|
LOG.info("topN size for command {} is: {}", metricName, topN.size());
|
||||||
LOG.info("topN size for command " + metricName + " is: " + n);
|
return topN;
|
||||||
if (n == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String allMetricName =
|
|
||||||
createMetricName(metricName, TopConf.ALL_USERS);
|
|
||||||
map.put(allMetricName, Long.valueOf(topN.total));
|
|
||||||
for (int i = 0; i < n; i++) {
|
|
||||||
NameValuePair userEntry = topN.poll();
|
|
||||||
String userMetricName =
|
|
||||||
createMetricName(metricName, userEntry.name);
|
|
||||||
map.put(userMetricName, Long.valueOf(userEntry.value));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return map;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -190,7 +278,8 @@ public class RollingWindowManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A pair of a name and its corresponding value
|
* A pair of a name and its corresponding value. Defines a custom
|
||||||
|
* comparator so the TopN PriorityQueue sorts based on the count.
|
||||||
*/
|
*/
|
||||||
static private class NameValuePair implements Comparable<NameValuePair> {
|
static private class NameValuePair implements Comparable<NameValuePair> {
|
||||||
String name;
|
String name;
|
||||||
|
@ -254,12 +343,4 @@ public class RollingWindowManager {
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A mapping from metric names to their absolute values and their percentage
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public static class MetricValueMap extends HashMap<String, Number> {
|
|
||||||
private static final long serialVersionUID = 8936732010242400171L;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,6 +96,8 @@ public class TestFSNamesystemMBean {
|
||||||
"MaxObjects"));
|
"MaxObjects"));
|
||||||
Integer numStaleStorages = (Integer) (mbs.getAttribute(
|
Integer numStaleStorages = (Integer) (mbs.getAttribute(
|
||||||
mxbeanNameFsns, "NumStaleStorages"));
|
mxbeanNameFsns, "NumStaleStorages"));
|
||||||
|
String topUsers =
|
||||||
|
(String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
|
||||||
|
|
||||||
// Metrics that belong to "NameNodeInfo".
|
// Metrics that belong to "NameNodeInfo".
|
||||||
// These are metrics that FSNamesystem registers directly with MBeanServer.
|
// These are metrics that FSNamesystem registers directly with MBeanServer.
|
||||||
|
|
|
@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||||
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
|
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
|
@ -38,10 +41,15 @@ import java.io.File;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
||||||
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,4 +265,112 @@ public class TestNameNodeMXBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=120000)
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testTopUsers() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanNameFsns = new ObjectName(
|
||||||
|
"Hadoop:service=NameNode,name=FSNamesystemState");
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path path = new Path("/");
|
||||||
|
final int NUM_OPS = 10;
|
||||||
|
for (int i=0; i< NUM_OPS; i++) {
|
||||||
|
fs.listStatus(path);
|
||||||
|
fs.setTimes(path, 0, 1);
|
||||||
|
}
|
||||||
|
String topUsers =
|
||||||
|
(String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
Map<String, Object> map = mapper.readValue(topUsers, Map.class);
|
||||||
|
assertTrue("Could not find map key timestamp",
|
||||||
|
map.containsKey("timestamp"));
|
||||||
|
assertTrue("Could not find map key windows", map.containsKey("windows"));
|
||||||
|
List<Map<String, List<Map<String, Object>>>> windows =
|
||||||
|
(List<Map<String, List<Map<String, Object>>>>) map.get("windows");
|
||||||
|
assertEquals("Unexpected num windows", 3, windows.size());
|
||||||
|
for (Map<String, List<Map<String, Object>>> window : windows) {
|
||||||
|
final List<Map<String, Object>> ops = window.get("ops");
|
||||||
|
assertEquals("Unexpected num ops", 3, ops.size());
|
||||||
|
for (Map<String, Object> op: ops) {
|
||||||
|
final long count = Long.parseLong(op.get("totalCount").toString());
|
||||||
|
final String opType = op.get("opType").toString();
|
||||||
|
final int expected;
|
||||||
|
if (opType.equals(TopConf.ALL_CMDS)) {
|
||||||
|
expected = 2*NUM_OPS;
|
||||||
|
} else {
|
||||||
|
expected = NUM_OPS;
|
||||||
|
}
|
||||||
|
assertEquals("Unexpected total count", expected, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=120000)
|
||||||
|
public void testTopUsersDisabled() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
// Disable nntop
|
||||||
|
conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, false);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanNameFsns = new ObjectName(
|
||||||
|
"Hadoop:service=NameNode,name=FSNamesystemState");
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path path = new Path("/");
|
||||||
|
final int NUM_OPS = 10;
|
||||||
|
for (int i=0; i< NUM_OPS; i++) {
|
||||||
|
fs.listStatus(path);
|
||||||
|
fs.setTimes(path, 0, 1);
|
||||||
|
}
|
||||||
|
String topUsers =
|
||||||
|
(String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
|
||||||
|
assertNull("Did not expect to find TopUserOpCounts bean!", topUsers);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=120000)
|
||||||
|
public void testTopUsersNoPeriods() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.NNTOP_ENABLED_KEY, true);
|
||||||
|
conf.set(DFSConfigKeys.NNTOP_WINDOWS_MINUTES_KEY, "");
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||||
|
ObjectName mxbeanNameFsns = new ObjectName(
|
||||||
|
"Hadoop:service=NameNode,name=FSNamesystemState");
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path path = new Path("/");
|
||||||
|
final int NUM_OPS = 10;
|
||||||
|
for (int i=0; i< NUM_OPS; i++) {
|
||||||
|
fs.listStatus(path);
|
||||||
|
fs.setTimes(path, 0, 1);
|
||||||
|
}
|
||||||
|
String topUsers =
|
||||||
|
(String) (mbs.getAttribute(mxbeanNameFsns, "TopUserOpCounts"));
|
||||||
|
assertNotNull("Expected TopUserOpCounts bean!", topUsers);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,10 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.metrics2.MetricsSource;
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
@ -58,7 +55,6 @@ import org.apache.hadoop.test.MetricsAsserts;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -93,11 +89,6 @@ public class TestNameNodeMetrics {
|
||||||
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||||
((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
|
((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
|
||||||
.getLogger().setLevel(Level.DEBUG);
|
.getLogger().setLevel(Level.DEBUG);
|
||||||
/**
|
|
||||||
* need it to test {@link #testTopAuditLogger}
|
|
||||||
*/
|
|
||||||
CONF.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
|
|
||||||
TopAuditLogger.class.getName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
@ -112,7 +103,6 @@ public class TestNameNodeMetrics {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
TopMetrics.reset();//reset the static init done by prev test
|
|
||||||
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
|
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
namesystem = cluster.getNamesystem();
|
namesystem = cluster.getNamesystem();
|
||||||
|
@ -465,53 +455,4 @@ public class TestNameNodeMetrics {
|
||||||
assertQuantileGauges("Syncs1s", rb);
|
assertQuantileGauges("Syncs1s", rb);
|
||||||
assertQuantileGauges("BlockReport1s", rb);
|
assertQuantileGauges("BlockReport1s", rb);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test whether {@link TopMetrics} is registered with metrics system
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testTopMetrics() throws Exception {
|
|
||||||
final String testUser = "NNTopTestUser";
|
|
||||||
final String testOp = "NNTopTestOp";
|
|
||||||
final String metricName =
|
|
||||||
RollingWindowManager.createMetricName(testOp, testUser);
|
|
||||||
TopMetrics.getInstance().report(testUser, testOp);
|
|
||||||
final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
|
|
||||||
MetricsRecordBuilder rb = getMetrics(regName);
|
|
||||||
assertGauge(metricName, 1L, rb);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test whether {@link TopAuditLogger} is registered as an audit logger
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testTopAuditLogger() throws Exception {
|
|
||||||
//note: the top audit logger should already be set in conf
|
|
||||||
//issue one command, any command is fine
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
|
||||||
long time = System.currentTimeMillis();
|
|
||||||
fs.setTimes(new Path("/"), time, time);
|
|
||||||
//the command should be reflected in the total count of all users
|
|
||||||
final String testUser = TopConf.ALL_USERS;
|
|
||||||
final String testOp = TopConf.CMD_TOTAL;
|
|
||||||
final String metricName =
|
|
||||||
RollingWindowManager.createMetricName(testOp, testUser);
|
|
||||||
final String regName = TopConf.TOP_METRICS_REGISTRATION_NAME;
|
|
||||||
MetricsRecordBuilder rb = getMetrics(regName);
|
|
||||||
assertGaugeGreaterThan(metricName, 1L, rb);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert a long gauge metric greater than
|
|
||||||
* @param name of the metric
|
|
||||||
* @param expected minimum expected value of the metric
|
|
||||||
* @param rb the record builder mock used to getMetrics
|
|
||||||
*/
|
|
||||||
public static void assertGaugeGreaterThan(String name, long expected,
|
|
||||||
MetricsRecordBuilder rb) {
|
|
||||||
Assert.assertTrue("Bad value for metric " + name,
|
|
||||||
expected <= MetricsAsserts.getLongGauge(name, rb));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,16 +17,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
package org.apache.hadoop.hdfs.server.namenode.top.window;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import java.util.List;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.MetricValueMap;
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
||||||
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.TopWindow;
|
||||||
|
import static org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestRollingWindowManager {
|
public class TestRollingWindowManager {
|
||||||
|
|
||||||
|
@ -61,33 +64,39 @@ public class TestRollingWindowManager {
|
||||||
for (int i = 0; i < users.length; i++)
|
for (int i = 0; i < users.length; i++)
|
||||||
manager.recordMetric(time, "close", users[i], i + 1);
|
manager.recordMetric(time, "close", users[i], i + 1);
|
||||||
time++;
|
time++;
|
||||||
MetricValueMap tops = manager.snapshot(time);
|
TopWindow tops = manager.snapshot(time);
|
||||||
|
|
||||||
assertEquals("The number of returned top metrics is invalid",
|
assertEquals("Unexpected number of ops", 2, tops.getOps().size());
|
||||||
2 * (N_TOP_USERS + 1), tops.size());
|
for (Op op : tops.getOps()) {
|
||||||
int userIndex = users.length - 2;
|
final List<User> topUsers = op.getTopUsers();
|
||||||
String metricName = RollingWindowManager.createMetricName("open",
|
assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size());
|
||||||
users[userIndex]);
|
if (op.getOpType() == "open") {
|
||||||
boolean includes = tops.containsKey(metricName);
|
for (int i = 0; i < topUsers.size(); i++) {
|
||||||
assertTrue("The order of entries in top metrics is wrong", includes);
|
User user = topUsers.get(i);
|
||||||
assertEquals("The reported value by top is different from recorded one",
|
assertEquals("Unexpected count for user " + user.getUser(),
|
||||||
(userIndex + 1) * 2, ((Long) tops.get(metricName)).longValue());
|
(users.length-i)*2, user.getCount());
|
||||||
|
}
|
||||||
|
// Closed form of sum(range(2,42,2))
|
||||||
|
assertEquals("Unexpected total count for op",
|
||||||
|
(2+(users.length*2))*(users.length/2),
|
||||||
|
op.getTotalCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// move the window forward not to see the "open" results
|
// move the window forward not to see the "open" results
|
||||||
time += WINDOW_LEN_MS - 2;
|
time += WINDOW_LEN_MS - 2;
|
||||||
// top should not include only "close" results
|
|
||||||
tops = manager.snapshot(time);
|
tops = manager.snapshot(time);
|
||||||
assertEquals("The number of returned top metrics is invalid",
|
assertEquals("Unexpected number of ops", 1, tops.getOps().size());
|
||||||
N_TOP_USERS + 1, tops.size());
|
final Op op = tops.getOps().get(0);
|
||||||
includes = tops.containsKey(metricName);
|
assertEquals("Should only see close ops", "close", op.getOpType());
|
||||||
assertFalse("After rolling, the top list still includes the stale metrics",
|
final List<User> topUsers = op.getTopUsers();
|
||||||
includes);
|
for (int i = 0; i < topUsers.size(); i++) {
|
||||||
|
User user = topUsers.get(i);
|
||||||
metricName = RollingWindowManager.createMetricName("close",
|
assertEquals("Unexpected count for user " + user.getUser(),
|
||||||
users[userIndex]);
|
(users.length-i), user.getCount());
|
||||||
includes = tops.containsKey(metricName);
|
}
|
||||||
assertTrue("The order of entries in top metrics is wrong", includes);
|
// Closed form of sum(range(1,21))
|
||||||
assertEquals("The reported value by top is different from recorded one",
|
assertEquals("Unexpected total count for op",
|
||||||
(userIndex + 1), ((Long) tops.get(metricName)).longValue());
|
(1 + users.length) * (users.length / 2), op.getTotalCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue