HDFS-10917. Collect peer performance statistics on DataNode. Contributed by Xiaobing Zhou.

Xiaoyu Yao 2016-12-22 23:46:58 -08:00
parent e92a77099b
commit 4e9029653d
12 changed files with 850 additions and 17 deletions

View File: org/apache/hadoop/metrics2/MetricsJsonBuilder.java

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Build a JSON dump of the metrics.
*
* The {@link #toString()} method dumps out all values collected.
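* For example (illustrative values, assuming a "context" tag and one gauge
* were added), the output may look like: {"context":"dfs", "FooAvgTime":1.5}.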
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MetricsJsonBuilder extends MetricsRecordBuilder {
public static final Logger LOG = LoggerFactory
.getLogger(MetricsJsonBuilder.class);
private final MetricsCollector parent;
private Map<String, Object> innerMetrics = new LinkedHashMap<>();
/**
* Build an instance.
* @param parent parent collector. Unused in this instance; only used for
* the {@link #parent()} method
*/
public MetricsJsonBuilder(MetricsCollector parent) {
this.parent = parent;
}
private MetricsRecordBuilder tuple(String key, Object value) {
innerMetrics.put(key, value);
return this;
}
@Override
public MetricsRecordBuilder tag(MetricsInfo info, String value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder add(MetricsTag tag) {
return tuple(tag.name(), tag.value());
}
@Override
public MetricsRecordBuilder add(AbstractMetric metric) {
return tuple(metric.info().name(), metric.toString());
}
@Override
public MetricsRecordBuilder setContext(String value) {
return tuple("context", value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
return tuple(info.name(), value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
return tuple(info.name(), value);
}
@Override
public MetricsCollector parent() {
return parent;
}
@Override
public String toString() {
try {
return new ObjectMapper().writeValueAsString(innerMetrics);
} catch (IOException e) {
LOG.warn("Failed to dump to Json.", e);
return ExceptionUtils.getStackTrace(e);
}
}
}

View File: org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.metrics2.lib;
import com.google.common.collect.Sets;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -50,7 +49,8 @@ import org.apache.hadoop.metrics2.util.SampleStat;
@InterfaceStability.Evolving
public class MutableRatesWithAggregation extends MutableMetric {
static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
private final Map<String, MutableRate> globalMetrics = new HashMap<>();
private final Map<String, MutableRate> globalMetrics =
new ConcurrentHashMap<>();
private final Set<Class<?>> protocolCache = Sets.newHashSet();
private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
@@ -107,12 +107,7 @@ public class MutableRatesWithAggregation extends MutableMetric {
// Thread has died; clean up its state
iter.remove();
} else {
// Aggregate the thread's local samples into the global metrics
for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
String name = entry.getKey();
MutableRate globalMetric = addMetricIfNotExists(name);
entry.getValue().snapshotInto(globalMetric);
}
aggregateLocalStatesToGlobalMetrics(map);
}
}
for (MutableRate globalMetric : globalMetrics.values()) {
@@ -120,6 +115,35 @@ public class MutableRatesWithAggregation extends MutableMetric {
}
}
/**
* Collects states maintained in {@link ThreadLocal}, if any.
*/
synchronized void collectThreadLocalStates() {
final ConcurrentMap<String, ThreadSafeSampleStat> localStats =
threadLocalMetricsMap.get();
if (localStats != null) {
aggregateLocalStatesToGlobalMetrics(localStats);
}
}
/**
* Aggregates the thread's local samples into the global metrics. The caller
* must ensure thread safety.
*/
private void aggregateLocalStatesToGlobalMetrics(
final ConcurrentMap<String, ThreadSafeSampleStat> localStats) {
for (Map.Entry<String, ThreadSafeSampleStat> entry : localStats
.entrySet()) {
String name = entry.getKey();
MutableRate globalMetric = addMetricIfNotExists(name);
entry.getValue().snapshotInto(globalMetric);
}
}
Map<String, MutableRate> getGlobalMetrics() {
return globalMetrics;
}
private synchronized MutableRate addMetricIfNotExists(String name) {
MutableRate metric = globalMetrics.get(name);
if (metric == null) {

View File: org/apache/hadoop/metrics2/lib/RollingAverages.java

@@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static org.apache.hadoop.metrics2.lib.Interns.*;
/**
* <p>
* This class maintains a group of rolling average metrics. It implements the
* rolling average algorithm: a number of sliding windows are kept, rolling
* over periodically to evict the oldest subset of samples. Each window holds
* the sub-sum and sub-count of a subset of samples in the stream. The
* sub-sums and sub-counts of all windows are aggregated into a final sum and
* final count, which are used to compute the final average, i.e. the rolling
* average.
* </p>
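* <p>
* For example (illustrative numbers): with two windows holding (sub-sum,
* sub-count) pairs of (1000, 1000) and (2000, 1000), the rolling average is
* (1000 + 2000) / (1000 + 1000) = 1.5.
* </p>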
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class RollingAverages extends MutableMetric implements Closeable {
private final MutableRatesWithAggregation innerMetrics =
new MutableRatesWithAggregation();
private static final ScheduledExecutorService SCHEDULER = Executors
.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("RollingAverages-%d").build());
private ScheduledFuture<?> scheduledTask = null;
private Map<String, MutableRate> currentSnapshot;
private final int numWindows;
private final String avgInfoNameTemplate;
private final String avgInfoDescTemplate;
private static class SumAndCount {
private final double sum;
private final long count;
public SumAndCount(final double sum, final long count) {
this.sum = sum;
this.count = count;
}
public double getSum() {
return sum;
}
public long getCount() {
return count;
}
}
/**
* <p>
* key: metric name
* </p>
* <p>
* value: deque where the sub-sums and sub-counts for the sliding windows are
* maintained.
* </p>
*/
private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
new ConcurrentHashMap<>();
/**
* Constructor of {@link RollingAverages}.
* @param windowSize
* The number of seconds in each window for which a subset of samples
* is gathered to compute the rolling average, a.k.a. the roll-over
* interval.
* @param numWindows
* The number of windows maintained to compute the rolling average.
* @param valueName
* of the metric (e.g. "Time", "Latency")
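* For example (illustrative), with valueName "Time" and a metric named
* "foo", the emitted gauge is "FooRollingAvgTime" with description
* "Rolling average time for foo".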
*/
public RollingAverages(
final int windowSize,
final int numWindows,
final String valueName) {
String uvName = StringUtils.capitalize(valueName);
String lvName = StringUtils.uncapitalize(valueName);
avgInfoNameTemplate = "%s" + "RollingAvg" + uvName;
avgInfoDescTemplate = "Rolling average " + lvName + " for " + "%s";
this.numWindows = numWindows;
scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
windowSize, windowSize, TimeUnit.SECONDS);
}
/**
* Constructor of {@link RollingAverages}.
* @param windowSize
* The number of seconds in each window for which a subset of samples
* is gathered to compute the rolling average, a.k.a. the roll-over
* interval.
* @param numWindows
* The number of windows maintained at the same time to compute the
* rolling average.
*/
public RollingAverages(
final int windowSize,
final int numWindows) {
this(windowSize, numWindows, "Time");
}
@Override
public void snapshot(MetricsRecordBuilder builder, boolean all) {
if (all || changed()) {
for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
: averages.entrySet()) {
final String name = entry.getKey();
final MetricsInfo avgInfo = info(
String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
double totalSum = 0;
long totalCount = 0;
for (final SumAndCount sumAndCount : entry.getValue()) {
totalCount += sumAndCount.getCount();
totalSum += sumAndCount.getSum();
}
if (totalCount != 0) {
builder.addGauge(avgInfo, totalSum / totalCount);
}
}
if (changed()) {
clearChanged();
}
}
}
/**
* Collects states maintained in {@link ThreadLocal}, if any.
*/
public void collectThreadLocalStates() {
innerMetrics.collectThreadLocalStates();
}
/**
* @param name
* name of metric
* @param value
* value of metric
*/
public void add(final String name, final long value) {
innerMetrics.add(name, value);
}
private static class RatesRoller implements Runnable {
private final RollingAverages parent;
public RatesRoller(final RollingAverages parent) {
this.parent = parent;
}
@Override
public void run() {
synchronized (parent) {
final MetricsCollectorImpl mc = new MetricsCollectorImpl();
final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
/*
* Snapshot all metrics regardless of whether they have changed; if there
* were no ops since the last snapshot, we will get 0.
*/
parent.innerMetrics.snapshot(rb, true);
Preconditions.checkState(mc.getRecords().size() == 1,
"There must be only one record and it's named with 'RatesRoller'");
parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
parent.rollOverAvgs();
}
parent.setChanged();
}
}
/**
* Iterates over snapshot to capture all Avg metrics into rolling structure
* {@link RollingAverages#averages}.
*/
private void rollOverAvgs() {
if (currentSnapshot == null) {
return;
}
for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
final MutableRate rate = entry.getValue();
final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
entry.getKey(),
new Function<String, LinkedBlockingDeque<SumAndCount>>() {
@Override
public LinkedBlockingDeque<SumAndCount> apply(String k) {
return new LinkedBlockingDeque<SumAndCount>(numWindows);
}
});
final SumAndCount sumAndCount = new SumAndCount(
rate.lastStat().total(),
rate.lastStat().numSamples());
/* put newest sum and count to the end */
if (!deque.offerLast(sumAndCount)) {
deque.pollFirst();
deque.offerLast(sumAndCount);
}
}
setChanged();
}
@Override
public void close() throws IOException {
if (scheduledTask != null) {
scheduledTask.cancel(false);
}
scheduledTask = null;
}
}

View File: org/apache/hadoop/metrics2/lib/TestRollingAverages.java

@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.anyDouble;
import static org.mockito.Matchers.eq;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
* This class tests various cases of the algorithms implemented in
* {@link RollingAverages}.
*/
public class TestRollingAverages {
/**
* Tests that the results are correct when no samples are inserted, i.e. a
* dry run of an empty roll-over.
*/
@Test(timeout = 30000)
public void testRollingAveragesEmptyRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
/* 5s interval and 2 windows */
try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) {
/* Check it initially */
rollingAverages.snapshot(rb, true);
verify(rb, never()).addGauge(
info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
verify(rb, never()).addGauge(
info("BarAvgTime", "Rolling average time for bar"), (long) 0);
/* sleep 6s (longer than the 5s interval) to wait for the roll-over to finish */
Thread.sleep(6000);
rollingAverages.snapshot(rb, false);
verify(rb, never()).addGauge(
info("FooRollingAvgTime", "Rolling average time for foo"), (long) 0);
verify(rb, never()).addGauge(
info("BarAvgTime", "Rolling average time for bar"), (long) 0);
}
}
/**
* Tests the case:
* <p>
* 5s interval and 2 sliding windows
* </p>
* <p>
* sample stream: 1000 times 1, 2, and 3, respectively, e.g. [1, 1...1], [2,
* 2...2] and [3, 3...3]
* </p>
*/
@Test(timeout = 30000)
public void testRollingAveragesRollover() throws Exception {
final MetricsRecordBuilder rb = mockMetricsRecordBuilder();
final String name = "foo2";
final int windowSize = 5; // 5s roll over interval
final int numWindows = 2;
final int numOpsPerIteration = 1000;
try (RollingAverages rollingAverages = new RollingAverages(windowSize,
numWindows)) {
/* Push values for three intervals */
final long start = Time.monotonicNow();
for (int i = 1; i <= 3; i++) {
/* insert value */
for (long j = 1; j <= numOpsPerIteration; j++) {
rollingAverages.add(name, i);
}
/**
* Sleep until 1s after the next windowSize seconds interval, to let the
* metrics roll over
*/
final long sleep = (start + (windowSize * 1000 * i) + 1000)
- Time.monotonicNow();
Thread.sleep(sleep);
/* Verify that the windows rolled over and contain the values we pushed in */
rollingAverages.snapshot(rb, false);
/*
* Window #1 holds a series of 1 pushed 1000 times, e.g. [1, 1...1];
* similarly, window #2 holds [2, 2...2] and window #3 holds [3, 3...3].
*/
final double rollingSum = numOpsPerIteration * (i > 1 ? (i - 1) : 0)
+ numOpsPerIteration * i;
/* for i == 1 one window is still empty; afterwards both windows are full */
final long rollingTotal = i > 1 ? 2 * numOpsPerIteration
: numOpsPerIteration;
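/*
* Expected rolling averages, derived from the formulas above:
* i=1 -> 1000/1000 = 1.0; i=2 -> (1000+2000)/2000 = 1.5;
* i=3 -> (2000+3000)/2000 = 2.5.
*/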
verify(rb).addGauge(
info("Foo2RollingAvgTime", "Rolling average time for foo2"),
rollingSum / rollingTotal);
/* Verify the metrics were added the right number of times */
verify(rb, times(i)).addGauge(
eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")),
anyDouble());
}
}
}
}

View File: org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -457,6 +457,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_METRICS_SESSION_ID_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY =
"dfs.metrics.rolling.average.window.size";
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT =
3600;
public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY =
"dfs.metrics.rolling.average.window.numbers";
public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT =
48;
public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY =
"dfs.datanode.peer.stats.enabled";
public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false;
public static final String DFS_DATANODE_HOST_NAME_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =

View File: org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -93,6 +93,7 @@ class BlockReceiver implements Closeable {
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
private String bracketedMirrorAddr;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
@@ -119,6 +120,7 @@ class BlockReceiver implements Closeable {
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
private boolean isPenultimateNode = false;
private boolean syncOnClose;
private long restartBudget;
@@ -575,6 +577,7 @@ class BlockReceiver implements Closeable {
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
duration);
trackSendPacketToLastNodeInPipeline(duration);
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -822,6 +825,33 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
/**
* Only tracks the latency of sending a packet to the last node in the
* pipeline. This is a conscious design choice.
* <p>
* In the case of pipeline [dn0, dn1, dn2] with 5ms latency from dn0 to dn1
* and 100ms from dn1 to dn2, the NameNode claims dn2 is slow since it sees
* the 100ms latency to dn2. Note that the NameNode is not aware of the
* pipeline structure in this context and only sees the latency between two
* DataNodes.
* </p>
* <p>
* In another case with the same pipeline, 100ms latency from dn0 to dn1 and
* 5ms from dn1 to dn2, the NameNode will miss detecting that dn1 is slow
* since it is not the last node. However, the assumption is that in a busy
* enough cluster there are many other pipelines where dn1 is the last node,
* e.g. [dn3, dn4, dn1]. Also, the tracking interval is long enough (at
* least an hour) to improve the chances of the bad DataNodes being the last
* nodes in multiple pipelines.
* </p>
*/
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
if (isPenultimateNode && mirrorAddr != null) {
datanode.getPeerMetrics().addSendPacketDownstream(
bracketedMirrorAddr,
elapsedMs);
}
}
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
return Arrays.copyOfRange(array, end - size, end);
}
@@ -886,7 +916,7 @@ class BlockReceiver implements Closeable {
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
.getRestartOOBStatus());
}
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@@ -895,14 +925,16 @@ class BlockReceiver implements Closeable {
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
throttler = throttlerArg;
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
bracketedMirrorAddr = "[" + mirrAddr + "]";
isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
throttler = throttlerArg;
this.replyOut = replyOut;
this.isReplaceBlock = isReplaceBlock;
this.replyOut = replyOut;
this.isReplaceBlock = isReplaceBlock;
try {
if (isClient && !isTransfer) {

View File: org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
@@ -333,6 +334,7 @@ public class DataNode extends ReconfigurableBase
private int infoSecurePort;
DataNodeMetrics metrics;
private DataNodePeerMetrics peerMetrics;
private InetSocketAddress streamingAddr;
// See the note below in incrDatanodeNetworkErrors re: concurrency.
@@ -1360,6 +1362,7 @@ public class DataNode extends ReconfigurableBase
initIpcServer();
metrics = DataNodeMetrics.create(getConf(), getDisplayName());
peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName());
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
ecWorker = new ErasureCodingWorker(getConf(), this);
@@ -1755,11 +1758,15 @@ public class DataNode extends ReconfigurableBase
throw new IOException(ie.getMessage());
}
}
public DataNodeMetrics getMetrics() {
return metrics;
}
public DataNodePeerMetrics getPeerMetrics() {
return peerMetrics;
}
/** Ensure the authentication method is kerberos */
private void checkKerberosAuthMethod(String msg) throws IOException {
// User invoking the call must be same as the datanode user
@@ -3437,4 +3444,9 @@ public class DataNode extends ReconfigurableBase
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
@Override // DataNodeMXBean
public String getSendPacketDownstreamAvgInfo() {
return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
}
}

View File: org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java

@@ -125,4 +125,16 @@ public interface DataNodeMXBean {
* Gets the {@link FileIoProvider} statistics.
*/
String getFileIoProviderStatistics();
/**
* Gets the average info (e.g. time) of SendPacketDownstream when the DataNode
* acts as the penultimate (second-to-last) node in the pipeline.
* <p>
* Example JSON:
* {"[185.164.159.81:9801]RollingAvgTime":504.867,
* "[49.236.149.246:9801]RollingAvgTime":504.463,
* "[84.125.113.65:9801]RollingAvgTime":497.954}
* </p>
*/
String getSendPacketDownstreamAvgInfo();
}

View File: org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -323,6 +323,7 @@ class DataXceiver extends Receiver implements Runnable {
LOG.error(s, t);
}
} finally {
collectThreadLocalStates();
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
@@ -335,6 +336,14 @@ class DataXceiver extends Receiver implements Runnable {
}
}
/**
* In this short-lived thread, any thread-local states should be collected
* before the thread dies.
*/
private void collectThreadLocalStates() {
datanode.getPeerMetrics().collectThreadLocalStates();
}
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,

View File: org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java

@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.metrics;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.RollingAverages;
/**
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime) for
* various peer operations.
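* <p>
* Typical usage (sketch): create an instance via {@link #create}, record
* samples with {@link #addSendPacketDownstream}, and read the rolling
* averages with {@link #dumpSendPacketDownstreamAvgInfoAsJson()}.
* </p>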
*/
@InterfaceAudience.Private
public class DataNodePeerMetrics {
static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class);
private final RollingAverages sendPacketDownstreamRollingAverages;
private final String name;
private final boolean peerStatsEnabled;
public DataNodePeerMetrics(
final String name,
final int windowSize,
final int numWindows,
final boolean peerStatsEnabled) {
this.name = name;
this.peerStatsEnabled = peerStatsEnabled;
sendPacketDownstreamRollingAverages = new RollingAverages(
windowSize,
numWindows);
}
public String name() {
return name;
}
/**
* Creates an instance of DataNodePeerMetrics, used for registration.
*/
public static DataNodePeerMetrics create(Configuration conf, String dnName) {
final String name = "DataNodePeerActivity-" + (dnName.isEmpty()
? "UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt()
: dnName.replace(':', '-'));
final int windowSize = conf.getInt(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT);
final int numWindows = conf.getInt(
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT);
final boolean peerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
return new DataNodePeerMetrics(
name,
windowSize,
numWindows,
peerStatsEnabled);
}
/**
* Adds an invocation and the elapsed time of SendPacketDownstream for a peer.
* <p>
* The caller should pass in a well-formatted peerAddr, e.g.
* "[192.168.1.110:1010]". This will be translated into a fully qualified
* metric name, e.g. "[192.168.1.110:1010]RollingAvgTime".
* </p>
*/
public void addSendPacketDownstream(
final String peerAddr,
final long elapsedMs) {
if (peerStatsEnabled) {
sendPacketDownstreamRollingAverages.add(peerAddr, elapsedMs);
}
}
/**
* Dump SendPacketDownstreamRollingAvgTime metrics as JSON.
*/
public String dumpSendPacketDownstreamAvgInfoAsJson() {
final MetricsJsonBuilder builder = new MetricsJsonBuilder(null);
sendPacketDownstreamRollingAverages.snapshot(builder, true);
return builder.toString();
}
/**
* Collects states maintained in {@link ThreadLocal}, if any.
*/
public void collectThreadLocalStates() {
sendPacketDownstreamRollingAverages.collectThreadLocalStates();
}
}

View File: hdfs-default.xml

@@ -1971,6 +1971,31 @@
</description>
</property>
<property>
<name>dfs.datanode.peer.stats.enabled</name>
<value>false</value>
<description>
A switch to turn on/off tracking DataNode peer statistics.
</description>
</property>
<property>
<name>dfs.metrics.rolling.average.window.size</name>
<value>3600</value>
<description>
The number of seconds in each window for which a subset of samples is gathered
to compute the rolling average, a.k.a. the roll-over interval.
</description>
</property>
<property>
<name>dfs.metrics.rolling.average.window.numbers</name>
<value>48</value>
<description>
The number of windows maintained to compute the rolling average.
</description>
</property>
<property>
<name>hadoop.user.group.metrics.percentiles.intervals</name>
<value></value>

View File: org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java

@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertThat;
/**
* This class tests various cases of DataNode peer metrics.
*/
public class TestDataNodePeerMetrics {
@Test(timeout = 30000)
public void testGetSendPacketDownstreamAvgInfo() throws Exception {
final int windowSize = 5; // 5s roll over interval
final int numWindows = 2; // 2 rolling windows
final int iterations = 3;
final int numOpsPerIteration = 1000;
final Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY,
windowSize);
conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY,
numWindows);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
conf,
"Sample-DataNode");
final long start = Time.monotonicNow();
for (int i = 1; i <= iterations; i++) {
final String peerAddr = genPeerAddress();
for (int j = 1; j <= numOpsPerIteration; j++) {
/* simulate a random latency in the range [1, 1000) ms */
final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
peerMetrics.addSendPacketDownstream(peerAddr, latency);
}
/**
* Sleep until 1s after the next windowSize seconds interval, to let the
* metrics roll over
*/
final long sleep = (start + (windowSize * 1000 * i) + 1000)
- Time.monotonicNow();
Thread.sleep(sleep);
/* dump avg info */
final String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
/*
* example json:
* {"[185.164.159.81:9801]RollingAvgTime":504.867,
* "[49.236.149.246:9801]RollingAvgTime":504.463,
* "[84.125.113.65:9801]RollingAvgTime":497.954}
*/
assertThat(json, containsString(peerAddr));
}
}
/**
* Generates random peer addresses, e.g. [84.125.113.65:9801].
*/
private String genPeerAddress() {
final ThreadLocalRandom r = ThreadLocalRandom.current();
return String.format("[%d.%d.%d.%d:9801]",
r.nextInt(1, 256), r.nextInt(1, 256),
r.nextInt(1, 256), r.nextInt(1, 256));
}
}