[HDFS-15813] DataStreamer: keep sending heartbeat packets during flush. Contributed by Daryn Sharp and Jim Brennan

Jim Brennan 2021-02-05 21:20:30 +00:00
parent c22c77af43
commit c4918fb298
2 changed files with 112 additions and 48 deletions

DataStreamer.java

@@ -483,6 +483,7 @@ class DataStreamer extends Daemon {
private volatile BlockConstructionStage stage; // block construction stage
protected long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
private long lastPacket;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
@@ -632,6 +633,7 @@ class DataStreamer extends Daemon {
response = new ResponseProcessor(nodes);
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
lastPacket = Time.monotonicNow();
}
protected void endBlock() {
@@ -653,7 +655,6 @@ class DataStreamer extends Daemon {
*/
@Override
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -666,47 +667,38 @@ class DataStreamer extends Daemon {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep) {
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
long timeout = 1000;
if (stage == BlockConstructionStage.DATA_STREAMING) {
timeout = sendHeartbeat();
}
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (shouldStop()) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
one = dataQueue.getFirst(); // regular data packet
SpanContext[] parents = one.getTraceParents();
if (parents != null && parents.length > 0) {
// The original code stored multiple parents in the DFSPacket, and
// use them ALL here when creating a new Span. We only use the
// last one FOR NOW. Moreover, we don't activate the Span for now.
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0], false);
//scope.getSpan().setParents(parents);
}
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
one = dataQueue.getFirst(); // regular data packet
SpanContext[] parents = one.getTraceParents();
if (parents != null && parents.length > 0) {
// The original code stored multiple parents in the DFSPacket, and
// use them ALL here when creating a new Span. We only use the
// last one FOR NOW. Moreover, we don't activate the Span for now.
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0], false);
//scope.getSpan().setParents(parents);
}
}
@@ -734,17 +726,8 @@ class DataStreamer extends Daemon {
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
}
}
if (shouldStop()) {
waitForAllAcks();
if(shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
@@ -773,8 +756,7 @@ class DataStreamer extends Daemon {
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanContext)) {
one.writeTo(blockStream);
blockStream.flush();
sendPacket(one);
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
@@ -785,7 +767,6 @@ class DataStreamer extends Daemon {
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -800,11 +781,7 @@ class DataStreamer extends Daemon {
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
waitForAllAcks();
if (shouldStop()) {
continue;
}
@@ -845,6 +822,48 @@ class DataStreamer extends Daemon {
closeInternal();
}
private void waitForAllAcks() throws IOException {
// wait until all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && !ackQueue.isEmpty()) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(sendHeartbeat());
} catch (InterruptedException e) {
LOG.debug("Thread interrupted ", e);
}
}
}
}
private void sendPacket(DFSPacket packet) throws IOException {
// write out data to remote datanode
try {
packet.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
}
private long sendHeartbeat() throws IOException {
final long heartbeatInterval = dfsClient.getConf().getSocketTimeout()/2;
long timeout = heartbeatInterval - (Time.monotonicNow() - lastPacket);
if (timeout <= 0) {
sendPacket(createHeartbeatPacket());
timeout = heartbeatInterval;
}
return timeout;
}
private void closeInternal() {
closeResponder(); // close and join
closeStream();

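For readers skimming the DataStreamer diff above, the new timing can be summarized outside the class: on every wait, the streamer asks how long remains until a heartbeat is due (half the client socket timeout since the last packet was written); if that interval has already elapsed, it writes a heartbeat packet immediately and resets the clock, and the remaining time becomes the dataQueue.wait() timeout. The sketch below is a minimal standalone model of that arithmetic, not the actual DataStreamer code; the HeartbeatTimer and PacketSender names are made up for illustration.

import java.util.concurrent.TimeUnit;

/**
 * Simplified model of the heartbeat timing introduced by HDFS-15813.
 * Only the arithmetic mirrors DataStreamer#sendHeartbeat(); all names
 * here are illustrative.
 */
class HeartbeatTimer {
  /** Stand-in for writing a heartbeat DFSPacket to the block stream. */
  interface PacketSender {
    void sendHeartbeatPacket();
  }

  private final long socketTimeoutMs;  // dfs.client.socket-timeout
  private final PacketSender sender;
  private long lastPacketMs;           // when the last packet was written

  HeartbeatTimer(long socketTimeoutMs, PacketSender sender) {
    this.socketTimeoutMs = socketTimeoutMs;
    this.sender = sender;
    this.lastPacketMs = now();
  }

  /** Call after every regular data packet, as sendPacket() does. */
  void onPacketSent() {
    lastPacketMs = now();
  }

  /**
   * Send a heartbeat if one is due and return how long the caller may
   * block (e.g. in dataQueue.wait(timeout)) before the next one is due.
   */
  long sendHeartbeatIfDue() {
    final long heartbeatInterval = socketTimeoutMs / 2;
    long timeout = heartbeatInterval - (now() - lastPacketMs);
    if (timeout <= 0) {
      sender.sendHeartbeatPacket();
      lastPacketMs = now();            // the real code resets this inside sendPacket()
      timeout = heartbeatInterval;
    }
    return timeout;
  }

  private static long now() {
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
  }
}

Because waitForAllAcks() and the main wait loop both use this return value as their wait timeout, heartbeats keep flowing while the streamer is blocked waiting for acks during hflush() and close(), which is the situation the commit title describes.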
TestClientProtocolForPipelineRecovery.java

@@ -245,6 +245,51 @@ public class TestClientProtocolForPipelineRecovery {
}
}
/**
* Test to ensure heartbeats continue during a flush in case of
* delayed acks.
*/
@Test
public void testHeartbeatDuringFlush() throws Exception {
// Delay sending acks
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
@Override
public void delaySendingAckToUpstream(final String upstreamAddr)
throws IOException {
try {
Thread.sleep(3500); // delay longer than socket timeout
} catch (InterruptedException ie) {
throw new IOException("Interrupted while sleeping");
}
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
// Setting the timeout to be 3 seconds. Heartbeat packet
// should be sent every 1.5 seconds if there is no data traffic.
Configuration conf = new HdfsConfiguration();
conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 1;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("delayedack.dat"), (short)2);
out.write(0x31);
out.hflush();
DataNodeFaultInjector.set(dnFaultInjector); // cause ack delay
out.close();
} finally {
DataNodeFaultInjector.set(oldDnInjector);
if (cluster != null) {
cluster.shutdown();
}
}
}
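A note on the constants in testHeartbeatDuringFlush above: with dfs.client.socket-timeout at 3000 ms the streamer's heartbeat interval is 1500 ms, while the injected ack delay of 3500 ms is longer than the socket timeout itself, so before this change the pipeline could sit idle past its timeout while close() waited for the ack. The small program below only restates that arithmetic with the values copied from the test; it is illustrative, not part of the patch.

/** Back-of-the-envelope check of the timeouts used in testHeartbeatDuringFlush. */
public class HeartbeatTimingCheck {
  public static void main(String[] args) {
    final long socketTimeoutMs = 3000;      // DFS_CLIENT_SOCKET_TIMEOUT_KEY in the test
    final long heartbeatIntervalMs = socketTimeoutMs / 2;  // 1500 ms
    final long ackDelayMs = 3500;           // sleep in delaySendingAckToUpstream()

    // The delayed ack outlasts the socket timeout, so the connection must
    // not stay idle for the whole delay.
    System.out.println("ack delay exceeds socket timeout: "
        + (ackDelayMs > socketTimeoutMs));

    // With the fix, at least this many heartbeats are written while the
    // streamer waits for the delayed ack, keeping the pipeline alive.
    System.out.println("heartbeats during the delay: at least "
        + (ackDelayMs / heartbeatIntervalMs));
  }
}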
/**
* Test recovery on restart OOB message. It also tests the delivery of
* OOB ack originating from the primary datanode. Since there is only