diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 022c59edbe8..5d2c577f1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2873,6 +2873,8 @@ public class DataNode extends ReconfigurableBase
     /** Throttle to block replication when data transfers or writes. */
     private DataTransferThrottler throttler;

+    private IOException lastException = null;
+
     /**
      * Connect to the first item in the target list. Pass along the
      * entire target list, the block, and the data.
@@ -2917,6 +2919,7 @@ public class DataNode extends ReconfigurableBase
       final boolean isClient = clientname.length() > 0;

       try {
+        DataNodeFaultInjector.get().failTransfer(id);
         final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
         InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
         LOG.debug("Connecting to datanode {}", dnAddr);
@@ -2989,8 +2992,10 @@ public class DataNode extends ReconfigurableBase
         handleBadBlock(b, ie, false);
         LOG.warn("{}:Failed to transfer {} to {} got",
             bpReg, b, targets[0], ie);
+        lastException = ie;
       } catch (Throwable t) {
         LOG.error("Failed to transfer block {}", b, t);
+        lastException = new IOException("Failed to transfer block " + b, t);
       } finally {
         decrementXmitsInProgress();
         IOUtils.closeStream(blockSender);
@@ -3000,6 +3005,10 @@ public class DataNode extends ReconfigurableBase
       }
     }

+    public IOException getLastException() {
+      return this.lastException;
+    }
+
     @Override
     public String toString() {
       return "DataTransfer " + b + " to " + Arrays.asList(targets);
@@ -3507,6 +3516,9 @@ public class DataNode extends ReconfigurableBase
         throw new IOException("Pipeline recovery for " + b +
             " is interrupted.", e);
       }
+      if (dataTransferTask.getLastException() != null) {
+        throw dataTransferTask.getLastException();
+      }
     }
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index e9a9c690163..16be71adc52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;

 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -95,6 +96,8 @@ public class DataNodeFaultInjector {

   public void noRegistration() throws IOException { }

+  public void failTransfer(DatanodeID sourceDNId) throws IOException { }
+
   public void failMirrorConnection() throws IOException { }

   public void failPipeline(ReplicaInPipeline replicaInfo,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 70aa9d7d635..928184f02c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -761,6 +762,131 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
+
+  @Test
+  public void testPipelineRecoveryWithFailedTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 5 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String firstDn = pipeline[0].getXferAddr(false);
+      final String secondDn = pipeline[1].getXferAddr(false);
+      final AtomicBoolean pipelineFailed = new AtomicBoolean(false);
+      final AtomicBoolean transferFailed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipeline replicaInfo,
+            String mirror) throws IOException {
+          if (!secondDn.equals(mirror)) {
+            // Only fail on the first DN, whose mirror is the second DN.
+            return;
+          }
+          if (!pipelineFailed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (throw an exception) when:
+              // 1. bytesAcked is not at a chunk boundary (checked in the if
+              //    statement above)
+              // 2. bytesOnDisk is bigger than bytesAcked and at least
+              //    reaches (or goes beyond) the end of the chunk that
+              //    bytesAcked is in (checked in the if statement below).
+              // Under these conditions, the transferBlock that happens during
+              // pipeline recovery would transfer extra bytes to fill up to
+              // the end of the chunk, and this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                pipelineFailed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : " +
+                    replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException ignored) {
+              }
+              count++;
+            }
+          }
+        }
+
+        @Override
+        public void failTransfer(DatanodeID sourceDNId) throws IOException {
+          if (sourceDNId.getXferAddr().equals(firstDn)) {
+            transferFailed.set(true);
+            throw new IOException(
+                "Failing Transfer from " + sourceDNId.getXferAddr());
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", pipelineFailed.get());
+      assertTrue("Expected a failure in the transfer block",
+          transferFailed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d: cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to verify the corruption. So shut down
+      // all other nodes in the pipeline.
+      List pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      assert newNode != null;
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // Shut down the 2 old nodes
+      for (DatanodeInfo node : newNodes) {
+        if (node.getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", node.getName());
+        cluster.stopDataNode(node.getName());
+      }
+
+      // Read should be successful from only the newNode. There should not be
+      // any corruption reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testUpdatePipeLineAfterDNReg()throws Exception {
     Configuration conf = new HdfsConfiguration();