diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8c3cfe1da9c..b0a0a50fbb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -589,6 +589,9 @@ Release 2.7.1 - UNRELEASED HDFS-8147. StorageGroup in Dispatcher should override equals nad hashCode. (surendra singh lilhore via szetszwo) + HDFS-8070. Pre-HDFS-7915 DFSClient cannot use short circuit on + post-HDFS-7915 DataNode (cmccabe) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 5175a873bbb..714cd68817c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -77,6 +77,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { public void injectRequestFileDescriptorsFailure() throws IOException { // do nothing } + public boolean getSupportsReceiptVerification() { + return true; + } } @VisibleForTesting @@ -533,7 +536,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); SlotId slotId = slot == null ? null : slot.getSlotId(); - new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1, + failureInjector.getSupportsReceiptVerification()); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 31bdc5e2a52..a6fbb29ece2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -186,7 +186,8 @@ public abstract class Receiver implements DataTransferProtocol { try { requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken()), - slotId, proto.getMaxVersion(), true); + slotId, proto.getMaxVersion(), + proto.getSupportsReceiptVerification()); } finally { if (traceScope != null) traceScope.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index f3c73f2fb03..20c54fe8e52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -743,4 +743,48 @@ public class TestShortCircuitCache { cluster.shutdown(); sockDir.close(); } + + public static class TestPreReceiptVerificationFailureInjector + extends BlockReaderFactory.FailureInjector { + @Override + public boolean getSupportsReceiptVerification() { + return false; + } + } + + // Regression test for HDFS-8070 + @Test(timeout=60000) + public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testPreReceiptVerificationDfsClientCanDoScr", sockDir); + conf.setLong( + HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, + 1000000000L); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + fs.getClient().getConf().getShortCircuitConf().brfFailureInjector = + new TestPreReceiptVerificationFailureInjector(); + final Path TEST_PATH1 = new Path("/test_file1"); + DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2); + final Path TEST_PATH2 = new Path("/test_file2"); + DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2); + DFSTestUtil.readFileBuffer(fs, TEST_PATH1); + DFSTestUtil.readFileBuffer(fs, TEST_PATH2); + ShortCircuitRegistry registry = + cluster.getDataNodes().get(0).getShortCircuitRegistry(); + registry.visit(new ShortCircuitRegistry.Visitor() { + @Override + public void accept(HashMap segments, + HashMultimap slots) { + Assert.assertEquals(1, segments.size()); + Assert.assertEquals(2, slots.size()); + } + }); + cluster.shutdown(); + sockDir.close(); + } }