HDFS-14260. Replace synchronized method in BlockReceiver with atomic value. Contributed by BELUGA BEHR.
This commit is contained in:
parent
73b67b2df5
commit
0ceb1b70f3
|
@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.zip.Checksum;
|
import java.util.zip.Checksum;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -142,7 +143,7 @@ class BlockReceiver implements Closeable {
|
||||||
private long maxWriteToDiskMs = 0;
|
private long maxWriteToDiskMs = 0;
|
||||||
|
|
||||||
private boolean pinning;
|
private boolean pinning;
|
||||||
private long lastSentTime;
|
private final AtomicLong lastSentTime = new AtomicLong(0L);
|
||||||
private long maxSendIdleTime;
|
private long maxSendIdleTime;
|
||||||
|
|
||||||
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
|
||||||
|
@ -182,7 +183,7 @@ class BlockReceiver implements Closeable {
|
||||||
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
|
||||||
|
|
||||||
this.pinning = pinning;
|
this.pinning = pinning;
|
||||||
this.lastSentTime = Time.monotonicNow();
|
this.lastSentTime.set(Time.monotonicNow());
|
||||||
// Downstream will timeout in readTimeout on receiving the next packet.
|
// Downstream will timeout in readTimeout on receiving the next packet.
|
||||||
// If there is no data traffic, a heartbeat packet is sent at
|
// If there is no data traffic, a heartbeat packet is sent at
|
||||||
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
|
// the interval of 0.5*readTimeout. Here, we set 0.9*readTimeout to be
|
||||||
|
@ -379,23 +380,28 @@ class BlockReceiver implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void setLastSentTime(long sentTime) {
|
|
||||||
lastSentTime = sentTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It can return false if
|
* Check if a packet was sent within an acceptable period of time.
|
||||||
* - upstream did not send packet for a long time
|
*
|
||||||
* - a packet was received but got stuck in local disk I/O.
|
* Some example of when this method may return false:
|
||||||
* - a packet was received but got stuck on send to mirror.
|
* <ul>
|
||||||
|
* <li>Upstream did not send packet for a long time</li>
|
||||||
|
* <li>Packet was received but got stuck in local disk I/O</li>
|
||||||
|
* <li>Packet was received but got stuck on send to mirror</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* @return true if packet was sent within an acceptable period of time;
|
||||||
|
* otherwise false.
|
||||||
*/
|
*/
|
||||||
synchronized boolean packetSentInTime() {
|
boolean packetSentInTime() {
|
||||||
long diff = Time.monotonicNow() - lastSentTime;
|
final long diff = Time.monotonicNow() - this.lastSentTime.get();
|
||||||
if (diff > maxSendIdleTime) {
|
final boolean allowedIdleTime = (diff <= this.maxSendIdleTime);
|
||||||
LOG.info("A packet was last sent " + diff + " milliseconds ago.");
|
LOG.debug("A packet was last sent {}ms ago.", diff);
|
||||||
return false;
|
if (!allowedIdleTime) {
|
||||||
|
LOG.warn("A packet was last sent {}ms ago. Maximum idle time: {}ms.",
|
||||||
|
diff, this.maxSendIdleTime);
|
||||||
}
|
}
|
||||||
return true;
|
return allowedIdleTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -589,7 +595,7 @@ class BlockReceiver implements Closeable {
|
||||||
packetReceiver.mirrorPacketTo(mirrorOut);
|
packetReceiver.mirrorPacketTo(mirrorOut);
|
||||||
mirrorOut.flush();
|
mirrorOut.flush();
|
||||||
long now = Time.monotonicNow();
|
long now = Time.monotonicNow();
|
||||||
setLastSentTime(now);
|
this.lastSentTime.set(now);
|
||||||
long duration = now - begin;
|
long duration = now - begin;
|
||||||
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
|
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
|
||||||
mirrorAddr,
|
mirrorAddr,
|
||||||
|
|
Loading…
Reference in New Issue