HDFS-11603. Improve slow mirror/disk warnings in BlockReceiver.

Arpit Agarwal 2017-04-18 16:20:43 -07:00
parent 6534bc0a01
commit 5c092097b6
3 changed files with 53 additions and 15 deletions
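Before this change, the slow-I/O warnings in BlockReceiver reported only the elapsed time and the configured threshold. They now also name the volume the replica is being written to (flush/sync, disk-write, and OS-cache paths) or the downstream datanodes in the pipeline (mirror path), and the messages are only assembled when WARN logging is enabled. Roughly what the improved warnings look like in a datanode log; only the message text is taken from the diffs below, while the log prefix, durations, volume path, and datanode address are illustrative:

2017-04-18 16:20:44,117 WARN datanode.DataNode: Slow BlockReceiver write data to disk cost:412ms (threshold=300ms), volume=/data/1/dfs/dn
2017-04-18 16:20:44,118 WARN datanode.DataNode: Slow BlockReceiver write packet to mirror took 517ms (threshold=300ms), downstream DNs=[192.168.10.12:50010]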

org/apache/hadoop/hdfs/protocol/DatanodeInfo.java

@@ -55,6 +55,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
private String softwareVersion;
private List<String> dependentHostNames = new LinkedList<>();
private String upgradeDomain;
public static final DatanodeInfo[] EMPTY_ARRAY = {};
// Datanode administrative states
public enum AdminStates {

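The new DatanodeInfo.EMPTY_ARRAY constant gives callers a shared empty array to use as a null-safe default. BlockReceiver (below) initializes its downstreamDNs field with it so the mirror warning can always call Arrays.toString() without a null check. A minimal sketch of the pattern; apart from EMPTY_ARRAY, the class and member names here are illustrative and not part of the commit:

import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

class PipelineDescription {
  // Default to the shared empty array rather than null so that logging
  // code never has to special-case "no downstream nodes".
  private DatanodeInfo[] downstream = DatanodeInfo.EMPTY_ARRAY;

  void setDownstream(DatanodeInfo[] nodes) {
    downstream = (nodes != null) ? nodes : DatanodeInfo.EMPTY_ARRAY;
  }

  String describeDownstream() {
    return Arrays.toString(downstream); // "[]" when there are no downstream nodes
  }
}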
org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
@@ -100,6 +101,7 @@ class BlockReceiver implements Closeable {
private DataTransferThrottler throttler;
private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null;
private DatanodeInfo[] downstreamDNs = DatanodeInfo.EMPTY_ARRAY;
private final DataNode datanode;
volatile private boolean mirrorError;
@@ -424,10 +426,10 @@ class BlockReceiver implements Closeable {
}
}
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+ flushTotalNanos + "ns");
+ flushTotalNanos + "ns, volume=" + getVolumeBasePath());
}
}
@@ -578,9 +580,10 @@ class BlockReceiver implements Closeable {
mirrorAddr,
duration);
trackSendPacketToLastNodeInPipeline(duration);
if (duration > datanodeSlowLogThresholdMs) {
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
+ "downstream DNs=" + Arrays.toString(downstreamDNs));
}
} catch (IOException e) {
handleMirrorOutError(e);
@@ -711,9 +714,10 @@ class BlockReceiver implements Closeable {
streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
+ "volume=" + getVolumeBasePath());
}
if (duration > maxWriteToDiskMs) {
@@ -902,9 +906,10 @@ class BlockReceiver implements Closeable {
}
lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
LOG.warn("Slow manageWriterOsCache took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ "ms (threshold=" + datanodeSlowLogThresholdMs
+ "ms), volume=" + getVolumeBasePath());
}
}
} catch (Throwable t) {
@@ -932,13 +937,7 @@ class BlockReceiver implements Closeable {
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
isPenultimateNode = ((downstreams != null) && (downstreams.length == 1));
if (isPenultimateNode) {
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
LOG.debug("Will collect peer metrics for downstream node {}",
mirrorNameForMetrics);
}
initPerfMonitoring(downstreams);
throttler = throttlerArg;
this.replyOut = replyOut;
@@ -1058,6 +1057,39 @@ class BlockReceiver implements Closeable {
}
}
/**
* If we have downstream DNs and peerMetrics are enabled, then initialize
* some state for monitoring the performance of downstream DNs.
*
* @param downstreams downstream DNs, or null if there are none.
*/
private void initPerfMonitoring(DatanodeInfo[] downstreams) {
if (downstreams != null && downstreams.length > 0) {
downstreamDNs = downstreams;
isPenultimateNode = (downstreams.length == 1);
if (isPenultimateNode && datanode.getPeerMetrics() != null) {
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
LOG.debug("Will collect peer metrics for downstream node {}",
mirrorNameForMetrics);
}
}
}
/**
* Fetch the base path of the volume on which this replica resides.
*
* @return Volume base path as a string if available, else the
* string "unavailable".
*/
private String getVolumeBasePath() {
final FsVolumeSpi volume = replicaHandler.getVolume();
if (volume != null) {
return volume.getBasePath();
}
return "unavailable";
}
/** Cleanup a partial block
* if this write is for a replication request (and not from a client)
*/

org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.Closeable;
import java.io.IOException;
@@ -46,4 +47,8 @@ public class ReplicaHandler implements Closeable {
public ReplicaInPipelineInterface getReplica() {
return replica;
}
public FsVolumeSpi getVolume() {
return volumeReference.getVolume();
}
}
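
Taken together: ReplicaHandler now exposes the FsVolumeSpi backing the replica, BlockReceiver resolves its base path via getVolumeBasePath(), and each slow-I/O warning reports either that path or the downstream datanodes. The added LOG.isWarnEnabled() guard means the enriched message, including the volume lookup, is never built when WARN output is suppressed. A condensed sketch of the guard pattern, assuming the fields and helpers shown in the BlockReceiver diff (datanodeSlowLogThresholdMs is the datanode's slow-I/O warning threshold, dfs.datanode.slow.io.warning.threshold.ms; begin and the surrounding method body are illustrative):

long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
  // String concatenation and the volume-path lookup only run when the
  // threshold is exceeded and WARN logging is actually enabled.
  LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
      + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
      + "volume=" + getVolumeBasePath());
}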