From 0ceb1b70f3200873fe1f40c264b91051b4a3d721 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Mon, 11 Feb 2019 10:09:44 -0800 Subject: [PATCH] HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR. --- .../hdfs/server/datanode/BlockReceiver.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index cb1f73d2fed..95091840430 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -31,6 +31,7 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Checksum; import org.apache.commons.logging.Log; @@ -142,7 +143,7 @@ class BlockReceiver implements Closeable { private long maxWriteToDiskMs = 0; private boolean pinning; - private long lastSentTime; + private final AtomicLong lastSentTime = new AtomicLong(0L); private long maxSendIdleTime; BlockReceiver(final ExtendedBlock block, final StorageType storageType, @@ -182,7 +183,7 @@ class BlockReceiver implements Closeable { || stage == BlockConstructionStage.TRANSFER_FINALIZED; this.pinning = pinning; - this.lastSentTime = Time.monotonicNow(); + this.lastSentTime.set(Time.monotonicNow()); // Downstream will timeout in readTimeout on receiving the next packet. // If there is no data traffic, a heartbeat packet is sent at // the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be @@ -379,23 +380,28 @@ public void close() throws IOException { } } - synchronized void setLastSentTime(long sentTime) { - lastSentTime = sentTime; - } - /** - * It can return false if - * - upstream did not send packet for a long time - * - a packet was received but got stuck in local disk I/O. - * - a packet was received but got stuck on send to mirror. + * Check if a packet was sent within an acceptable period of time. + * + * Some example of when this method may return false: + * + * + * @return true if packet was sent within an acceptable period of time; + * otherwise false. */ - synchronized boolean packetSentInTime() { - long diff = Time.monotonicNow() - lastSentTime; - if (diff > maxSendIdleTime) { - LOG.info("A packet was last sent " + diff + " milliseconds ago."); - return false; + boolean packetSentInTime() { + final long diff = Time.monotonicNow() - this.lastSentTime.get(); + final boolean allowedIdleTime = (diff <= this.maxSendIdleTime); + LOG.debug("A packet was last sent {}ms ago.", diff); + if (!allowedIdleTime) { + LOG.warn("A packet was last sent {}ms ago. Maximum idle time: {}ms.", + diff, this.maxSendIdleTime); } - return true; + return allowedIdleTime; } /** @@ -589,7 +595,7 @@ private int receivePacket() throws IOException { packetReceiver.mirrorPacketTo(mirrorOut); mirrorOut.flush(); long now = Time.monotonicNow(); - setLastSentTime(now); + this.lastSentTime.set(now); long duration = now - begin; DataNodeFaultInjector.get().logDelaySendingPacketDownstream( mirrorAddr,