diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index e1698c98c03..0a8c9151f1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -55,6 +55,7 @@ public class DatanodeInfo extends DatanodeID implements Node { private String softwareVersion; private List dependentHostNames = new LinkedList<>(); private String upgradeDomain; + public static final DatanodeInfo[] EMPTY_ARRAY = {}; // Datanode administrative states public enum AdminStates { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index bb6bd5570e2..00109e052d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -100,6 +100,7 @@ class BlockReceiver implements Closeable { private DataTransferThrottler throttler; private ReplicaOutputStreams streams; private DatanodeInfo srcDataNode = null; + private DatanodeInfo[] downstreamDNs = DatanodeInfo.EMPTY_ARRAY; private final DataNode datanode; volatile private boolean mirrorError; @@ -424,10 +425,10 @@ class BlockReceiver implements Closeable { } } long duration = Time.monotonicNow() - begin; - if (duration > datanodeSlowLogThresholdMs) { + if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) { LOG.warn("Slow flushOrSync took " + duration + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos=" - + flushTotalNanos + "ns"); + + flushTotalNanos + "ns, volume=" + getVolumeBaseUri()); } } @@ -578,9 +579,10 @@ class BlockReceiver implements Closeable { mirrorAddr, duration); trackSendPacketToLastNodeInPipeline(duration); - if (duration > datanodeSlowLogThresholdMs) { + if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) { LOG.warn("Slow BlockReceiver write packet to mirror took " + duration - + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + + "downstream DNs=" + Arrays.toString(downstreamDNs)); } } catch (IOException e) { handleMirrorOutError(e); @@ -711,9 +713,10 @@ class BlockReceiver implements Closeable { streams.writeDataToDisk(dataBuf.array(), startByteToDisk, numBytesToDisk); long duration = Time.monotonicNow() - begin; - if (duration > datanodeSlowLogThresholdMs) { + if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) { LOG.warn("Slow BlockReceiver write data to disk cost:" + duration - + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + + "volume=" + getVolumeBaseUri()); } if (duration > maxWriteToDiskMs) { @@ -902,9 +905,10 @@ class BlockReceiver implements Closeable { } lastCacheManagementOffset = offsetInBlock; long duration = Time.monotonicNow() - begin; - if (duration > datanodeSlowLogThresholdMs) { + if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) { LOG.warn("Slow manageWriterOsCache took " + duration - + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + + "ms (threshold=" + datanodeSlowLogThresholdMs + + "ms), volume=" + getVolumeBaseUri()); } } } catch (Throwable t) { @@ -932,13 +936,7 @@ class BlockReceiver implements Closeable { boolean responderClosed = false; mirrorOut = mirrOut; mirrorAddr = mirrAddr; - isPenultimateNode = ((downstreams != null) && (downstreams.length == 1)); - if (isPenultimateNode) { - mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ? - downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr()); - LOG.debug("Will collect peer metrics for downstream node {}", - mirrorNameForMetrics); - } + initPerfMonitoring(downstreams); throttler = throttlerArg; this.replyOut = replyOut; @@ -1058,6 +1056,39 @@ class BlockReceiver implements Closeable { } } + /** + * If we have downstream DNs and peerMetrics are enabled, then initialize + * some state for monitoring the performance of downstream DNs. + * + * @param downstreams downstream DNs, or null if there are none. + */ + private void initPerfMonitoring(DatanodeInfo[] downstreams) { + if (downstreams != null && downstreams.length > 0) { + downstreamDNs = downstreams; + isPenultimateNode = (downstreams.length == 1); + if (isPenultimateNode && datanode.getPeerMetrics() != null) { + mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ? + downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr()); + LOG.debug("Will collect peer metrics for downstream node {}", + mirrorNameForMetrics); + } + } + } + + /** + * Fetch the base URI of the volume on which this replica resides. + * + * @returns Volume base URI as string if available. Else returns the + * the string "unavailable". + */ + private String getVolumeBaseUri() { + final ReplicaInfo ri = replicaInfo.getReplicaInfo(); + if (ri != null && ri.getVolume() != null) { + return ri.getVolume().getBaseURI().toString(); + } + return "unavailable"; + } + /** Cleanup a partial block * if this write is for a replication request (and not from a client) */