HDFS-16293. Client sleeps and holds 'dataQueue' when DataNodes are congested. Contributed by Yuanxin Zhu.

(cherry picked from commit e8e69de106)
This commit is contained in:
Takanobu Asanuma 2021-12-06 10:44:36 +09:00 committed by Chao Sun
parent e50c9224fe
commit 906eb831c6
2 changed files with 89 additions and 5 deletions

View File

@ -685,11 +685,6 @@ class DataStreamer extends Daemon {
continue; continue;
} }
// get packet to be sent. // get packet to be sent.
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
one = dataQueue.getFirst(); // regular data packet one = dataQueue.getFirst(); // regular data packet
SpanContext[] parents = one.getTraceParents(); SpanContext[] parents = one.getTraceParents();
if (parents != null && parents.length > 0) { if (parents != null && parents.length > 0) {
@ -702,6 +697,14 @@ class DataStreamer extends Daemon {
} }
} }
// The DataStreamer has to release the dataQueue before sleeping,
// otherwise it will cause the ResponseProcessor to accept the ACK delay.
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
// get new block from namenode. // get new block from namenode.
LOG.debug("stage={}, {}", stage, this); LOG.debug("stage={}, {}", stage, this);

View File

@ -30,6 +30,7 @@ import java.util.EnumSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -296,6 +297,86 @@ public class TestDFSOutputStream {
Assert.assertTrue(congestedNodes.isEmpty()); Assert.assertTrue(congestedNodes.isEmpty());
} }
@Test(timeout=60000)
public void testCongestionAckDelay() {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
DataStreamer stream = new DataStreamer(
mock(HdfsFileStatus.class),
mock(ExtendedBlock.class),
client,
"foo", null, null, null, null, null, null);
DataOutputStream blockStream = mock(DataOutputStream.class);
Whitebox.setInternalState(stream, "blockStream", blockStream);
Whitebox.setInternalState(stream, "stage",
BlockConstructionStage.PIPELINE_CLOSE);
@SuppressWarnings("unchecked")
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
Whitebox.getInternalState(stream, "dataQueue");
@SuppressWarnings("unchecked")
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
Whitebox.getInternalState(stream, "congestedNodes");
int backOffMaxTime = (int)
Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
DFSPacket[] packet = new DFSPacket[100];
AtomicBoolean isDelay = new AtomicBoolean(true);
// ResponseProcessor needs the dataQueue for the next step.
new Thread(() -> {
for (int i = 0; i < 10; i++) {
// In order to ensure that other threads run for a period of time to prevent affecting
// the results.
try {
Thread.sleep(backOffMaxTime / 50);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (dataQueue) {
congestedNodes.add(mock(DatanodeInfo.class));
// The DataStreamer releases the dataQueue before sleeping, and the ResponseProcessor
// has time to hold the dataQueue to continuously accept ACKs and add congestedNodes
// to the list. Therefore, congestedNodes.size() is greater than 1.
if (congestedNodes.size() > 1){
isDelay.set(false);
try {
doThrow(new IOException()).when(blockStream).flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
try {
doThrow(new IOException()).when(blockStream).flush();
} catch (Exception e) {
e.printStackTrace();
}
// Prevent the DataStreamer from always waiting because the
// dataQueue may be empty, so that the unit test cannot exit.
DFSPacket endPacket = mock(DFSPacket.class);
dataQueue.add(endPacket);
}).start();
// The purpose of adding packets to the dataQueue is to make the DataStreamer run
// normally and judge whether to enter the sleep state according to the congestion.
new Thread(() -> {
for (int i = 0; i < 100; i++) {
packet[i] = mock(DFSPacket.class);
dataQueue.add(packet[i]);
try {
Thread.sleep(backOffMaxTime / 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
stream.run();
Assert.assertFalse(isDelay.get());
}
@Test @Test
public void testNoLocalWriteFlag() throws IOException { public void testNoLocalWriteFlag() throws IOException {
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();