diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index b6f0b011464..522d577fd70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -1306,6 +1306,7 @@ class BlockReceiver implements Closeable { long ackRecvNanoTime = 0; try { if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { + DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr); // read an ack from downstream datanode ack.readFields(downstreamIn); ackRecvNanoTime = System.nanoTime(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ad3c1724d5c..08a1fc0cbd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2403,7 +2403,8 @@ public class DataNode extends ReconfigurableBase blockSender.sendBlock(out, unbufOut, null); // no response necessary - LOG.info(getClass().getSimpleName() + ": Transmitted " + b + LOG.info(getClass().getSimpleName() + ", at " + + DataNode.this.getDisplayName() + ": Transmitted " + b + " (numBytes=" + b.getNumBytes() + ") to " + curTarget); // read ack diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 4ecbdc0207a..931c1241f56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -55,4 +55,7 @@ public class DataNodeFaultInjector { public void noRegistration() throws IOException { } public void failMirrorConnection() throws IOException { } + + public void failPipeline(ReplicaInPipelineInterface replicaInfo, + String mirrorAddr) throws IOException { } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index c4d924e8dcf..129024b8d1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1505,7 +1505,7 @@ class FsDatasetImpl implements FsDatasetSpi { if (!rbw.attemptToSetWriter(null, Thread.currentThread())) { throw new MustStopExistingWriter(rbw); } - LOG.info("Recovering " + rbw); + LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw); return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd); } } catch (MustStopExistingWriter e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java index 5e320fa29a0..99b617eb821 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java @@ -17,9 +17,16 @@ */ package org.apache.hadoop.hdfs; +import static org.junit.Assert.assertTrue; + import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -31,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSAdmin; @@ -39,12 +48,15 @@ import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This tests pipeline recovery related client protocol works correct or not. */ public class TestClientProtocolForPipelineRecovery { - + private static final Logger LOG = + LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class); @Test public void testGetNewStamp() throws IOException { int numDataNodes = 1; Configuration conf = new HdfsConfiguration(); @@ -477,4 +489,128 @@ public class TestClientProtocolForPipelineRecovery { DataNodeFaultInjector.set(oldDnInjector); } } + + // Test to verify that blocks are no longer corrupted after HDFS-4660. + // Revert HDFS-4660 and the other related ones (HDFS-9220, HDFS-8722), this + // test would fail. + // Scenario: Prior to the fix, block get corrupted when the transferBlock + // happens during pipeline recovery with extra bytes to make up the end of + // chunk. + // For verification, Need to fail the pipeline for last datanode when the + // second datanode have more bytes on disk than already acked bytes. + // This will enable to transfer extra bytes to the newNode to makeup + // end-of-chunk during pipeline recovery. This is achieved by the customized + // DataNodeFaultInjector class in this test. + // For detailed info, please refer to HDFS-4660 and HDFS-10587. HDFS-9220 + // fixes an issue in HDFS-4660 patch, and HDFS-8722 is an optimization. + @Test + public void testPipelineRecoveryWithTransferBlock() throws Exception { + final int chunkSize = 512; + final int oneWriteSize = 5000; + final int totalSize = 1024 * 1024; + final int errorInjectionPos = 512; + Configuration conf = new HdfsConfiguration(); + // Need 4 datanodes to verify the replaceDatanode during pipeline recovery + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + DataNodeFaultInjector old = DataNodeFaultInjector.get(); + + try { + DistributedFileSystem fs = cluster.getFileSystem(); + Path fileName = new Path("/f"); + FSDataOutputStream o = fs.create(fileName); + int count = 0; + // Flush to get the pipeline created. + o.writeBytes("hello"); + o.hflush(); + DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream(); + final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes(); + final String lastDn = pipeline[2].getXferAddr(false); + final AtomicBoolean failed = new AtomicBoolean(false); + + DataNodeFaultInjector.set(new DataNodeFaultInjector() { + @Override + public void failPipeline(ReplicaInPipelineInterface replicaInfo, + String mirror) throws IOException { + if (!lastDn.equals(mirror)) { + // Only fail for second DN + return; + } + if (!failed.get() && + (replicaInfo.getBytesAcked() > errorInjectionPos) && + (replicaInfo.getBytesAcked() % chunkSize != 0)) { + int count = 0; + while (count < 10) { + // Fail the pipeline (Throw exception) when: + // 1. bytsAcked is not at chunk boundary (checked in the if + // statement above) + // 2. bytesOnDisk is bigger than bytesAcked and at least + // reaches (or go beyond) the end of the chunk that + // bytesAcked is in (checked in the if statement below). + // At this condition, transferBlock that happens during + // pipeline recovery would transfer extra bytes to make up to the + // end of the chunk. And this is when the block corruption + // described in HDFS-4660 would occur. + if ((replicaInfo.getBytesOnDisk() / chunkSize) - + (replicaInfo.getBytesAcked() / chunkSize) >= 1) { + failed.set(true); + throw new IOException( + "Failing Pipeline " + replicaInfo.getBytesAcked() + " : " + + replicaInfo.getBytesOnDisk()); + } + try { + Thread.sleep(200); + } catch (InterruptedException e) { + } + count++; + } + } + } + }); + + Random r = new Random(); + byte[] b = new byte[oneWriteSize]; + while (count < totalSize) { + r.nextBytes(b); + o.write(b); + count += oneWriteSize; + o.hflush(); + } + + assertTrue("Expected a failure in the pipeline", failed.get()); + DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes(); + o.close(); + // Trigger block report to NN + for (DataNode d: cluster.getDataNodes()) { + DataNodeTestUtils.triggerBlockReport(d); + } + // Read from the replaced datanode to verify the corruption. So shutdown + // all other nodes in the pipeline. + List pipelineList = Arrays.asList(pipeline); + DatanodeInfo newNode = null; + for (DatanodeInfo node : newNodes) { + if (!pipelineList.contains(node)) { + newNode = node; + break; + } + } + LOG.info("Number of nodes in pipeline: {} newNode {}", + newNodes.length, newNode.getName()); + // shutdown old 2 nodes + for (int i = 0; i < newNodes.length; i++) { + if (newNodes[i].getName().equals(newNode.getName())) { + continue; + } + LOG.info("shutdown {}", newNodes[i].getName()); + cluster.stopDataNode(newNodes[i].getName()); + } + + // Read should be successfull from only the newNode. There should not be + // any corruption reported. + DFSTestUtil.readFile(fs, fileName); + } finally { + DataNodeFaultInjector.set(old); + cluster.shutdown(); + } + } }