From 00c3f3c42f93a5bbf5c48608619867560cc82b09 Mon Sep 17 00:00:00 2001 From: Karthik Palanisamy Date: Sat, 1 May 2021 11:05:31 -0700 Subject: [PATCH] HDFS-15865. Interrupt DataStreamer thread if no ack (#2728) (cherry picked from commit bd3da73a0ff75231340b1168f7805164710bf4fe) --- .../java/org/apache/hadoop/hdfs/DataStreamer.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 3d6d3c53f9e..3e368ae4dcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -892,6 +892,8 @@ void waitForAckedSeqno(long seqno) throws IOException { try (TraceScope ignored = dfsClient.getTracer(). newScope("waitForAckedSeqno")) { LOG.debug("{} waiting for ack for: {}", this, seqno); + int dnodes = nodes != null ? nodes.length : 3; + int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes); long begin = Time.monotonicNow(); try { synchronized (dataQueue) { @@ -902,6 +904,16 @@ void waitForAckedSeqno(long seqno) throws IOException { } try { dataQueue.wait(1000); // when we receive an ack, we notify on + long duration = Time.monotonicNow() - begin; + if (duration > writeTimeout) { + LOG.error("No ack received, took {}ms (threshold={}ms). " + + "File being written: {}, block: {}, " + + "Write pipeline datanodes: {}.", + duration, writeTimeout, src, block, nodes); + throw new InterruptedIOException("No ack received after " + + duration / 1000 + "s and a timeout of " + + writeTimeout / 1000 + "s"); + } // dataQueue } catch (InterruptedException ie) { throw new InterruptedIOException(