HDFS-16293. Client sleeps and holds 'dataQueue' when DataNodes are congested. Contributed by Yuanxin Zhu.
This commit is contained in:
parent
c2afb6a00b
commit
e8e69de106
|
@ -687,11 +687,6 @@ class DataStreamer extends Daemon {
|
|||
continue;
|
||||
}
|
||||
// get packet to be sent.
|
||||
try {
|
||||
backOffIfNecessary();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Thread interrupted", e);
|
||||
}
|
||||
one = dataQueue.getFirst(); // regular data packet
|
||||
SpanContext[] parents = one.getTraceParents();
|
||||
if (parents != null && parents.length > 0) {
|
||||
|
@ -704,6 +699,14 @@ class DataStreamer extends Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
// The DataStreamer has to release the dataQueue before sleeping,
|
||||
// otherwise it will cause the ResponseProcessor to accept the ACK delay.
|
||||
try {
|
||||
backOffIfNecessary();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Thread interrupted", e);
|
||||
}
|
||||
|
||||
// get new block from namenode.
|
||||
LOG.debug("stage={}, {}", stage, this);
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.LinkedList;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -300,6 +301,86 @@ public class TestDFSOutputStream {
|
|||
Assert.assertTrue(congestedNodes.isEmpty());
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testCongestionAckDelay() {
|
||||
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
|
||||
DFSClient client = mock(DFSClient.class);
|
||||
when(client.getConf()).thenReturn(dfsClientConf);
|
||||
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
|
||||
client.clientRunning = true;
|
||||
DataStreamer stream = new DataStreamer(
|
||||
mock(HdfsFileStatus.class),
|
||||
mock(ExtendedBlock.class),
|
||||
client,
|
||||
"foo", null, null, null, null, null, null);
|
||||
DataOutputStream blockStream = mock(DataOutputStream.class);
|
||||
Whitebox.setInternalState(stream, "blockStream", blockStream);
|
||||
Whitebox.setInternalState(stream, "stage",
|
||||
BlockConstructionStage.PIPELINE_CLOSE);
|
||||
@SuppressWarnings("unchecked")
|
||||
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
|
||||
Whitebox.getInternalState(stream, "dataQueue");
|
||||
@SuppressWarnings("unchecked")
|
||||
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
|
||||
Whitebox.getInternalState(stream, "congestedNodes");
|
||||
int backOffMaxTime = (int)
|
||||
Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
|
||||
DFSPacket[] packet = new DFSPacket[100];
|
||||
AtomicBoolean isDelay = new AtomicBoolean(true);
|
||||
|
||||
// ResponseProcessor needs the dataQueue for the next step.
|
||||
new Thread(() -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
// In order to ensure that other threads run for a period of time to prevent affecting
|
||||
// the results.
|
||||
try {
|
||||
Thread.sleep(backOffMaxTime / 50);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
synchronized (dataQueue) {
|
||||
congestedNodes.add(mock(DatanodeInfo.class));
|
||||
// The DataStreamer releases the dataQueue before sleeping, and the ResponseProcessor
|
||||
// has time to hold the dataQueue to continuously accept ACKs and add congestedNodes
|
||||
// to the list. Therefore, congestedNodes.size() is greater than 1.
|
||||
if (congestedNodes.size() > 1){
|
||||
isDelay.set(false);
|
||||
try {
|
||||
doThrow(new IOException()).when(blockStream).flush();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
doThrow(new IOException()).when(blockStream).flush();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// Prevent the DataStreamer from always waiting because the
|
||||
// dataQueue may be empty, so that the unit test cannot exit.
|
||||
DFSPacket endPacket = mock(DFSPacket.class);
|
||||
dataQueue.add(endPacket);
|
||||
}).start();
|
||||
|
||||
// The purpose of adding packets to the dataQueue is to make the DataStreamer run
|
||||
// normally and judge whether to enter the sleep state according to the congestion.
|
||||
new Thread(() -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
packet[i] = mock(DFSPacket.class);
|
||||
dataQueue.add(packet[i]);
|
||||
try {
|
||||
Thread.sleep(backOffMaxTime / 100);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
stream.run();
|
||||
Assert.assertFalse(isDelay.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoLocalWriteFlag() throws IOException {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
|
Loading…
Reference in New Issue