HDFS-8070. Pre-HDFS-7915 DFSClient cannot use short circuit on post-HDFS-7915 DataNode (cmccabe)
(cherry picked from commita8898445dc
) (cherry picked from commit3d0385c3c8
)
This commit is contained in:
parent
c2e6d2589a
commit
176d22b440
|
@ -44,6 +44,9 @@ Release 2.7.1 - UNRELEASED
|
||||||
HDFS-8147. StorageGroup in Dispatcher should override equals nad hashCode.
|
HDFS-8147. StorageGroup in Dispatcher should override equals nad hashCode.
|
||||||
(surendra singh lilhore via szetszwo)
|
(surendra singh lilhore via szetszwo)
|
||||||
|
|
||||||
|
HDFS-8070. Pre-HDFS-7915 DFSClient cannot use short circuit on
|
||||||
|
post-HDFS-7915 DataNode (cmccabe)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -75,6 +75,9 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
public void injectRequestFileDescriptorsFailure() throws IOException {
|
public void injectRequestFileDescriptorsFailure() throws IOException {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
public boolean getSupportsReceiptVerification() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -532,7 +535,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
final DataOutputStream out =
|
final DataOutputStream out =
|
||||||
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
|
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
|
||||||
SlotId slotId = slot == null ? null : slot.getSlotId();
|
SlotId slotId = slot == null ? null : slot.getSlotId();
|
||||||
new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
|
new Sender(out).requestShortCircuitFds(block, token, slotId, 1,
|
||||||
|
failureInjector.getSupportsReceiptVerification());
|
||||||
DataInputStream in = new DataInputStream(peer.getInputStream());
|
DataInputStream in = new DataInputStream(peer.getInputStream());
|
||||||
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
||||||
PBHelper.vintPrefixed(in));
|
PBHelper.vintPrefixed(in));
|
||||||
|
|
|
@ -186,7 +186,8 @@ public abstract class Receiver implements DataTransferProtocol {
|
||||||
try {
|
try {
|
||||||
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
|
||||||
PBHelper.convert(proto.getHeader().getToken()),
|
PBHelper.convert(proto.getHeader().getToken()),
|
||||||
slotId, proto.getMaxVersion(), true);
|
slotId, proto.getMaxVersion(),
|
||||||
|
proto.getSupportsReceiptVerification());
|
||||||
} finally {
|
} finally {
|
||||||
if (traceScope != null) traceScope.close();
|
if (traceScope != null) traceScope.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -745,4 +745,47 @@ public class TestShortCircuitCache {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
sockDir.close();
|
sockDir.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestPreReceiptVerificationFailureInjector
|
||||||
|
extends BlockReaderFactory.FailureInjector {
|
||||||
|
@Override
|
||||||
|
public boolean getSupportsReceiptVerification() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Regression test for HDFS-8070
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
|
||||||
|
BlockReaderTestUtil.enableShortCircuitShmTracing();
|
||||||
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
|
Configuration conf = createShortCircuitConf(
|
||||||
|
"testPreReceiptVerificationDfsClientCanDoScr", sockDir);
|
||||||
|
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
||||||
|
1000000000L);
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
fs.getClient().getConf().brfFailureInjector =
|
||||||
|
new TestPreReceiptVerificationFailureInjector();
|
||||||
|
final Path TEST_PATH1 = new Path("/test_file1");
|
||||||
|
DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
|
||||||
|
final Path TEST_PATH2 = new Path("/test_file2");
|
||||||
|
DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
|
||||||
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
||||||
|
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
|
||||||
|
ShortCircuitRegistry registry =
|
||||||
|
cluster.getDataNodes().get(0).getShortCircuitRegistry();
|
||||||
|
registry.visit(new ShortCircuitRegistry.Visitor() {
|
||||||
|
@Override
|
||||||
|
public void accept(HashMap<ShmId, RegisteredShm> segments,
|
||||||
|
HashMultimap<ExtendedBlockId, Slot> slots) {
|
||||||
|
Assert.assertEquals(1, segments.size());
|
||||||
|
Assert.assertEquals(2, slots.size());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cluster.shutdown();
|
||||||
|
sockDir.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue