diff --git a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties index c26fed4e9ba..299caa85160 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties +++ b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties @@ -162,6 +162,20 @@ log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n log4j.appender.NNMETRICSRFA.MaxBackupIndex=1 log4j.appender.NNMETRICSRFA.MaxFileSize=64MB +# +# DataNode metrics logging. +# The default is to retain two datanode-metrics.log files up to 64MB each. +# +datanode.metrics.logger=INFO,NullAppender +log4j.logger.DataNodeMetricsLog=${datanode.metrics.logger} +log4j.additivity.DataNodeMetricsLog=false +log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender +log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log +log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n +log4j.appender.DNMETRICSRFA.MaxBackupIndex=1 +log4j.appender.DNMETRICSRFA.MaxFileSize=64MB + # # mapred audit logging # diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 891aefc9b2d..16045a455a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -570,6 +570,8 @@ Release 2.8.0 - UNRELEASED HDFS-9008. Balancer#Parameters class could use a builder pattern. (Chris Trezzo via mingma) + HDFS-8953. DataNode Metrics logging (Kanaka Kumar Avvaru via vinayakumarb) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index aad6ca78a2b..f1e66b34e77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -360,6 +360,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.metrics.logger.period.seconds"; public static final int DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = 600; + public static final String DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY = + "dfs.datanode.metrics.logger.period.seconds"; + public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = + 600; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/MetricsLoggerTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/MetricsLoggerTask.java new file mode 100644 index 00000000000..40c048c5a3e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/MetricsLoggerTask.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.common; + +import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.log4j.Appender; +import org.apache.log4j.AsyncAppender; + +/** + * MetricsLoggerTask can be used as utility to dump metrics to log. + */ +public class MetricsLoggerTask implements Runnable { + + public static final Log LOG = LogFactory.getLog(MetricsLoggerTask.class); + + private static ObjectName objectName = null; + + static { + try { + objectName = new ObjectName("Hadoop:*"); + } catch (MalformedObjectNameException m) { + // This should not occur in practice since we pass + // a valid pattern to the constructor above. + } + } + + private Log metricsLog; + private String nodeName; + private short maxLogLineLength; + + public MetricsLoggerTask(Log metricsLog, String nodeName, + short maxLogLineLength) { + this.metricsLog = metricsLog; + this.nodeName = nodeName; + this.maxLogLineLength = maxLogLineLength; + } + + /** + * Write metrics to the metrics appender when invoked. + */ + @Override + public void run() { + // Skip querying metrics if there are no known appenders. + if (!metricsLog.isInfoEnabled() || !hasAppenders(metricsLog) + || objectName == null) { + return; + } + + metricsLog.info(" >> Begin " + nodeName + " metrics dump"); + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + + // Iterate over each MBean. + for (final ObjectName mbeanName : server.queryNames(objectName, null)) { + try { + MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName); + final String mBeanNameName = MBeans.getMbeanNameName(mbeanName); + final Set attributeNames = getFilteredAttributes(mBeanInfo); + + final AttributeList attributes = server.getAttributes(mbeanName, + attributeNames.toArray(new String[attributeNames.size()])); + + for (Object o : attributes) { + final Attribute attribute = (Attribute) o; + final Object value = attribute.getValue(); + final String valueStr = (value != null) ? value.toString() : "null"; + // Truncate the value if it is too long + metricsLog.info(mBeanNameName + ":" + attribute.getName() + "=" + + trimLine(valueStr)); + } + } catch (Exception e) { + metricsLog.error("Failed to get " + nodeName + " metrics for mbean " + + mbeanName.toString(), e); + } + } + metricsLog.info(" << End " + nodeName + " metrics dump"); + } + + private String trimLine(String valueStr) { + if (maxLogLineLength <= 0) { + return valueStr; + } + + return (valueStr.length() < maxLogLineLength ? valueStr : valueStr + .substring(0, maxLogLineLength) + "..."); + } + + private static boolean hasAppenders(Log logger) { + if (!(logger instanceof Log4JLogger)) { + // Don't bother trying to determine the presence of appenders. + return true; + } + Log4JLogger log4JLogger = ((Log4JLogger) logger); + return log4JLogger.getLogger().getAllAppenders().hasMoreElements(); + } + + /** + * Get the list of attributes for the MBean, filtering out a few attribute + * types. + */ + private static Set getFilteredAttributes(MBeanInfo mBeanInfo) { + Set attributeNames = new HashSet<>(); + for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) { + if (!attributeInfo.getType().equals( + "javax.management.openmbean.TabularData") + && !attributeInfo.getType().equals( + "javax.management.openmbean.CompositeData") + && !attributeInfo.getType().equals( + "[Ljavax.management.openmbean.CompositeData;")) { + attributeNames.add(attributeInfo.getName()); + } + } + return attributeNames; + } + + /** + * Make the metrics logger async and add all pre-existing appenders to the + * async appender. + */ + public static void makeMetricsLoggerAsync(Log metricsLog) { + if (!(metricsLog instanceof Log4JLogger)) { + LOG.warn("Metrics logging will not be async since " + + "the logger is not log4j"); + return; + } + org.apache.log4j.Logger logger = ((Log4JLogger) metricsLog).getLogger(); + logger.setAdditivity(false); // Don't pollute actual logs with metrics dump + + @SuppressWarnings("unchecked") + List appenders = Collections.list(logger.getAllAppenders()); + // failsafe against trying to async it more than once + if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) { + AsyncAppender asyncAppender = new AsyncAppender(); + // change logger to have an async appender containing all the + // previously configured appenders + for (Appender appender : appenders) { + logger.removeAppender(appender); + asyncAppender.addAppender(appender); + } + logger.addAppender(asyncAppender); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ad718a51484..c883c69f85a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.BufferedOutputStream; @@ -85,6 +87,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; @@ -150,6 +154,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; @@ -279,6 +284,8 @@ public class DataNode extends ReconfigurableBase Collections.unmodifiableList( Arrays.asList(DFS_DATANODE_DATA_DIR_KEY)); + public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); + /** * Use {@link NetUtils#createSocketAddr(String)} instead. */ @@ -366,6 +373,8 @@ public class DataNode extends ReconfigurableBase private long[] oobTimeouts; /** timeout value of each OOB type */ + private ScheduledThreadPoolExecutor metricsLoggerTimer; + /** * Creates a dummy DataNode for testing purpose. */ @@ -386,7 +395,7 @@ public class DataNode extends ReconfigurableBase /** * Create the DataNode given a configuration, an array of dataDirs, - * and a namenode proxy + * and a namenode proxy. */ DataNode(final Configuration conf, final List dataDirs, @@ -1173,6 +1182,7 @@ public class DataNode extends ReconfigurableBase saslClient = new SaslDataTransferClient(dnConf.conf, dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + startMetricsLogger(conf); } /** @@ -1679,6 +1689,7 @@ public class DataNode extends ReconfigurableBase * Otherwise, deadlock might occur. */ public void shutdown() { + stopMetricsLogger(); if (plugins != null) { for (ServicePlugin p : plugins) { try { @@ -3297,4 +3308,41 @@ public class DataNode extends ReconfigurableBase return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE]; } + + /** + * Start a timer to periodically write DataNode metrics to the log file. This + * behavior can be disabled by configuration. + * + * @param metricConf + */ + protected void startMetricsLogger(Configuration metricConf) { + long metricsLoggerPeriodSec = metricConf.getInt( + DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, + DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT); + + if (metricsLoggerPeriodSec <= 0) { + return; + } + + MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG); + + // Schedule the periodic logging. + metricsLoggerTimer = new ScheduledThreadPoolExecutor(1); + metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(METRICS_LOG, + "DataNode", (short) 0), metricsLoggerPeriodSec, metricsLoggerPeriodSec, + TimeUnit.SECONDS); + } + + protected void stopMetricsLogger() { + if (metricsLoggerTimer != null) { + metricsLoggerTimer.shutdown(); + metricsLoggerTimer = null; + } + } + + @VisibleForTesting + ScheduledThreadPoolExecutor getMetricsLoggerTimer() { + return metricsLoggerTimer; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 5ea39192fd9..aec2e98b02a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -45,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.MetricsLoggerTask; import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; @@ -77,32 +77,20 @@ import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; -import org.apache.log4j.Appender; -import org.apache.log4j.AsyncAppender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.management.Attribute; -import javax.management.AttributeList; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanInfo; -import javax.management.MBeanServer; -import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import java.io.IOException; import java.io.PrintStream; -import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -711,46 +699,19 @@ public class NameNode implements NameNodeStatusMXBean { return; } - makeMetricsLoggerAsync(); + MetricsLoggerTask.makeMetricsLoggerAsync(MetricsLog); // Schedule the periodic logging. metricsLoggerTimer = new ScheduledThreadPoolExecutor(1); metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy( false); - metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(), + metricsLoggerTimer.scheduleWithFixedDelay(new MetricsLoggerTask(MetricsLog, + "NameNode", (short) 128), metricsLoggerPeriodSec, metricsLoggerPeriodSec, TimeUnit.SECONDS); } - /** - * Make the metrics logger async and add all pre-existing appenders - * to the async appender. - */ - private static void makeMetricsLoggerAsync() { - if (!(MetricsLog instanceof Log4JLogger)) { - LOG.warn( - "Metrics logging will not be async since the logger is not log4j"); - return; - } - org.apache.log4j.Logger logger = ((Log4JLogger) MetricsLog).getLogger(); - logger.setAdditivity(false); // Don't pollute NN logs with metrics dump - - @SuppressWarnings("unchecked") - List appenders = Collections.list(logger.getAllAppenders()); - // failsafe against trying to async it more than once - if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) { - AsyncAppender asyncAppender = new AsyncAppender(); - // change logger to have an async appender containing all the - // previously configured appenders - for (Appender appender : appenders) { - logger.removeAppender(appender); - asyncAppender.addAppender(appender); - } - logger.addAppender(asyncAppender); - } - } - protected void stopMetricsLogger() { if (metricsLoggerTimer != null) { metricsLoggerTimer.shutdown(); @@ -1932,91 +1893,4 @@ public class NameNode implements NameNodeStatusMXBean { break; } } - - private static class MetricsLoggerTask implements Runnable { - private static final int MAX_LOGGED_VALUE_LEN = 128; - private static ObjectName OBJECT_NAME = null; - - static { - try { - OBJECT_NAME = new ObjectName("Hadoop:*"); - } catch (MalformedObjectNameException m) { - // This should not occur in practice since we pass - // a valid pattern to the constructor above. - } - } - - /** - * Write NameNode metrics to the metrics appender when invoked. - */ - @Override - public void run() { - // Skip querying metrics if there are no known appenders. - if (!MetricsLog.isInfoEnabled() || - !hasAppenders(MetricsLog) || - OBJECT_NAME == null) { - return; - } - - MetricsLog.info(" >> Begin NameNode metrics dump"); - final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - - // Iterate over each MBean. - for (final ObjectName mbeanName : server.queryNames(OBJECT_NAME, null)) { - try { - MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName); - final String mBeanNameName = MBeans.getMbeanNameName(mbeanName); - final Set attributeNames = getFilteredAttributes(mBeanInfo); - - final AttributeList attributes = - server.getAttributes(mbeanName, - attributeNames.toArray(new String[attributeNames.size()])); - - for (Object o : attributes) { - final Attribute attribute = (Attribute) o; - final Object value = attribute.getValue(); - final String valueStr = - (value != null) ? value.toString() : "null"; - // Truncate the value if it is too long - MetricsLog.info(mBeanNameName + ":" + attribute.getName() + "=" + - (valueStr.length() < MAX_LOGGED_VALUE_LEN ? valueStr : - valueStr.substring(0, MAX_LOGGED_VALUE_LEN) + "...")); - } - } catch (Exception e) { - MetricsLog.error("Failed to get NameNode metrics for mbean " + - mbeanName.toString(), e); - } - } - MetricsLog.info(" << End NameNode metrics dump"); - } - - private static boolean hasAppenders(Log logger) { - if (!(logger instanceof Log4JLogger)) { - // Don't bother trying to determine the presence of appenders. - return true; - } - Log4JLogger log4JLogger = ((Log4JLogger) MetricsLog); - return log4JLogger.getLogger().getAllAppenders().hasMoreElements(); - } - - /** - * Get the list of attributes for the MBean, filtering out a few - * attribute types. - */ - private static Set getFilteredAttributes( - MBeanInfo mBeanInfo) { - Set attributeNames = new HashSet<>(); - for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) { - if (!attributeInfo.getType().equals( - "javax.management.openmbean.TabularData") && - !attributeInfo.getType().equals( - "javax.management.openmbean.CompositeData") && - !attributeInfo.getType().equals( - "[Ljavax.management.openmbean.CompositeData;")) { - attributeNames.add(attributeInfo.getName()); - } - } - return attributeNames; - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d5bc913d737..9adf3599dc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1587,6 +1587,18 @@ + + dfs.datanode.metrics.logger.period.seconds + 600 + + This setting controls how frequently the DataNode logs its metrics. The + logging configuration must also define one or more appenders for + DataNodeMetricsLog for the metrics to be logged. + DataNode metrics logging is disabled if this value is set to zero or + less than zero. + + + dfs.metrics.percentiles.intervals diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index b4071de7b13..8b43787d2d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -19,21 +19,38 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.junit.Assert; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import com.google.common.base.Preconditions; @@ -44,7 +61,10 @@ import com.google.common.base.Preconditions; public class DataNodeTestUtils { private static final String DIR_FAILURE_SUFFIX = ".origin"; - public static DatanodeRegistration + public final static String TEST_CLUSTER_ID = "testClusterID"; + public final static String TEST_POOL_ID = "BP-TEST"; + + public static DatanodeRegistration getDNRegistrationForBP(DataNode dn, String bpid) throws IOException { return dn.getDNRegistrationForBP(bpid); } @@ -231,4 +251,61 @@ public class DataNodeTestUtils { dn.getDirectoryScanner().reconcile(); } } + + /** + * Starts an instance of DataNode with NN mocked. Called should ensure to + * shutdown the DN + * + * @throws IOException + */ + public static DataNode startDNWithMockNN(Configuration conf, + final InetSocketAddress nnSocketAddr, final String dnDataDir) + throws IOException { + + FileSystem.setDefaultUri(conf, "hdfs://" + nnSocketAddr.getHostName() + ":" + + nnSocketAddr.getPort()); + ArrayList locations = new ArrayList(); + File dataDir = new File(dnDataDir); + FileUtil.fullyDelete(dataDir); + dataDir.mkdirs(); + StorageLocation location = StorageLocation.parse(dataDir.getPath()); + locations.add(location); + + final DatanodeProtocolClientSideTranslatorPB namenode = + mock(DatanodeProtocolClientSideTranslatorPB.class); + + Mockito.doAnswer(new Answer() { + @Override + public DatanodeRegistration answer(InvocationOnMock invocation) + throws Throwable { + return (DatanodeRegistration) invocation.getArguments()[0]; + } + }).when(namenode).registerDatanode(Mockito.any(DatanodeRegistration.class)); + + when(namenode.versionRequest()).thenReturn( + new NamespaceInfo(1, TEST_CLUSTER_ID, TEST_POOL_ID, 1L)); + + when( + namenode.sendHeartbeat(Mockito.any(DatanodeRegistration.class), + Mockito.any(StorageReport[].class), Mockito.anyLong(), + Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), + Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), + Mockito.anyBoolean())).thenReturn( + new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( + HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() + .nextLong() | 1L)); + + DataNode dn = new DataNode(conf, locations, null) { + @Override + DatanodeProtocolClientSideTranslatorPB connectToNN( + InetSocketAddress nnAddr) throws IOException { + Assert.assertEquals(nnSocketAddr, nnAddr); + return namenode; + } + }; + // Trigger a heartbeat so that it acknowledges the NN as active. + dn.getAllBpOs().get(0).triggerHeartbeatForTests(); + + return dn; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetricsLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetricsLogger.java new file mode 100644 index 00000000000..1177a457fae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetricsLogger.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Appender; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.AsyncAppender; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import com.google.common.base.Supplier; + +/** + * Test periodic logging of DataNode metrics. + */ +public class TestDataNodeMetricsLogger { + static final Log LOG = LogFactory.getLog(TestDataNodeMetricsLogger.class); + + private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() + + "data"; + + private final static InetSocketAddress NN_ADDR = new InetSocketAddress( + "localhost", 5020); + + private DataNode dn; + + static final Random random = new Random(System.currentTimeMillis()); + + @Rule + public Timeout timeout = new Timeout(300000); + + /** + * Starts an instance of DataNode + * + * @throws IOException + */ + public void startDNForTest(boolean enableMetricsLogging) throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR); + conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + conf.setInt(DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, + enableMetricsLogging ? 1 : 0); // If enabled, log early and log often + + dn = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR); + } + + /** + * Cleans the resources and closes the instance of datanode + * + * @throws IOException + * if an error occurred + */ + @After + public void tearDown() throws IOException { + if (dn != null) { + try { + dn.shutdown(); + } catch (Exception e) { + LOG.error("Cannot close: ", e); + } finally { + File dir = new File(DATA_DIR); + if (dir.exists()) + Assert.assertTrue("Cannot delete data-node dirs", + FileUtil.fullyDelete(dir)); + } + } + dn = null; + } + + @Test + public void testMetricsLoggerOnByDefault() throws IOException { + startDNForTest(true); + assertNotNull(dn); + assertNotNull(dn.getMetricsLoggerTimer()); + } + + @Test + public void testDisableMetricsLogger() throws IOException { + startDNForTest(false); + assertNotNull(dn); + assertNull(dn.getMetricsLoggerTimer()); + } + + @Test + public void testMetricsLoggerIsAsync() throws IOException { + startDNForTest(true); + assertNotNull(dn); + org.apache.log4j.Logger logger = ((Log4JLogger) DataNode.METRICS_LOG) + .getLogger(); + @SuppressWarnings("unchecked") + List appenders = Collections.list(logger.getAllAppenders()); + assertTrue(appenders.get(0) instanceof AsyncAppender); + } + + /** + * Publish a fake metric under the "Hadoop:" domain and ensure it is logged by + * the metrics logger. + */ + @Test + public void testMetricsLogOutput() throws IOException, InterruptedException, + TimeoutException { + TestFakeMetric metricsProvider = new TestFakeMetric(); + MBeans.register(this.getClass().getSimpleName(), "DummyMetrics", + metricsProvider); + startDNForTest(true); + assertNotNull(dn); + final PatternMatchingAppender appender = new PatternMatchingAppender( + "^.*FakeMetric.*$"); + addAppender(DataNode.METRICS_LOG, appender); + + // Ensure that the supplied pattern was matched. + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return appender.isMatched(); + } + }, 1000, 60000); + + dn.shutdown(); + } + + private void addAppender(Log log, Appender appender) { + org.apache.log4j.Logger logger = ((Log4JLogger) log).getLogger(); + @SuppressWarnings("unchecked") + List appenders = Collections.list(logger.getAllAppenders()); + ((AsyncAppender) appenders.get(0)).addAppender(appender); + } + + public interface TestFakeMetricMXBean { + int getFakeMetric(); + } + + /** + * MBean for testing + */ + public static class TestFakeMetric implements TestFakeMetricMXBean { + @Override + public int getFakeMetric() { + return 0; + } + } + + /** + * An appender that matches logged messages against the given regular + * expression. + */ + public static class PatternMatchingAppender extends AppenderSkeleton { + private final Pattern pattern; + private volatile boolean matched; + + public PatternMatchingAppender(String pattern) { + this.pattern = Pattern.compile(pattern); + this.matched = false; + } + + public boolean isMatched() { + return matched; + } + + @Override + protected void append(LoggingEvent event) { + if (pattern.matcher(event.getMessage().toString()).matches()) { + matched = true; + } + } + + @Override + public void close() { + } + + @Override + public boolean requiresLayout() { + return false; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties index 51897ba5f13..45d91cd4835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties @@ -34,3 +34,16 @@ log4j.appender.NNMETRICSRFA.layout=org.apache.log4j.PatternLayout log4j.appender.NNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n log4j.appender.NNMETRICSRFA.MaxBackupIndex=1 log4j.appender.NNMETRICSRFA.MaxFileSize=64MB + +# +# DataNode metrics logging. +# The default is to retain two datanode-metrics.log files up to 64MB each. +# +log4j.logger.DataNodeMetricsLog=INFO,DNMETRICSRFA +log4j.additivity.DataNodeMetricsLog=false +log4j.appender.DNMETRICSRFA=org.apache.log4j.RollingFileAppender +log4j.appender.DNMETRICSRFA.File=${hadoop.log.dir}/datanode-metrics.log +log4j.appender.DNMETRICSRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.DNMETRICSRFA.layout.ConversionPattern=%d{ISO8601} %m%n +log4j.appender.DNMETRICSRFA.MaxBackupIndex=1 +log4j.appender.DNMETRICSRFA.MaxFileSize=64MB