diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0cd20cf6b2c..5ad7419f106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -92,6 +92,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
     HDFS-9784. Example usage is not correct in Transparent Encryption
     document. (Takashi Ohnishi via aajisaka)
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e8080eea61f..b8de7c4edc0 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -265,9 +265,8 @@ public class DFSOutputStream extends FSOutputSummer
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
-    /** The last ack sequence number before pipeline failure. */
-    private long lastAckedSeqnoBeforeFailure = -1;
-    private int pipelineRecoveryCount = 0;
+    /** The number of times pipeline recovery has been retried for the same packet. */
+    private volatile int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
     private boolean isHflushed = false;
     /** Append on an existing block? */
@@ -803,6 +802,7 @@ public class DFSOutputStream extends FSOutputSummer
               scope = Trace.continueSpan(one.getTraceSpan());
               one.setTraceSpan(null);
               lastAckedSeqno = seqno;
+              pipelineRecoveryCount = 0;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
 
@@ -856,23 +856,18 @@ public class DFSOutputStream extends FSOutputSummer
         ackQueue.clear();
       }
 
-      // Record the new pipeline failure recovery.
-      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-        lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-        pipelineRecoveryCount = 1;
-      } else {
-        // If we had to recover the pipeline five times in a row for the
-        // same packet, this client likely has corrupt data or corrupting
-        // during transmission.
-        if (++pipelineRecoveryCount > 5) {
-          DFSClient.LOG.warn("Error recovering pipeline for writing " +
-              block + ". Already retried 5 times for the same packet.");
-          lastException.set(new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success."));
-          streamerClosed = true;
-          return false;
-        }
+      // If we had to recover the pipeline five times in a row for the
+      // same packet, this client likely has corrupt data or is corrupting
+      // it during transmission.
+      if (restartingNodeIndex.get() == -1 && ++pipelineRecoveryCount > 5) {
+        DFSClient.LOG.warn("Error recovering pipeline for writing " +
+            block + ". Already retried 5 times for the same packet.");
+        lastException.set(new IOException("Failing write. Tried pipeline " +
+            "recovery 5 times without success."));
+        streamerClosed = true;
+        return false;
       }
+
       boolean doSleep = setupPipelineForAppendOrRecovery();
 
       if (!streamerClosed && dfsClient.clientRunning) {
@@ -897,6 +892,7 @@ public class DFSOutputStream extends FSOutputSummer
             assert endOfBlockPacket.isLastPacketInBlock();
             assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
             lastAckedSeqno = endOfBlockPacket.getSeqno();
+            pipelineRecoveryCount = 0;
             dataQueue.notifyAll();
           }
           endBlock();
@@ -2381,4 +2377,12 @@ public class DFSOutputStream extends FSOutputSummer
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
   }
+
+  /**
+   * @return the number of times pipeline recovery has been retried for the same packet
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return streamer.pipelineRecoveryCount;
+  }
 }
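The DFSOutputStream change above replaces the old heuristic (comparing lastAckedSeqno against its value at the previous failure) with a counter that the ack path resets whenever any packet is acknowledged, and the five-recovery limit is no longer charged while a datanode in the pipeline is restarting for an upgrade (restartingNodeIndex != -1). A minimal sketch of that counting pattern follows; the names (RecoveryBudget, onPacketAcked, onPipelineFailure, MAX_RECOVERIES) are hypothetical, not the actual DataStreamer code:

    // Sketch only: the "reset on progress, charge only unexpected failures"
    // pattern that the patch introduces. Names are hypothetical.
    import java.io.IOException;

    class RecoveryBudget {
      private static final int MAX_RECOVERIES = 5;
      // volatile: the ack-processing thread resets the counter while the
      // streamer thread increments and reads it.
      private volatile int recoveryCount = 0;

      /** Called by the ack path whenever a packet is acknowledged. */
      void onPacketAcked() {
        recoveryCount = 0; // progress was made; start the budget over
      }

      /** Called when pipeline recovery begins for the current packet. */
      void onPipelineFailure(boolean nodeRestartingForUpgrade)
          throws IOException {
        // An upgrade restart is expected downtime, so it does not consume
        // the retry budget; only unexplained repeated failures do.
        if (!nodeRestartingForUpgrade && ++recoveryCount > MAX_RECOVERIES) {
          throw new IOException("Failing write. Tried pipeline recovery "
              + MAX_RECOVERIES + " times without success.");
        }
      }
    }

The volatile modifier mirrors the patch: without it, the reset performed by the response-processor thread might never become visible to the streamer thread.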
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index efd5d3916a0..a6e6d8b0191 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3000,7 +3000,7 @@ public class DataNode extends ReconfigurableBase
 
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
      @Override public void run() {
        if (!shutdownForUpgrade) {
          // Delay the shutdown a bit if not doing for restart.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index c57d2a60510..c6137f72c97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1969,6 +1969,28 @@ public class MiniDFSCluster {
     return stopDataNode(node);
   }
 
+
+  /**
+   * Shut down a particular datanode so that it can be restarted for an upgrade.
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
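Naming the DataNode's asynchronous shutdown thread is what lets the tests below wait for the shutdown to actually finish instead of sleeping a fixed four seconds. A name-based wait can be implemented roughly as follows; this is a sketch in the spirit of GenericTestUtils.waitForThreadTermination, not its actual source:

    // Polls the JVM's live threads until none whose name matches the
    // pattern remains, or until the total timeout elapses.
    final class ThreadWait {
      static void awaitTermination(String nameRegex, long pollMs, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          boolean alive = false;
          for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().matches(nameRegex)) {
              alive = true;
              break;
            }
          }
          if (!alive) {
            return; // no matching thread left: shutdown has completed
          }
          Thread.sleep(pollMs); // check again after a short pause
        }
        throw new IllegalStateException(
            "Thread matching \"" + nameRegex + "\" did not terminate in time");
      }
    }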
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index d71bc4d6ab4..18e0a1b94dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Restart the datanode
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in.
       out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades. The client should be able to retry pipeline recovery
+   * more times than the default limit allows, in a row for the same packet
+   * (including the heartbeat packet).
+   * (See {@link DataStreamer#pipelineRecoveryCount})
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait for the pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
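The new test waits on observable state (the block's generation stamp, which pipeline recovery bumps) rather than on elapsed time. For reference, the GenericTestUtils.waitFor call above is equivalent to a hand-rolled polling loop like the following, reusing out and oldGs from the test body:

    // Assumes the test context above: `out` is the DFSOutputStream and
    // `oldGs` is the generation stamp recorded before the datanode restart.
    long deadline = System.currentTimeMillis() + 10000; // overall timeout
    while (out.getBlock().getGenerationStamp() <= oldGs) {
      if (System.currentTimeMillis() > deadline) {
        Assert.fail("Pipeline was not recovered within 10 seconds");
      }
      Thread.sleep(100); // poll every 100 ms
    }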
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index 67127c36b2d..898fe635cb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -404,7 +405,8 @@ public class TestRollingUpgrade {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.
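Taken together, the test changes follow one pattern: trigger the rolling-upgrade shutdown, then wait for the named shutdown thread rather than sleeping a fixed interval. A condensed sketch of that flow, assuming a running MiniDFSCluster named cluster, its Configuration conf, and at least one datanode:

    // Issue the rolling-upgrade shutdown via dfsadmin, then wait for the
    // DataNode's (now named) async shutdown thread to exit before asserting.
    DataNode dn = cluster.getDataNodes().get(0);
    String dnAddr = dn.getDatanodeId().getIpcAddr(false);
    DFSAdmin dfsadmin = new DFSAdmin(conf);
    String[] args = { "-shutdownDatanode", dnAddr, "upgrade" };
    Assert.assertEquals(0, dfsadmin.run(args));
    GenericTestUtils.waitForThreadTermination(
        "Async datanode shutdown thread", 100, 10000);
    Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());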