diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 772eb6689b0..358a485900d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -685,11 +685,6 @@ public void run() { continue; } // get packet to be sent. - try { - backOffIfNecessary(); - } catch (InterruptedException e) { - LOG.debug("Thread interrupted", e); - } one = dataQueue.getFirst(); // regular data packet SpanContext[] parents = one.getTraceParents(); if (parents != null && parents.length > 0) { @@ -702,6 +697,14 @@ public void run() { } } + // The DataStreamer has to release the dataQueue before sleeping, + // otherwise it will cause the ResponseProcessor to accept the ACK delay. + try { + backOffIfNecessary(); + } catch (InterruptedException e) { + LOG.debug("Thread interrupted", e); + } + // get new block from namenode. LOG.debug("stage={}, {}", stage, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 8854ec87ead..54c292b0f7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -30,6 +30,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; @@ -296,6 +297,86 @@ public void testCongestionBackoff() throws IOException { Assert.assertTrue(congestedNodes.isEmpty()); } + @Test(timeout=60000) + public void testCongestionAckDelay() { + DfsClientConf dfsClientConf = mock(DfsClientConf.class); + DFSClient client = mock(DFSClient.class); + when(client.getConf()).thenReturn(dfsClientConf); + when(client.getTracer()).thenReturn(FsTracer.get(new Configuration())); + client.clientRunning = true; + DataStreamer stream = new DataStreamer( + mock(HdfsFileStatus.class), + mock(ExtendedBlock.class), + client, + "foo", null, null, null, null, null, null); + DataOutputStream blockStream = mock(DataOutputStream.class); + Whitebox.setInternalState(stream, "blockStream", blockStream); + Whitebox.setInternalState(stream, "stage", + BlockConstructionStage.PIPELINE_CLOSE); + @SuppressWarnings("unchecked") + LinkedList dataQueue = (LinkedList) + Whitebox.getInternalState(stream, "dataQueue"); + @SuppressWarnings("unchecked") + ArrayList congestedNodes = (ArrayList) + Whitebox.getInternalState(stream, "congestedNodes"); + int backOffMaxTime = (int) + Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS"); + DFSPacket[] packet = new DFSPacket[100]; + AtomicBoolean isDelay = new AtomicBoolean(true); + + // ResponseProcessor needs the dataQueue for the next step. + new Thread(() -> { + for (int i = 0; i < 10; i++) { + // In order to ensure that other threads run for a period of time to prevent affecting + // the results. + try { + Thread.sleep(backOffMaxTime / 50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + synchronized (dataQueue) { + congestedNodes.add(mock(DatanodeInfo.class)); + // The DataStreamer releases the dataQueue before sleeping, and the ResponseProcessor + // has time to hold the dataQueue to continuously accept ACKs and add congestedNodes + // to the list. Therefore, congestedNodes.size() is greater than 1. + if (congestedNodes.size() > 1){ + isDelay.set(false); + try { + doThrow(new IOException()).when(blockStream).flush(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + try { + doThrow(new IOException()).when(blockStream).flush(); + } catch (Exception e) { + e.printStackTrace(); + } + // Prevent the DataStreamer from always waiting because the + // dataQueue may be empty, so that the unit test cannot exit. + DFSPacket endPacket = mock(DFSPacket.class); + dataQueue.add(endPacket); + }).start(); + + // The purpose of adding packets to the dataQueue is to make the DataStreamer run + // normally and judge whether to enter the sleep state according to the congestion. + new Thread(() -> { + for (int i = 0; i < 100; i++) { + packet[i] = mock(DFSPacket.class); + dataQueue.add(packet[i]); + try { + Thread.sleep(backOffMaxTime / 100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + stream.run(); + Assert.assertFalse(isDelay.get()); + } + @Test public void testNoLocalWriteFlag() throws IOException { DistributedFileSystem fs = cluster.getFileSystem();