diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 406c29cb289..df5a479e8b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -483,6 +483,7 @@ class DataStreamer extends Daemon { private volatile BlockConstructionStage stage; // block construction stage protected long bytesSent = 0; // number of bytes that've been sent private final boolean isLazyPersistFile; + private long lastPacket; /** Nodes have been used in the pipeline before and have failed. */ private final List failed = new ArrayList<>(); @@ -632,6 +633,7 @@ class DataStreamer extends Daemon { response = new ResponseProcessor(nodes); response.start(); stage = BlockConstructionStage.DATA_STREAMING; + lastPacket = Time.monotonicNow(); } protected void endBlock() { @@ -653,7 +655,6 @@ class DataStreamer extends Daemon { */ @Override public void run() { - long lastPacket = Time.monotonicNow(); TraceScope scope = null; while (!streamerClosed && dfsClient.clientRunning) { // if the Responder encountered an error, shutdown Responder @@ -666,47 +667,38 @@ class DataStreamer extends Daemon { // process datanode IO errors if any boolean doSleep = processDatanodeOrExternalError(); - final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; synchronized (dataQueue) { // wait for a packet to be sent. - long now = Time.monotonicNow(); - while ((!shouldStop() && dataQueue.size() == 0 && - (stage != BlockConstructionStage.DATA_STREAMING || - now - lastPacket < halfSocketTimeout)) || doSleep) { - long timeout = halfSocketTimeout - (now-lastPacket); - timeout = timeout <= 0 ? 1000 : timeout; - timeout = (stage == BlockConstructionStage.DATA_STREAMING)? - timeout : 1000; + while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) { + long timeout = 1000; + if (stage == BlockConstructionStage.DATA_STREAMING) { + timeout = sendHeartbeat(); + } try { dataQueue.wait(timeout); } catch (InterruptedException e) { LOG.debug("Thread interrupted", e); } doSleep = false; - now = Time.monotonicNow(); } if (shouldStop()) { continue; } // get packet to be sent. - if (dataQueue.isEmpty()) { - one = createHeartbeatPacket(); - } else { - try { - backOffIfNecessary(); - } catch (InterruptedException e) { - LOG.debug("Thread interrupted", e); - } - one = dataQueue.getFirst(); // regular data packet - SpanContext[] parents = one.getTraceParents(); - if (parents != null && parents.length > 0) { - // The original code stored multiple parents in the DFSPacket, and - // use them ALL here when creating a new Span. We only use the - // last one FOR NOW. Moreover, we don't activate the Span for now. - scope = dfsClient.getTracer(). - newScope("dataStreamer", parents[0], false); - //scope.getSpan().setParents(parents); - } + try { + backOffIfNecessary(); + } catch (InterruptedException e) { + LOG.debug("Thread interrupted", e); + } + one = dataQueue.getFirst(); // regular data packet + SpanContext[] parents = one.getTraceParents(); + if (parents != null && parents.length > 0) { + // The original code stored multiple parents in the DFSPacket, and + // use them ALL here when creating a new Span. We only use the + // last one FOR NOW. Moreover, we don't activate the Span for now. + scope = dfsClient.getTracer(). + newScope("dataStreamer", parents[0], false); + //scope.getSpan().setParents(parents); } } @@ -734,17 +726,8 @@ class DataStreamer extends Daemon { if (one.isLastPacketInBlock()) { // wait for all data packets have been successfully acked - synchronized (dataQueue) { - while (!shouldStop() && ackQueue.size() != 0) { - try { - // wait for acks to arrive from datanodes - dataQueue.wait(1000); - } catch (InterruptedException e) { - LOG.debug("Thread interrupted", e); - } - } - } - if (shouldStop()) { + waitForAllAcks(); + if(shouldStop()) { continue; } stage = BlockConstructionStage.PIPELINE_CLOSE; @@ -773,8 +756,7 @@ class DataStreamer extends Daemon { // write out data to remote datanode try (TraceScope ignored = dfsClient.getTracer(). newScope("DataStreamer#writeTo", spanContext)) { - one.writeTo(blockStream); - blockStream.flush(); + sendPacket(one); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to // write to primary DN. If a failed or restarting node has already @@ -785,7 +767,6 @@ class DataStreamer extends Daemon { errorState.markFirstNodeIfNotMarked(); throw e; } - lastPacket = Time.monotonicNow(); // update bytesSent long tmpBytesSent = one.getLastByteOffsetBlock(); @@ -800,11 +781,7 @@ class DataStreamer extends Daemon { // Is this block full? if (one.isLastPacketInBlock()) { // wait for the close packet has been acked - synchronized (dataQueue) { - while (!shouldStop() && ackQueue.size() != 0) { - dataQueue.wait(1000);// wait for acks to arrive from datanodes - } - } + waitForAllAcks(); if (shouldStop()) { continue; } @@ -845,6 +822,48 @@ class DataStreamer extends Daemon { closeInternal(); } + private void waitForAllAcks() throws IOException { + // wait until all data packets have been successfully acked + synchronized (dataQueue) { + while (!shouldStop() && !ackQueue.isEmpty()) { + try { + // wait for acks to arrive from datanodes + dataQueue.wait(sendHeartbeat()); + } catch (InterruptedException e) { + LOG.debug("Thread interrupted ", e); + } + } + } + } + + private void sendPacket(DFSPacket packet) throws IOException { + // write out data to remote datanode + try { + packet.writeTo(blockStream); + blockStream.flush(); + } catch (IOException e) { + // HDFS-3398 treat primary DN is down since client is unable to + // write to primary DN. If a failed or restarting node has already + // been recorded by the responder, the following call will have no + // effect. Pipeline recovery can handle only one node error at a + // time. If the primary node fails again during the recovery, it + // will be taken out then. + errorState.markFirstNodeIfNotMarked(); + throw e; + } + lastPacket = Time.monotonicNow(); + } + + private long sendHeartbeat() throws IOException { + final long heartbeatInterval = dfsClient.getConf().getSocketTimeout()/2; + long timeout = heartbeatInterval - (Time.monotonicNow() - lastPacket); + if (timeout <= 0) { + sendPacket(createHeartbeatPacket()); + timeout = heartbeatInterval; + } + return timeout; + } + private void closeInternal() { closeResponder(); // close and join closeStream(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index e38c0a59777..873af8b5050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -245,6 +245,51 @@ public class TestClientProtocolForPipelineRecovery { } } + /** + * Test to ensure heartbeats continue during a flush in case of + * delayed acks. + */ + @Test + public void testHeartbeatDuringFlush() throws Exception { + // Delay sending acks + DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { + @Override + public void delaySendingAckToUpstream(final String upstreamAddr) + throws IOException { + try { + Thread.sleep(3500); // delay longer than socket timeout + } catch (InterruptedException ie) { + throw new IOException("Interrupted while sleeping"); + } + } + }; + DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); + + // Setting the timeout to be 3 seconds. Heartbeat packet + // should be sent every 1.5 seconds if there is no data traffic. + Configuration conf = new HdfsConfiguration(); + conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000"); + MiniDFSCluster cluster = null; + + try { + int numDataNodes = 1; + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDataNodes).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + FSDataOutputStream out = fs.create(new Path("delayedack.dat"), (short)2); + out.write(0x31); + out.hflush(); + DataNodeFaultInjector.set(dnFaultInjector); // cause ack delay + out.close(); + } finally { + DataNodeFaultInjector.set(oldDnInjector); + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * Test recovery on restart OOB message. It also tests the delivery of * OOB ack originating from the primary datanode. Since there is only