HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)

(cherry picked from commit bd3da73a0f)
This commit is contained in:
Karthik Palanisamy 2021-05-01 11:05:31 -07:00 committed by S O'Donnell
parent 98aa4fc32c
commit 00c3f3c42f
1 changed files with 12 additions and 0 deletions

View File

@ -892,6 +892,8 @@ class DataStreamer extends Daemon {
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("waitForAckedSeqno")) { newScope("waitForAckedSeqno")) {
LOG.debug("{} waiting for ack for: {}", this, seqno); LOG.debug("{} waiting for ack for: {}", this, seqno);
int dnodes = nodes != null ? nodes.length : 3;
int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
try { try {
synchronized (dataQueue) { synchronized (dataQueue) {
@ -902,6 +904,16 @@ class DataStreamer extends Daemon {
} }
try { try {
dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue.wait(1000); // when we receive an ack, we notify on
long duration = Time.monotonicNow() - begin;
if (duration > writeTimeout) {
LOG.error("No ack received, took {}ms (threshold={}ms). "
+ "File being written: {}, block: {}, "
+ "Write pipeline datanodes: {}.",
duration, writeTimeout, src, block, nodes);
throw new InterruptedIOException("No ack received after " +
duration / 1000 + "s and a timeout of " +
writeTimeout / 1000 + "s");
}
// dataQueue // dataQueue
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new InterruptedIOException( throw new InterruptedIOException(