From 193d27de0a5d23a61cabd41162ebc3292d8526d1 Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Mon, 8 Feb 2016 12:16:05 -0600
Subject: [PATCH] HDFS-9752. Permanent write failures may happen to slow
 writers during datanode rolling upgrades. Contributed by Walter Su.

---
 .../org/apache/hadoop/hdfs/DataStreamer.java  | 42 +++++++------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../hadoop/hdfs/server/datanode/DataNode.java |  2 +-
 .../apache/hadoop/hdfs/MiniDFSCluster.java    | 22 +++++++
 ...TestClientProtocolForPipelineRecovery.java | 60 ++++++++++++++++++-
 .../hadoop/hdfs/TestRollingUpgrade.java       |  4 +-
 6 files changed, 109 insertions(+), 24 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index bac4d121c38..e3843ded539 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -368,9 +369,8 @@ synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
 
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
-  /** The last ack sequence number before pipeline failure. */
-  private long lastAckedSeqnoBeforeFailure = -1;
-  private int pipelineRecoveryCount = 0;
+  /** Number of times the pipeline has been retried for the same packet. */
+  private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
   private boolean isHflushed = false;
   /** Append on an existing block? */
@@ -1040,6 +1040,7 @@ public void run() {
             one.setTraceScope(null);
           }
           lastAckedSeqno = seqno;
+          pipelineRecoveryCount = 0;
           ackQueue.removeFirst();
           dataQueue.notifyAll();
 
@@ -1101,22 +1102,16 @@ private boolean processDatanodeOrExternalError() throws IOException {
       ackQueue.clear();
     }
 
-    // Record the new pipeline failure recovery.
-    if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-      lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-      pipelineRecoveryCount = 1;
-    } else {
-      // If we had to recover the pipeline five times in a row for the
-      // same packet, this client likely has corrupt data or corrupting
-      // during transmission.
-      if (++pipelineRecoveryCount > 5) {
-        LOG.warn("Error recovering pipeline for writing " +
-            block + ". Already retried 5 times for the same packet.");
-        lastException.set(new IOException("Failing write. Tried pipeline " +
-            "recovery 5 times without success."));
-        streamerClosed = true;
-        return false;
-      }
+    // If we had to recover the pipeline five times in a row for the
+    // same packet, this client likely has corrupt data or the data is
+    // being corrupted during transmission.
+    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+      LOG.warn("Error recovering pipeline for writing " +
+          block + ". Already retried 5 times for the same packet.");
+      lastException.set(new IOException("Failing write. Tried pipeline " +
+          "recovery 5 times without success."));
+      streamerClosed = true;
+      return false;
     }
 
     setupPipelineForAppendOrRecovery();
@@ -1144,6 +1139,7 @@ private boolean processDatanodeOrExternalError() throws IOException {
         assert endOfBlockPacket.isLastPacketInBlock();
         assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
         lastAckedSeqno = endOfBlockPacket.getSeqno();
+        pipelineRecoveryCount = 0;
         dataQueue.notifyAll();
       }
       endBlock();
@@ -1914,6 +1910,14 @@ boolean streamerClosed(){
     return streamerClosed;
   }
 
+  /**
+   * @return the number of times the pipeline was retried for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return pipelineRecoveryCount;
+  }
+
   void closeSocket() throws IOException {
     if (s != null) {
       s.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6c448eb74e1..07f01ff1f4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2789,6 +2789,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b2425bfe750..480465ca8cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2921,7 +2921,7 @@ public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override
       public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 63561fe6903..2eff12eda2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2187,6 +2187,28 @@ public synchronized DataNodeProperties stopDataNode(String dnName) {
     return stopDataNode(node);
   }
 
+  /**
+   * Shut down a particular datanode.
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of the
+   * removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 22009fd54d6..d7aa79af3aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -256,7 +258,8 @@ public void testPipelineRecoveryOnOOB() throws Exception {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Retart the datanode
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@ public void testPipelineRecoveryOnRestartFailure() throws Exception {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in.
       out.close();
@@ -309,7 +313,8 @@ public void testPipelineRecoveryOnRestartFailure() throws Exception {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public void testPipelineRecoveryOnRestartFailure() throws Exception {
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades. The client should be able to retry pipeline recovery
+   * more times than the default
+   * (in a row for the same packet, including the heartbeat packet).
+   * (See {@link DataStreamer#pipelineRecoveryCount})
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait for the pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getStreamer().getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index b3279ed5812..b356fb945d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -407,7 +408,8 @@ public void testDFSAdminDatanodeUpgradeControlCommands() throws Exception {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.
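The DataStreamer change above boils down to two rules: the per-packet recovery counter
resets whenever a packet is acknowledged, and recoveries triggered while a failed
datanode is known to be restarting (as during a rolling upgrade) are not charged
against the five-attempt limit. A minimal standalone sketch of that accounting
follows; the class and method names are invented for illustration and are not
Hadoop APIs.

    // Sketch of the HDFS-9752 retry accounting (illustrative only).
    public class PipelineRetryAccounting {
      private static final int MAX_RECOVERIES_PER_PACKET = 5;
      private volatile int pipelineRecoveryCount = 0;

      /** A packet was acked: progress was made, so start counting afresh. */
      void onPacketAcked() {
        pipelineRecoveryCount = 0;
      }

      /**
       * A pipeline recovery is about to run for the current packet.
       * @param restartingNode true if the failure came from a datanode that is
       *                       restarting for an upgrade; such attempts are not
       *                       charged to the per-packet budget
       * @return true if the write should now fail permanently
       */
      boolean shouldFailWrite(boolean restartingNode) {
        // Short-circuit: a restarting node neither increments nor checks the count.
        return !restartingNode && ++pipelineRecoveryCount > MAX_RECOVERIES_PER_PACKET;
      }

      public static void main(String[] args) {
        PipelineRetryAccounting acct = new PipelineRetryAccounting();
        // Many recoveries while a node restarts never fail the write.
        boolean failed = false;
        for (int i = 0; i < 100; i++) {
          failed |= acct.shouldFailWrite(true);
        }
        System.out.println("failed during restart-driven recoveries: " + failed);
        // A sixth recovery of the same packet on a healthy pipeline does fail it.
        for (int i = 0; i < 6; i++) {
          failed = acct.shouldFailWrite(false);
        }
        System.out.println("failed after 6 non-restart recoveries: " + failed);
        acct.onPacketAcked(); // an ack resets the budget for the next packet
      }
    }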