diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index abbb1ba1710..5daf2ac07f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
@@ -348,9 +349,8 @@ class DataStreamer extends Daemon {
   /** Nodes have been used in the pipeline before and have failed. */
   private final List<DatanodeInfo> failed = new ArrayList<>();
-  /** The last ack sequence number before pipeline failure. */
-  private long lastAckedSeqnoBeforeFailure = -1;
-  private int pipelineRecoveryCount = 0;
+  /** Number of pipeline recovery retries for the same packet. */
+  private volatile int pipelineRecoveryCount = 0;
   /** Has the current block been hflushed? */
   private boolean isHflushed = false;
   /** Append on an existing block? */
@@ -1013,6 +1013,7 @@ class DataStreamer extends Daemon {
             one.setTraceScope(null);
           }
           lastAckedSeqno = seqno;
+          pipelineRecoveryCount = 0;
           ackQueue.removeFirst();
           dataQueue.notifyAll();
 
@@ -1069,22 +1070,16 @@ class DataStreamer extends Daemon {
        ackQueue.clear();
      }
 
-      // Record the new pipeline failure recovery.
-      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-        lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-        pipelineRecoveryCount = 1;
-      } else {
-        // If we had to recover the pipeline five times in a row for the
-        // same packet, this client likely has corrupt data or corrupting
-        // during transmission.
-        if (++pipelineRecoveryCount > 5) {
-          LOG.warn("Error recovering pipeline for writing " +
-              block + ". Already retried 5 times for the same packet.");
-          lastException.set(new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success."));
-          streamerClosed = true;
-          return false;
-        }
+      // If we had to recover the pipeline five times in a row for the
+      // same packet, this client likely has corrupt data or the data is
+      // being corrupted during transmission.
+      if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+        LOG.warn("Error recovering pipeline for writing " +
+            block + ". Already retried 5 times for the same packet.");
+        lastException.set(new IOException("Failing write. Tried pipeline " +
+            "recovery 5 times without success."));
+        streamerClosed = true;
+        return false;
      }
 
      boolean doSleep = setupPipelineForAppendOrRecovery();
@@ -1111,6 +1106,7 @@ class DataStreamer extends Daemon {
         assert endOfBlockPacket.isLastPacketInBlock();
         assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
         lastAckedSeqno = endOfBlockPacket.getSeqno();
+        pipelineRecoveryCount = 0;
         dataQueue.notifyAll();
       }
       endBlock();
@@ -1907,6 +1903,14 @@ class DataStreamer extends Daemon {
     return streamerClosed;
   }
 
+  /**
+   * @return the number of pipeline recovery retries for the same packet.
+   */
+  @VisibleForTesting
+  int getPipelineRecoveryCount() {
+    return pipelineRecoveryCount;
+  }
+
   void closeSocket() throws IOException {
     if (s != null) {
       s.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9aec98fd0b2..b3283d9352a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1808,6 +1808,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-9724. Degraded performance in WebHDFS listing as it does not reuse
     ObjectMapper. (Akira AJISAKA via wheat9)
 
+    HDFS-9752. Permanent write failures may happen to slow writers during
+    datanode rolling upgrades (Walter Su via kihwal)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 04a986f055d..4f94da491a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2897,7 +2897,7 @@ public class DataNode extends ReconfigurableBase
 
     // Asynchronously start the shutdown process so that the rpc response can be
     // sent back.
-    Thread shutdownThread = new Thread() {
+    Thread shutdownThread = new Thread("Async datanode shutdown thread") {
       @Override public void run() {
         if (!shutdownForUpgrade) {
           // Delay the shutdown a bit if not doing for restart.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 67846508153..fa87004079c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2077,6 +2077,28 @@ public class MiniDFSCluster {
     return stopDataNode(node);
   }
 
+  /**
+   * Shut down a particular datanode for an upgrade (restart expected).
+   * @param i node index
+   * @return null if the node index is out of range, else the properties of
+   * the removed node
+   */
+  public synchronized DataNodeProperties stopDataNodeForUpgrade(int i)
+      throws IOException {
+    if (i < 0 || i >= dataNodes.size()) {
+      return null;
+    }
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
+    LOG.info("MiniDFSCluster Stopping DataNode " +
+        dn.getDisplayName() +
+        " from a total of " + (dataNodes.size() + 1) +
+        " datanodes.");
+    dn.shutdownDatanode(true);
+    numDataNodes--;
+    return dnprop;
+  }
+
   /**
    * Restart a datanode
    * @param dnprop datanode's property
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 22009fd54d6..d7aa79af3aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -256,7 +258,8 @@ public class TestClientProtocolForPipelineRecovery {
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
       // Wait long enough to receive an OOB ack before closing the file.
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // Restart the datanode
       cluster.restartDataNode(0, true);
       // The following forces a data packet and end of block packets to be sent.
@@ -293,7 +296,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       // This should succeed without restarting the node. The restart will
       // expire and regular pipeline recovery will kick in.
       out.close();
@@ -309,7 +313,8 @@ public class TestClientProtocolForPipelineRecovery {
       // issue shutdown to the datanode.
       final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args2));
-      Thread.sleep(4000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       try {
         // close should fail
         out.close();
@@ -321,4 +326,53 @@ public class TestClientProtocolForPipelineRecovery {
       }
     }
   }
+
+  /**
+   * HDFS-9752. The client keeps sending heartbeat packets during datanode
+   * rolling upgrades, so it should be allowed to retry pipeline recovery
+   * more times than the default (in a row for the same packet, including
+   * the heartbeat packet).
+   * See {@link DataStreamer#pipelineRecoveryCount}.
+   */
+  @Test(timeout = 60000)
+  public void testPipelineRecoveryOnDatanodeUpgrade() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short) 2, 0L);
+      final DFSOutputStream out = (DFSOutputStream) (fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      final long oldGs = out.getBlock().getGenerationStamp();
+      MiniDFSCluster.DataNodeProperties dnProps =
+          cluster.stopDataNodeForUpgrade(0);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
+      cluster.restartDataNode(dnProps, true);
+      cluster.waitActive();
+
+      // wait for the pipeline to be recovered
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return out.getBlock().getGenerationStamp() > oldGs;
+        }
+      }, 100, 10000);
+      Assert.assertEquals("The pipeline recovery count shouldn't increase",
+          0, out.getStreamer().getPipelineRecoveryCount());
+      out.write(1);
+      out.close();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index bf1b42aafb7..ef9941195ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -407,7 +408,8 @@ public class TestRollingUpgrade {
       runCmd(dfsadmin, true, args2);
 
       // the datanode should be down.
-      Thread.sleep(2000);
+      GenericTestUtils.waitForThreadTermination(
+          "Async datanode shutdown thread", 100, 10000);
       Assert.assertFalse("DataNode should exit", dn.isDatanodeUp());
 
       // ping should fail.