HDFS-8953. DataNode Metrics logging (Contributed by Kanaka Kumar Avvaru)
(cherry picked from commit ce69c9b54c)

parent c31fe79979
commit 2b78714c68
@@ -162,6 +162,20 @@ log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
 log4j.appender.NNMETRICSRFA.MaxBackupIndex=1
 log4j.appender.NNMETRICSRFA.MaxFileSize=64MB
+
+#
+# DataNode metrics logging.
+# The default is to retain two datanode-metrics.log files up to 64MB each.
+#
+datanode.metrics.logger=INFO,NullAppender
+log4j.logger.DataNodeMetricsLog=${datanode.metrics.logger}
+log4j.additivity.DataNodeMetricsLog=false
+log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender
+log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log
+log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
+log4j.appender.DNMETRICSRFA.MaxBackupIndex=1
+log4j.appender.DNMETRICSRFA.MaxFileSize=64MB
 
 #
 # mapred audit logging
 #
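Note: as shipped above, datanode.metrics.logger points at NullAppender, so the DNMETRICSRFA appender is defined but idle. As a hedged sketch of how an operator would switch the DataNode metrics log on (mirroring the test log4j.properties at the end of this commit), only the logger line needs overriding:

    # Sketch: route the DataNode metrics log to the rolling file appender
    datanode.metrics.logger=INFO,DNMETRICSRFA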
@@ -570,6 +570,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9008. Balancer#Parameters class could use a builder pattern.
     (Chris Trezzo via mingma)
 
+    HDFS-8953. DataNode Metrics logging (Kanaka Kumar Avvaru via vinayakumarb)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -360,6 +360,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.metrics.logger.period.seconds";
   public static final int DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
       600;
+  public static final String DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY =
+      "dfs.datanode.metrics.logger.period.seconds";
+  public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
+      600;
 
   public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AsyncAppender;
+
+/**
+ * MetricsLoggerTask can be used as utility to dump metrics to log.
+ */
+public class MetricsLoggerTask implements Runnable {
+
+  public static final Log LOG = LogFactory.getLog(MetricsLoggerTask.class);
+
+  private static ObjectName objectName = null;
+
+  static {
+    try {
+      objectName = new ObjectName("Hadoop:*");
+    } catch (MalformedObjectNameException m) {
+      // This should not occur in practice since we pass
+      // a valid pattern to the constructor above.
+    }
+  }
+
+  private Log metricsLog;
+  private String nodeName;
+  private short maxLogLineLength;
+
+  public MetricsLoggerTask(Log metricsLog, String nodeName,
+      short maxLogLineLength) {
+    this.metricsLog = metricsLog;
+    this.nodeName = nodeName;
+    this.maxLogLineLength = maxLogLineLength;
+  }
+
+  /**
+   * Write metrics to the metrics appender when invoked.
+   */
+  @Override
+  public void run() {
+    // Skip querying metrics if there are no known appenders.
+    if (!metricsLog.isInfoEnabled() || !hasAppenders(metricsLog)
+        || objectName == null) {
+      return;
+    }
+
+    metricsLog.info(" >> Begin " + nodeName + " metrics dump");
+    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+    // Iterate over each MBean.
+    for (final ObjectName mbeanName : server.queryNames(objectName, null)) {
+      try {
+        MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName);
+        final String mBeanNameName = MBeans.getMbeanNameName(mbeanName);
+        final Set<String> attributeNames = getFilteredAttributes(mBeanInfo);
+
+        final AttributeList attributes = server.getAttributes(mbeanName,
+            attributeNames.toArray(new String[attributeNames.size()]));
+
+        for (Object o : attributes) {
+          final Attribute attribute = (Attribute) o;
+          final Object value = attribute.getValue();
+          final String valueStr = (value != null) ? value.toString() : "null";
+          // Truncate the value if it is too long
+          metricsLog.info(mBeanNameName + ":" + attribute.getName() + "="
+              + trimLine(valueStr));
+        }
+      } catch (Exception e) {
+        metricsLog.error("Failed to get " + nodeName + " metrics for mbean "
+            + mbeanName.toString(), e);
+      }
+    }
+    metricsLog.info(" << End " + nodeName + " metrics dump");
+  }
+
+  private String trimLine(String valueStr) {
+    if (maxLogLineLength <= 0) {
+      return valueStr;
+    }
+
+    return (valueStr.length() < maxLogLineLength ? valueStr : valueStr
+        .substring(0, maxLogLineLength) + "...");
+  }
+
+  private static boolean hasAppenders(Log logger) {
+    if (!(logger instanceof Log4JLogger)) {
+      // Don't bother trying to determine the presence of appenders.
+      return true;
+    }
+    Log4JLogger log4JLogger = ((Log4JLogger) logger);
+    return log4JLogger.getLogger().getAllAppenders().hasMoreElements();
+  }
+
+  /**
+   * Get the list of attributes for the MBean, filtering out a few attribute
+   * types.
+   */
+  private static Set<String> getFilteredAttributes(MBeanInfo mBeanInfo) {
+    Set<String> attributeNames = new HashSet<>();
+    for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+      if (!attributeInfo.getType().equals(
+          "javax.management.openmbean.TabularData")
+          && !attributeInfo.getType().equals(
+              "javax.management.openmbean.CompositeData")
+          && !attributeInfo.getType().equals(
+              "[Ljavax.management.openmbean.CompositeData;")) {
+        attributeNames.add(attributeInfo.getName());
+      }
+    }
+    return attributeNames;
+  }
+
+  /**
+   * Make the metrics logger async and add all pre-existing appenders to the
+   * async appender.
+   */
+  public static void makeMetricsLoggerAsync(Log metricsLog) {
+    if (!(metricsLog instanceof Log4JLogger)) {
+      LOG.warn("Metrics logging will not be async since "
+          + "the logger is not log4j");
+      return;
+    }
+    org.apache.log4j.Logger logger = ((Log4JLogger) metricsLog).getLogger();
+    logger.setAdditivity(false); // Don't pollute actual logs with metrics dump
+
+    @SuppressWarnings("unchecked")
+    List<Appender> appenders = Collections.list(logger.getAllAppenders());
+    // failsafe against trying to async it more than once
+    if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
+      AsyncAppender asyncAppender = new AsyncAppender();
+      // change logger to have an async appender containing all the
+      // previously configured appenders
+      for (Appender appender : appenders) {
+        logger.removeAppender(appender);
+        asyncAppender.addAppender(appender);
+      }
+      logger.addAppender(asyncAppender);
+    }
+  }
+}
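As a usage sketch (not part of this commit; the wiring below mirrors the DataNode#startMetricsLogger change that appears later in this diff), a daemon makes its metrics logger asynchronous and then schedules the task on a single-threaded executor:

    // Sketch: dump metrics every 600 seconds; a maxLogLineLength of 0 disables truncation.
    Log metricsLog = LogFactory.getLog("DataNodeMetricsLog");
    MetricsLoggerTask.makeMetricsLoggerAsync(metricsLog);
    ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    timer.scheduleWithFixedDelay(
        new MetricsLoggerTask(metricsLog, "DataNode", (short) 0),
        600, 600, TimeUnit.SECONDS);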
@@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.BufferedOutputStream;
@@ -85,6 +87,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
@@ -150,6 +154,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
@@ -279,6 +284,8 @@ public class DataNode extends ReconfigurableBase
       Collections.unmodifiableList(
           Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
 
+  public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
+
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
    */
@@ -366,6 +373,8 @@ public class DataNode extends ReconfigurableBase
 
   private long[] oobTimeouts; /** timeout value of each OOB type */
 
+  private ScheduledThreadPoolExecutor metricsLoggerTimer;
+
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -386,7 +395,7 @@ public class DataNode extends ReconfigurableBase
 
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
-   * and a namenode proxy
+   * and a namenode proxy.
    */
   DataNode(final Configuration conf,
            final List<StorageLocation> dataDirs,
@@ -1173,6 +1182,7 @@ public class DataNode extends ReconfigurableBase
     saslClient = new SaslDataTransferClient(dnConf.conf,
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+    startMetricsLogger(conf);
   }
 
   /**
@@ -1679,6 +1689,7 @@ public class DataNode extends ReconfigurableBase
    * Otherwise, deadlock might occur.
    */
   public void shutdown() {
+    stopMetricsLogger();
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
         try {
@@ -3297,4 +3308,41 @@ public class DataNode extends ReconfigurableBase
 
     return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
   }
+
+  /**
+   * Start a timer to periodically write DataNode metrics to the log file. This
+   * behavior can be disabled by configuration.
+   *
+   * @param metricConf
+   */
+  protected void startMetricsLogger(Configuration metricConf) {
+    long metricsLoggerPeriodSec = metricConf.getInt(
+        DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
+        DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
+
+    if (metricsLoggerPeriodSec <= 0) {
+      return;
+    }
+
+    MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG);
+
+    // Schedule the periodic logging.
+    metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
+    metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG,
+        "DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec,
+        TimeUnit.SECONDS);
+  }
+
+  protected void stopMetricsLogger() {
+    if (metricsLoggerTimer != null) {
+      metricsLoggerTimer.shutdown();
+      metricsLoggerTimer = null;
+    }
+  }
+
+  @VisibleForTesting
+  ScheduledThreadPoolExecutor getMetricsLoggerTimer() {
+    return metricsLoggerTimer;
+  }
 }
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -77,32 +77,20 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Appender;
-import org.apache.log4j.AsyncAppender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -711,46 +699,19 @@ public class NameNode implements NameNodeStatusMXBean {
       return;
     }
 
-    makeMetricsLoggerAsync();
+    MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog);
 
     // Schedule the periodic logging.
     metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
     metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
         false);
-    metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(),
+    metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog,
+        "NameNode", (short) 128),
         metricsLoggerPeriodSec,
         metricsLoggerPeriodSec,
         TimeUnit.SECONDS);
   }
 
-  /**
-   * Make the metrics logger async and add all pre-existing appenders
-   * to the async appender.
-   */
-  private static void makeMetricsLoggerAsync() {
-    if (!(MetricsLog instanceof Log4JLogger)) {
-      LOG.warn(
-          "Metrics logging will not be async since the logger is not log4j");
-      return;
-    }
-    org.apache.log4j.Logger logger = ((Log4JLogger) MetricsLog).getLogger();
-    logger.setAdditivity(false); // Don't pollute NN logs with metrics dump
-
-    @SuppressWarnings("unchecked")
-    List<Appender> appenders = Collections.list(logger.getAllAppenders());
-    // failsafe against trying to async it more than once
-    if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
-      AsyncAppender asyncAppender = new AsyncAppender();
-      // change logger to have an async appender containing all the
-      // previously configured appenders
-      for (Appender appender : appenders) {
-        logger.removeAppender(appender);
-        asyncAppender.addAppender(appender);
-      }
-      logger.addAppender(asyncAppender);
-    }
-  }
-
   protected void stopMetricsLogger() {
     if (metricsLoggerTimer != null) {
       metricsLoggerTimer.shutdown();
@@ -1932,91 +1893,4 @@ public class NameNode implements NameNodeStatusMXBean {
       break;
     }
   }
-
-  private static class MetricsLoggerTask implements Runnable {
-    private static final int MAX_LOGGED_VALUE_LEN = 128;
-    private static ObjectName OBJECT_NAME = null;
-
-    static {
-      try {
-        OBJECT_NAME = new ObjectName("Hadoop:*");
-      } catch (MalformedObjectNameException m) {
-        // This should not occur in practice since we pass
-        // a valid pattern to the constructor above.
-      }
-    }
-
-    /**
-     * Write NameNode metrics to the metrics appender when invoked.
-     */
-    @Override
-    public void run() {
-      // Skip querying metrics if there are no known appenders.
-      if (!MetricsLog.isInfoEnabled() ||
-          !hasAppenders(MetricsLog) ||
-          OBJECT_NAME == null) {
-        return;
-      }
-
-      MetricsLog.info(" >> Begin NameNode metrics dump");
-      final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-      // Iterate over each MBean.
-      for (final ObjectName mbeanName : server.queryNames(OBJECT_NAME, null)) {
-        try {
-          MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName);
-          final String mBeanNameName = MBeans.getMbeanNameName(mbeanName);
-          final Set<String> attributeNames = getFilteredAttributes(mBeanInfo);
-
-          final AttributeList attributes =
-              server.getAttributes(mbeanName,
-                  attributeNames.toArray(new String[attributeNames.size()]));
-
-          for (Object o : attributes) {
-            final Attribute attribute = (Attribute) o;
-            final Object value = attribute.getValue();
-            final String valueStr =
-                (value != null) ? value.toString() : "null";
-            // Truncate the value if it is too long
-            MetricsLog.info(mBeanNameName + ":" + attribute.getName() + "=" +
-                (valueStr.length() < MAX_LOGGED_VALUE_LEN ? valueStr :
-                    valueStr.substring(0, MAX_LOGGED_VALUE_LEN) + "..."));
-          }
-        } catch (Exception e) {
-          MetricsLog.error("Failed to get NameNode metrics for mbean " +
-              mbeanName.toString(), e);
-        }
-      }
-      MetricsLog.info(" << End NameNode metrics dump");
-    }
-
-    private static boolean hasAppenders(Log logger) {
-      if (!(logger instanceof Log4JLogger)) {
-        // Don't bother trying to determine the presence of appenders.
-        return true;
-      }
-      Log4JLogger log4JLogger = ((Log4JLogger) MetricsLog);
-      return log4JLogger.getLogger().getAllAppenders().hasMoreElements();
-    }
-
-    /**
-     * Get the list of attributes for the MBean, filtering out a few
-     * attribute types.
-     */
-    private static Set<String> getFilteredAttributes(
-        MBeanInfo mBeanInfo) {
-      Set<String> attributeNames = new HashSet<>();
-      for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
-        if (!attributeInfo.getType().equals(
-            "javax.management.openmbean.TabularData") &&
-            !attributeInfo.getType().equals(
-                "javax.management.openmbean.CompositeData") &&
-            !attributeInfo.getType().equals(
-                "[Ljavax.management.openmbean.CompositeData;")) {
-          attributeNames.add(attributeInfo.getName());
-        }
-      }
-      return attributeNames;
-    }
-  }
 }
@@ -1587,6 +1587,18 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.metrics.logger.period.seconds</name>
+  <value>600</value>
+  <description>
+    This setting controls how frequently the DataNode logs its metrics. The
+    logging configuration must also define one or more appenders for
+    DataNodeMetricsLog for the metrics to be logged.
+    DataNode metrics logging is disabled if this value is set to zero or
+    less than zero.
+  </description>
+</property>
+
 <property>
   <name>dfs.metrics.percentiles.intervals</name>
   <value></value>
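Per the description above, the interval is also the on/off switch: any value of zero or less makes DataNode#startMetricsLogger return without scheduling anything. As a hedged sketch (not part of this commit), a per-cluster hdfs-site.xml override that disables the periodic dump would look like:

    <property>
      <name>dfs.datanode.metrics.logger.period.seconds</name>
      <value>0</value>
    </property>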
@@ -19,21 +19,38 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.junit.Assert;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Preconditions;
 
@@ -44,6 +61,9 @@ import com.google.common.base.Preconditions;
 public class DataNodeTestUtils {
   private static final String DIR_FAILURE_SUFFIX = ".origin";
 
+  public final static String TEST_CLUSTER_ID = "testClusterID";
+  public final static String TEST_POOL_ID = "BP-TEST";
+
   public static DatanodeRegistration
   getDNRegistrationForBP(DataNode dn, String bpid) throws IOException {
     return dn.getDNRegistrationForBP(bpid);
@@ -231,4 +251,61 @@ public class DataNodeTestUtils {
       dn.getDirectoryScanner().reconcile();
     }
   }
+
+  /**
+   * Starts an instance of DataNode with NN mocked. Called should ensure to
+   * shutdown the DN
+   *
+   * @throws IOException
+   */
+  public static DataNode startDNWithMockNN(Configuration conf,
+      final InetSocketAddress nnSocketAddr, final String dnDataDir)
+      throws IOException {
+
+    FileSystem.setDefaultUri(conf, "hdfs://" + nnSocketAddr.getHostName() + ":"
+        + nnSocketAddr.getPort());
+    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
+    File dataDir = new File(dnDataDir);
+    FileUtil.fullyDelete(dataDir);
+    dataDir.mkdirs();
+    StorageLocation location = StorageLocation.parse(dataDir.getPath());
+    locations.add(location);
+
+    final DatanodeProtocolClientSideTranslatorPB namenode =
+        mock(DatanodeProtocolClientSideTranslatorPB.class);
+
+    Mockito.doAnswer(new Answer<DatanodeRegistration>() {
+      @Override
+      public DatanodeRegistration answer(InvocationOnMock invocation)
+          throws Throwable {
+        return (DatanodeRegistration) invocation.getArguments()[0];
+      }
+    }).when(namenode).registerDatanode(Mockito.any(DatanodeRegistration.class));
+
+    when(namenode.versionRequest()).thenReturn(
+        new NamespaceInfo(1, TEST_CLUSTER_ID, TEST_POOL_ID, 1L));
+
+    when(
+        namenode.sendHeartbeat(Mockito.any(DatanodeRegistration.class),
+            Mockito.any(StorageReport[].class), Mockito.anyLong(),
+            Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
+            Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
+            Mockito.anyBoolean())).thenReturn(
+        new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
+            HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
+            .nextLong() | 1L));
+
+    DataNode dn = new DataNode(conf, locations, null) {
+      @Override
+      DatanodeProtocolClientSideTranslatorPB connectToNN(
+          InetSocketAddress nnAddr) throws IOException {
+        Assert.assertEquals(nnSocketAddr, nnAddr);
+        return namenode;
+      }
+    };
+    // Trigger a heartbeat so that it acknowledges the NN as active.
+    dn.getAllBpOs().get(0).triggerHeartbeatForTests();
+
+    return dn;
+  }
 }
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.AsyncAppender;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Test periodic logging of DataNode metrics.
+ */
+public class TestDataNodeMetricsLogger {
+  static final Log LOG = LogFactory.getLog(TestDataNodeMetricsLogger.class);
+
+  private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+      + "data";
+
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+
+  private DataNode dn;
+
+  static final Random random = new Random(System.currentTimeMillis());
+
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  /**
+   * Starts an instance of DataNode
+   *
+   * @throws IOException
+   */
+  public void startDNForTest(boolean enableMetricsLogging) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setInt(DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
+        enableMetricsLogging ? 1 : 0); // If enabled, log early and log often
+
+    dn = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
+  }
+
+  /**
+   * Cleans the resources and closes the instance of datanode
+   *
+   * @throws IOException
+   *           if an error occurred
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (dn != null) {
+      try {
+        dn.shutdown();
+      } catch (Exception e) {
+        LOG.error("Cannot close: ", e);
+      } finally {
+        File dir = new File(DATA_DIR);
+        if (dir.exists())
+          Assert.assertTrue("Cannot delete data-node dirs",
+              FileUtil.fullyDelete(dir));
+      }
+    }
+    dn = null;
+  }
+
+  @Test
+  public void testMetricsLoggerOnByDefault() throws IOException {
+    startDNForTest(true);
+    assertNotNull(dn);
+    assertNotNull(dn.getMetricsLoggerTimer());
+  }
+
+  @Test
+  public void testDisableMetricsLogger() throws IOException {
+    startDNForTest(false);
+    assertNotNull(dn);
+    assertNull(dn.getMetricsLoggerTimer());
+  }
+
+  @Test
+  public void testMetricsLoggerIsAsync() throws IOException {
+    startDNForTest(true);
+    assertNotNull(dn);
+    org.apache.log4j.Logger logger = ((Log4JLogger) DataNode.METRICS_LOG)
+        .getLogger();
+    @SuppressWarnings("unchecked")
+    List<Appender> appenders = Collections.list(logger.getAllAppenders());
+    assertTrue(appenders.get(0) instanceof AsyncAppender);
+  }
+
+  /**
+   * Publish a fake metric under the "Hadoop:" domain and ensure it is logged by
+   * the metrics logger.
+   */
+  @Test
+  public void testMetricsLogOutput() throws IOException, InterruptedException,
+      TimeoutException {
+    TestFakeMetric metricsProvider = new TestFakeMetric();
+    MBeans.register(this.getClass().getSimpleName(), "DummyMetrics",
+        metricsProvider);
+    startDNForTest(true);
+    assertNotNull(dn);
+    final PatternMatchingAppender appender = new PatternMatchingAppender(
+        "^.*FakeMetric.*$");
+    addAppender(DataNode.METRICS_LOG, appender);
+
+    // Ensure that the supplied pattern was matched.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return appender.isMatched();
+      }
+    }, 1000, 60000);
+
+    dn.shutdown();
+  }
+
+  private void addAppender(Log log, Appender appender) {
+    org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger();
+    @SuppressWarnings("unchecked")
+    List<Appender> appenders = Collections.list(logger.getAllAppenders());
+    ((AsyncAppender) appenders.get(0)).addAppender(appender);
+  }
+
+  public interface TestFakeMetricMXBean {
+    int getFakeMetric();
+  }
+
+  /**
+   * MBean for testing
+   */
+  public static class TestFakeMetric implements TestFakeMetricMXBean {
+    @Override
+    public int getFakeMetric() {
+      return 0;
+    }
+  }
+
+  /**
+   * An appender that matches logged messages against the given regular
+   * expression.
+   */
+  public static class PatternMatchingAppender extends AppenderSkeleton {
+    private final Pattern pattern;
+    private volatile boolean matched;
+
+    public PatternMatchingAppender(String pattern) {
+      this.pattern = Pattern.compile(pattern);
+      this.matched = false;
+    }
+
+    public boolean isMatched() {
+      return matched;
+    }
+
+    @Override
+    protected void append(LoggingEvent event) {
+      if (pattern.matcher(event.getMessage().toString()).matches()) {
+        matched = true;
+      }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+  }
+}
@@ -34,3 +34,16 @@ log4j.appender.NNMETRICSRFA.layout=org.apache.log4j.PatternLayout
 log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
 log4j.appender.NNMETRICSRFA.MaxBackupIndex=1
 log4j.appender.NNMETRICSRFA.MaxFileSize=64MB
+
+#
+# DataNode metrics logging.
+# The default is to retain two datanode-metrics.log files up to 64MB each.
+#
+log4j.logger.DataNodeMetricsLog=INFO,DNMETRICSRFA
+log4j.additivity.DataNodeMetricsLog=false
+log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender
+log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log
+log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n
+log4j.appender.DNMETRICSRFA.MaxBackupIndex=1
+log4j.appender.DNMETRICSRFA.MaxFileSize=64MB