diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 2e4ee028473..fb0c1c5b1c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -574,6 +574,8 @@ class BlockReceiver implements Closeable { if (mirrorOut != null && !mirrorError) { try { long begin = Time.monotonicNow(); + // For testing. Normally no-op. + DataNodeFaultInjector.get().stopSendingPacketDownstream(); packetReceiver.mirrorPacketTo(mirrorOut); mirrorOut.flush(); long now = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 773a64c9a67..398935de451 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -300,11 +300,15 @@ class BlockSender implements java.io.Closeable { // The meta file will contain only the header if the NULL checksum // type was used, or if the replica was written to transient storage. + // Also, when only header portion of a data packet was transferred + // and then pipeline breaks, the meta file can contain only the + // header and 0 byte in the block data file. // Checksum verification is not performed for replicas on transient // storage. The header is important for determining the checksum // type later when lazy persistence copies the block to non-transient // storage and computes the checksum. - if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) { + if (!replica.isOnTransientStorage() && + metaIn.getLength() >= BlockMetadataHeader.getHeaderSize()) { checksumIn = new DataInputStream(new BufferedInputStream( metaIn, IO_FILE_BUFFER_SIZE)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 0e3869429dd..732742098a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -50,5 +50,7 @@ public class DataNodeFaultInjector { return false; } + public void stopSendingPacketDownstream() throws IOException {} + public void noRegistration() throws IOException { } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index d7aa79af3aa..0eeb3b78edf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -375,4 +375,57 @@ public class TestClientProtocolForPipelineRecovery { } } } + + /** + * Test to make sure the checksum is set correctly after pipeline + * recovery transfers 0 byte partial block. If fails the test case + * will say "java.io.IOException: Failed to replace a bad datanode + * on the existing pipeline due to no more good datanodes being + * available to try." This indicates there was a real failure + * after the staged failure. + */ + @Test + public void testZeroByteBlockRecovery() throws Exception { + // Make the first datanode fail once. With 3 nodes and a block being + // created with 2 replicas, anything more than this planned failure + // will cause a test failure. + DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() { + int tries = 1; + @Override + public void stopSendingPacketDownstream() throws IOException { + if (tries > 0) { + tries--; + try { + Thread.sleep(60000); + } catch (InterruptedException ie) { + throw new IOException("Interrupted while sleeping. Bailing out."); + } + } + } + }; + DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(dnFaultInjector); + + Configuration conf = new HdfsConfiguration(); + conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "1000"); + conf.set(HdfsClientConfigKeys. + BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, "ALWAYS"); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + FSDataOutputStream out = fs.create(new Path("noheartbeat.dat"), (short)2); + out.write(0x31); + out.hflush(); + out.close(); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + DataNodeFaultInjector.set(oldDnInjector); + } + } }