From 42a14b6db92747ab3587da3dc47cafb6428d49be Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Tue, 17 Jan 2017 15:31:14 -0800 Subject: [PATCH] HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou. --- .../hadoop/metrics2/MetricsJsonBuilder.java | 125 +++++++++ .../lib/MutableRatesWithAggregation.java | 40 ++- .../hadoop/metrics2/lib/RollingAverages.java | 253 ++++++++++++++++++ .../metrics2/lib/TestRollingAverages.java | 123 +++++++++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 + .../hdfs/server/datanode/BlockReceiver.java | 48 +++- .../hadoop/hdfs/server/datanode/DataNode.java | 12 + .../hdfs/server/datanode/DataNodeMXBean.java | 12 + .../hdfs/server/datanode/DataXceiver.java | 9 + .../datanode/metrics/DataNodePeerMetrics.java | 117 ++++++++ .../src/main/resources/hdfs-default.xml | 25 ++ .../datanode/TestDataNodePeerMetrics.java | 92 +++++++ 12 files changed, 851 insertions(+), 16 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java new file mode 100644 index 00000000000..8e42909b07c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsJsonBuilder.java @@ -0,0 +1,125 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Build a JSON dump of the metrics. + * + * The {@link #toString()} operator dumps out all values collected. + * + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricsJsonBuilder extends MetricsRecordBuilder { + + public static final Logger LOG = LoggerFactory + .getLogger(MetricsRecordBuilder.class); + private final MetricsCollector parent; + private Map innerMetrics = new LinkedHashMap<>(); + + /** + * Build an instance. + * @param parent parent collector. 
Unused in this instance; only used for + * the {@link #parent()} method + */ + public MetricsJsonBuilder(MetricsCollector parent) { + this.parent = parent; + } + + private MetricsRecordBuilder tuple(String key, Object value) { + innerMetrics.put(key, value); + return this; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return tuple(tag.name(), tag.value()); + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + return tuple(metric.info().name(), metric.toString()); + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return tuple("context", value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return tuple(info.name(), value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return tuple(info.name(), value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + @Override + public String toString() { + try { + return new ObjectMapper().writeValueAsString(innerMetrics); + } catch (IOException e) { + LOG.warn("Failed to dump to Json.", e); + return ExceptionUtils.getStackTrace(e); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java index 64eae039f47..9827ca77e82 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java @@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib; import com.google.common.collect.Sets; import java.lang.ref.WeakReference; import java.lang.reflect.Method; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat; @InterfaceStability.Evolving public class MutableRatesWithAggregation extends MutableMetric { static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class); - private final Map globalMetrics = new HashMap<>(); + private final Map globalMetrics = + new ConcurrentHashMap<>(); private final Set> protocolCache = Sets.newHashSet(); private final ConcurrentLinkedDeque>> @@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric { // Thread has died; clean up its state iter.remove(); } else { - // Aggregate the thread's local samples into the global metrics - for (Map.Entry entry : map.entrySet()) { - String name = entry.getKey(); - MutableRate globalMetric = addMetricIfNotExists(name); - entry.getValue().snapshotInto(globalMetric); - } + aggregateLocalStatesToGlobalMetrics(map); } } for (MutableRate globalMetric : globalMetrics.values()) { @@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric { } } + /** + * Collects states maintained in {@link ThreadLocal}, if any. 
+ */ + synchronized void collectThreadLocalStates() { + final ConcurrentMap localStats = + threadLocalMetricsMap.get(); + if (localStats != null) { + aggregateLocalStatesToGlobalMetrics(localStats); + } + } + + /** + * Aggregates the thread's local samples into the global metrics. The caller + * should ensure its thread safety. + */ + private void aggregateLocalStatesToGlobalMetrics( + final ConcurrentMap localStats) { + for (Map.Entry entry : localStats + .entrySet()) { + String name = entry.getKey(); + MutableRate globalMetric = addMetricIfNotExists(name); + entry.getValue().snapshotInto(globalMetric); + } + } + + Map getGlobalMetrics() { + return globalMetrics; + } + private synchronized MutableRate addMetricIfNotExists(String name) { MutableRate metric = globalMetrics.get(name); if (metric == null) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java new file mode 100644 index 00000000000..73567c05878 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.metrics2.lib.Interns.*; + +/** + *

+ * This class maintains a group of rolling average metrics. It implements the + * algorithm of rolling average, i.e. a number of sliding windows are kept to + * roll over and evict old subsets of samples. Each window has a subset of + * samples in a stream, where sub-sum and sub-total are collected. All sub-sums + * and sub-totals in all windows will be aggregated to final-sum and final-total + * used to compute final average, which is called rolling average. + *

+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RollingAverages extends MutableMetric implements Closeable { + + private final MutableRatesWithAggregation innerMetrics = + new MutableRatesWithAggregation(); + + private static final ScheduledExecutorService SCHEDULER = Executors + .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("RollingAverages-%d").build()); + + private ScheduledFuture scheduledTask = null; + private Map currentSnapshot; + private final int numWindows; + private final String avgInfoNameTemplate; + private final String avgInfoDescTemplate; + + private static class SumAndCount { + private final double sum; + private final long count; + + public SumAndCount(final double sum, final long count) { + this.sum = sum; + this.count = count; + } + + public double getSum() { + return sum; + } + + public long getCount() { + return count; + } + } + + /** + *

+ * key: metric name + *

+ *

+ * value: deque where sub-sums and sub-totals for sliding windows are + * maintained. + *

+ */ + private ConcurrentMap> averages = + new ConcurrentHashMap<>(); + + /** + * Constructor of {@link RollingAverages}. + * @param windowSize + * The number of seconds of each window for which sub set of samples + * are gathered to compute the rolling average, A.K.A. roll over + * interval. + * @param numWindows + * The number of windows maintained to compute the rolling average. + * @param valueName + * of the metric (e.g. "Time", "Latency") + */ + public RollingAverages( + final int windowSize, + final int numWindows, + final String valueName) { + String uvName = StringUtils.capitalize(valueName); + String lvName = StringUtils.uncapitalize(valueName); + avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName; + avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s"; + this.numWindows = numWindows; + scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this), + windowSize, windowSize, TimeUnit.SECONDS); + } + + /** + * Constructor of {@link RollingAverages}. + * @param windowSize + * The number of seconds of each window for which sub set of samples + * are gathered to compute rolling average, also A.K.A roll over + * interval. + * @param numWindows + * The number of windows maintained in the same time to compute the + * average of the rolling averages. 
+ */ + public RollingAverages( + final int windowSize, + final int numWindows) { + this(windowSize, numWindows, "Time"); + } + + @Override + public void snapshot(MetricsRecordBuilder builder, boolean all) { + if (all || changed()) { + for (final Entry> entry + : averages.entrySet()) { + final String name = entry.getKey(); + final MetricsInfo avgInfo = info( + String.format(avgInfoNameTemplate, StringUtils.capitalize(name)), + String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name))); + double totalSum = 0; + long totalCount = 0; + + for (final SumAndCount sumAndCount : entry.getValue()) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } + + if (totalCount != 0) { + builder.addGauge(avgInfo, totalSum / totalCount); + } + } + if (changed()) { + clearChanged(); + } + } + } + + /** + * Collects states maintained in {@link ThreadLocal}, if any. + */ + public void collectThreadLocalStates() { + innerMetrics.collectThreadLocalStates(); + } + + /** + * @param name + * name of metric + * @param value + * value of metric + */ + public void add(final String name, final long value) { + innerMetrics.add(name, value); + } + + private static class RatesRoller implements Runnable { + private final RollingAverages parent; + + public RatesRoller(final RollingAverages parent) { + this.parent = parent; + } + + @Override + public void run() { + synchronized (parent) { + final MetricsCollectorImpl mc = new MetricsCollectorImpl(); + final MetricsRecordBuilder rb = mc.addRecord("RatesRoller"); + /** + * snapshot all metrics regardless of being changed or not, in case no + * ops since last snapshot, we will get 0. 
+ */ + parent.innerMetrics.snapshot(rb, true); + Preconditions.checkState(mc.getRecords().size() == 1, + "There must be only one record and it's named with 'RatesRoller'"); + + parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics(); + parent.rollOverAvgs(); + } + parent.setChanged(); + } + } + + /** + * Iterates over snapshot to capture all Avg metrics into rolling structure + * {@link RollingAverages#averages}. + *

+ * This function is not thread safe, callers should ensure thread safety. + *

+ */ + private void rollOverAvgs() { + if (currentSnapshot == null) { + return; + } + + for (Map.Entry entry : currentSnapshot.entrySet()) { + final MutableRate rate = entry.getValue(); + + LinkedBlockingDeque deque = averages.get(entry.getKey()); + if (deque == null) { + deque = new LinkedBlockingDeque<>(numWindows); + averages.put(entry.getKey(), deque); + } + + final SumAndCount sumAndCount = new SumAndCount( + rate.lastStat().total(), + rate.lastStat().numSamples()); + /* put newest sum and count to the end */ + if (!deque.offerLast(sumAndCount)) { + deque.pollFirst(); + deque.offerLast(sumAndCount); + } + } + + setChanged(); + } + + @Override + public void close() throws IOException { + if (scheduledTask != null) { + scheduledTask.cancel(false); + } + scheduledTask = null; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java new file mode 100644 index 00000000000..899d98c4b64 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.lib; + +import static org.apache.hadoop.metrics2.lib.Interns.info; +import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.anyDouble; +import static org.mockito.Matchers.eq; + +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.util.Time; +import org.junit.Test; + +/** + * This class tests various cases of the algorithms implemented in + * {@link RollingAverages}. + */ +public class TestRollingAverages { + /** + * Tests if the results are correct if no samples are inserted, dry run of + * empty roll over. + */ + @Test(timeout = 30000) + public void testRollingAveragesEmptyRollover() throws Exception { + final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + /* 5s interval and 2 windows */ + try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) { + /* Check it initially */ + rollingAverages.snapshot(rb, true); + verify(rb, never()).addGauge( + info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0); + verify(rb, never()).addGauge( + info("BarAvgTime", "Rolling average time for bar"), (long) 0); + + /* sleep 6s longer than 5s interval to wait for rollover done */ + Thread.sleep(6000); + rollingAverages.snapshot(rb, false); + verify(rb, never()).addGauge( + info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0); + verify(rb, never()).addGauge( + info("BarAvgTime", "Rolling average time for bar"), (long) 0); + } + } + + /** + * Tests the case: + *

+ * 5s interval and 2 sliding windows + *

+ *

+ * sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2, + * 2...2] and [3, 3...3] + *

+ */ + @Test(timeout = 30000) + public void testRollingAveragesRollover() throws Exception { + final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + final String name = "foo2"; + final int windowSize = 5; // 5s roll over interval + final int numWindows = 2; + final int numOpsPerIteration = 1000; + try (RollingAverages rollingAverages = new RollingAverages(windowSize, + numWindows)) { + + /* Push values for three intervals */ + final long start = Time.monotonicNow(); + for (int i = 1; i <= 3; i++) { + /* insert value */ + for (long j = 1; j <= numOpsPerIteration; j++) { + rollingAverages.add(name, i); + } + + /** + * Sleep until 1s after the next windowSize seconds interval, to let the + * metrics roll over + */ + final long sleep = (start + (windowSize * 1000 * i) + 1000) + - Time.monotonicNow(); + Thread.sleep(sleep); + + /* Verify that the window reset, check it has the values we pushed in */ + rollingAverages.snapshot(rb, false); + + /* + * #1 window with a series of 1 1000 + * times, e.g. [1, 1...1], similarly, #2 window, e.g. [2, 2...2], + * #3 window, e.g. [3, 3...3] + */ + final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0) + + numOpsPerIteration * i; + /* one empty window or all 2 windows full */ + final long rollingTotal = i > 1 ? 
2 * numOpsPerIteration + : numOpsPerIteration; + verify(rb).addGauge( + info("Foo2RollingAvgTime", "Rolling average time for foo2"), + rollingSum / rollingTotal); + + /* Verify the metrics were added the right number of times */ + verify(rb, times(i)).addGauge( + eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")), + anyDouble()); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index d4ec567e732..86848ce0284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -432,6 +432,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_METRICS_SESSION_ID_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY; public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY = + "dfs.metrics.rolling.average.window.size"; + public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT = + 3600; + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY = + "dfs.metrics.rolling.average.window.numbers"; + public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT = + 48; + public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY = + "dfs.datanode.peer.stats.enabled"; + public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false; public static final String DFS_DATANODE_HOST_NAME_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY; public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 96ffe8efb0e..1b4bc2e927e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -93,6 +93,7 @@ class BlockReceiver implements Closeable { protected final String inAddr; protected final String myAddr; private String mirrorAddr; + private String bracketedMirrorAddr; private DataOutputStream mirrorOut; private Daemon responder = null; private DataTransferThrottler throttler; @@ -119,6 +120,7 @@ class BlockReceiver implements Closeable { /** pipeline stage */ private final BlockConstructionStage stage; private final boolean isTransfer; + private boolean isPenultimateNode = false; private boolean syncOnClose; private long restartBudget; @@ -573,6 +575,7 @@ class BlockReceiver implements Closeable { DataNodeFaultInjector.get().logDelaySendingPacketDownstream( mirrorAddr, duration); + trackSendPacketToLastNodeInPipeline(duration); if (duration > datanodeSlowLogThresholdMs) { LOG.warn("Slow BlockReceiver write packet to mirror took " + duration + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); @@ -820,6 +823,33 @@ class BlockReceiver implements Closeable { return lastPacketInBlock?-1:len; } + /** + * Only tracks the latency of sending packet to the last node in pipeline. + * This is a conscious design choice. + *

+ * In the case of pipeline [dn0, dn1, dn2], 5ms latency from dn0 to dn1, 100ms + * from dn1 to dn2, NameNode claims dn2 is slow since it sees 100ms latency to + * dn2. Note that NameNode is not aware of pipeline structure in this context + * and only sees latency between two DataNodes. + *

+ *

+ * In another case of the same pipeline, 100ms latency from dn0 to dn1, 5ms + * from dn1 to dn2, NameNode will miss detecting dn1 being slow since it's not + * the last node. However the assumption is that in a busy enough cluster + * there are many other pipelines where dn1 is the last node, e.g. [dn3, dn4, + * dn1]. Also our tracking interval is relatively long enough (at least an + * hour) to improve the chances of the bad DataNodes being the last nodes in + * multiple pipelines. + *

+ */ + private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) { + if (isPenultimateNode && mirrorAddr != null) { + datanode.getPeerMetrics().addSendPacketDownstream( + bracketedMirrorAddr, + elapsedMs); + } + } + private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) { return Arrays.copyOfRange(array, end - size, end); } @@ -884,7 +914,7 @@ class BlockReceiver implements Closeable { ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck .getRestartOOBStatus()); } - + void receiveBlock( DataOutputStream mirrOut, // output to next datanode DataInputStream mirrIn, // input from next datanode @@ -893,14 +923,16 @@ class BlockReceiver implements Closeable { DatanodeInfo[] downstreams, boolean isReplaceBlock) throws IOException { - syncOnClose = datanode.getDnConf().syncOnClose; - boolean responderClosed = false; - mirrorOut = mirrOut; - mirrorAddr = mirrAddr; - throttler = throttlerArg; + syncOnClose = datanode.getDnConf().syncOnClose; + boolean responderClosed = false; + mirrorOut = mirrOut; + mirrorAddr = mirrAddr; + bracketedMirrorAddr = "[" + mirrAddr + "]"; + isPenultimateNode = ((downstreams != null) && (downstreams.length == 1)); + throttler = throttlerArg; - this.replyOut = replyOut; - this.isReplaceBlock = isReplaceBlock; + this.replyOut = replyOut; + this.isReplaceBlock = isReplaceBlock; try { if (isClient && !isTransfer) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index aa12e43792d..fa3e0993187 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -166,6 +166,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase private int infoSecurePort; DataNodeMetrics metrics; + private DataNodePeerMetrics peerMetrics; private InetSocketAddress streamingAddr; // See the note below in incrDatanodeNetworkErrors re: concurrency. @@ -1328,6 +1330,7 @@ public class DataNode extends ReconfigurableBase initIpcServer(); metrics = DataNodeMetrics.create(getConf(), getDisplayName()); + peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName()); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); blockRecoveryWorker = new BlockRecoveryWorker(this); @@ -1723,6 +1726,10 @@ public class DataNode extends ReconfigurableBase return metrics; } + public DataNodePeerMetrics getPeerMetrics() { + return peerMetrics; + } + /** Ensure the authentication method is kerberos */ private void checkKerberosAuthMethod(String msg) throws IOException { // User invoking the call must be same as the datanode user @@ -3295,4 +3302,9 @@ public class DataNode extends ReconfigurableBase void setBlockScanner(BlockScanner blockScanner) { this.blockScanner = blockScanner; } + + @Override // DataNodeMXBean + public String getSendPacketDownstreamAvgInfo() { + return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java index 
66d1c6d0f12..5249ff50298 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java @@ -105,4 +105,16 @@ public interface DataNodeMXBean { * Gets the network error counts on a per-Datanode basis. */ public Map> getDatanodeNetworkCounts(); + + /** + * Gets the average info (e.g. time) of SendPacketDownstream when the + * DataNode acts as the penultimate (2nd to the last) node in pipeline. + *

+ * Example Json: { + * "[185.164.159.81:9801]RollingAvgTime":504.867, + * "[49.236.149.246:9801]RollingAvgTime":504.463, + * "[84.125.113.65:9801]RollingAvgTime":497.954} + *

+ */ + String getSendPacketDownstreamAvgInfo(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 7c36ab19c88..345ff6135b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -321,6 +321,7 @@ class DataXceiver extends Receiver implements Runnable { LOG.error(s, t); } } finally { + collectThreadLocalStates(); if (LOG.isDebugEnabled()) { LOG.debug(datanode.getDisplayName() + ":Number of active connections is: " + datanode.getXceiverCount()); @@ -333,6 +334,14 @@ class DataXceiver extends Receiver implements Runnable { } } + /** + * In this short living thread, any local states should be collected before + * the thread dies away. + */ + private void collectThreadLocalStates() { + datanode.getPeerMetrics().collectThreadLocalStates(); + } + @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token token, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java new file mode 100644 index 00000000000..9344d1b5a5a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.metrics2.MetricsJsonBuilder; +import org.apache.hadoop.metrics2.lib.RollingAverages; + +/** + * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for + * various peer operations. + */ +@InterfaceAudience.Private +public class DataNodePeerMetrics { + + static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class); + + private final RollingAverages sendPacketDownstreamRollingAvgerages; + + private final String name; + private final boolean peerStatsEnabled; + + public DataNodePeerMetrics( + final String name, + final int windowSize, + final int numWindows, + final boolean peerStatsEnabled) { + this.name = name; + this.peerStatsEnabled = peerStatsEnabled; + sendPacketDownstreamRollingAvgerages = new RollingAverages( + windowSize, + numWindows); + } + + public String name() { + return name; + } + + /** + * Creates an instance of DataNodePeerMetrics, used for registration. 
+ */ + public static DataNodePeerMetrics create(Configuration conf, String dnName) { + final String name = "DataNodePeerActivity-" + (dnName.isEmpty() + ? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt() + : dnName.replace(':', '-')); + + final int windowSize = conf.getInt( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT); + final int numWindows = conf.getInt( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT); + final boolean peerStatsEnabled = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + + return new DataNodePeerMetrics( + name, + windowSize, + numWindows, + peerStatsEnabled); + } + + /** + * Adds invocation and elapsed time of SendPacketDownstream for peer. + *

+ The caller should pass in a well-formatted peerAddr, e.g. + "[192.168.1.110:1010]" is good. This will be translated into a fully + qualified metric name, e.g. "[192.168.1.110:1010]AvgTime". + *

+ */ + public void addSendPacketDownstream( + final String peerAddr, + final long elapsedMs) { + if (peerStatsEnabled) { + sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs); + } + } + + /** + * Dump SendPacketDownstreamRollingAvgTime metrics as JSON. + */ + public String dumpSendPacketDownstreamAvgInfoAsJson() { + final MetricsJsonBuilder builder = new MetricsJsonBuilder(null); + sendPacketDownstreamRollingAvgerages.snapshot(builder, true); + return builder.toString(); + } + + /** + * Collects states maintained in {@link ThreadLocal}, if any. + */ + public void collectThreadLocalStates() { + sendPacketDownstreamRollingAvgerages.collectThreadLocalStates(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index baa1f2d1366..73e6f5ca46b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1921,6 +1921,31 @@ + + dfs.datanode.peer.stats.enabled + false + + A switch to turn on/off tracking DataNode peer statistics. + + + + + dfs.metrics.rolling.average.window.size + 3600 + + The number of seconds of each window for which a subset of samples is gathered + to compute the rolling average, A.K.A. roll over interval. + + + + + dfs.metrics.rolling.average.window.numbers + 48 + + The number of windows maintained to compute the rolling average.
+ + + hadoop.user.group.metrics.percentiles.intervals diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java new file mode 100644 index 00000000000..5af54a4cbbb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; + +/** + * This class tests various cases of DataNode peer metrics. 
+ */ +public class TestDataNodePeerMetrics { + + @Test(timeout = 30000) + public void testGetSendPacketDownstreamAvgInfo() throws Exception { + final int windowSize = 5; // 5s roll over interval + final int numWindows = 2; // 2 rolling windows + final int iterations = 3; + final int numOpsPerIteration = 1000; + + final Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, + windowSize); + conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, + numWindows); + conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); + + final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create( + conf, + "Sample-DataNode"); + final long start = Time.monotonicNow(); + for (int i = 1; i <= iterations; i++) { + final String peerAddr = genPeerAddress(); + for (int j = 1; j <= numOpsPerIteration; j++) { + /* simulate to get latency of 1 to 1000 ms */ + final long latency = ThreadLocalRandom.current().nextLong(1, 1000); + peerMetrics.addSendPacketDownstream(peerAddr, latency); + } + + /* + * Sleep until 1s after the next windowSize seconds interval, to let the + * metrics roll over. + */ + final long sleep = (start + (windowSize * 1000 * i) + 1000) + - Time.monotonicNow(); + Thread.sleep(sleep); + + /* dump avg info */ + final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + + /* + * example json: + * {"[185.164.159.81:9801]RollingAvgTime":504.867, + * "[49.236.149.246:9801]RollingAvgTime":504.463, + * "[84.125.113.65:9801]RollingAvgTime":497.954} + */ + assertThat(json, containsString(peerAddr)); + } + } + + /** + * Generates a random peer address for simulation, e.g. [84.125.113.65:9801]. + */ + private String genPeerAddress() { + final ThreadLocalRandom r = ThreadLocalRandom.current(); + return String.format("[%d.%d.%d.%d:9801]", + r.nextInt(1, 256), r.nextInt(1, 256), + r.nextInt(1, 256), r.nextInt(1, 256)); + } +}