From 32741cf3d25d85a92e3deb11c302cc2a718d71dd Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Fri, 13 Mar 2015 18:40:20 -0700 Subject: [PATCH] Revert "HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)" (jenkins didn't run yet) This reverts commit 5aa892ed486d42ae6b94c4866b92cd2b382ea640. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 - .../hadoop/hdfs/BlockReaderFactory.java | 23 +---- .../org/apache/hadoop/hdfs/DFSClient.java | 2 - .../datatransfer/DataTransferProtocol.java | 5 +- .../hdfs/protocol/datatransfer/Receiver.java | 2 +- .../hdfs/protocol/datatransfer/Sender.java | 4 +- .../hdfs/server/datanode/DataXceiver.java | 99 ++++++++----------- .../server/datanode/ShortCircuitRegistry.java | 13 +-- .../src/main/proto/datatransfer.proto | 11 --- .../shortcircuit/TestShortCircuitCache.java | 63 ------------ 10 files changed, 45 insertions(+), 180 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ff00b0c86c2..c3f9367008f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1177,9 +1177,6 @@ Release 2.7.0 - UNRELEASED HDFS-7722. DataNode#checkDiskError should also remove Storage when error is found. (Lei Xu via Colin P. McCabe) - HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and - fail to tell the DFSClient about it because of a network error (cmccabe) - Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 1e915b28122..ba48c797158 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; - import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -71,23 +69,12 @@ import com.google.common.base.Preconditions; public class BlockReaderFactory implements ShortCircuitReplicaCreator { static final Log LOG = LogFactory.getLog(BlockReaderFactory.class); - public static class FailureInjector { - public void injectRequestFileDescriptorsFailure() throws IOException { - // do nothing - } - } - @VisibleForTesting static ShortCircuitReplicaCreator createShortCircuitReplicaInfoCallback = null; private final DFSClient.Conf conf; - /** - * Injects failures into specific operations during unit tests. - */ - private final FailureInjector failureInjector; - /** * The file name, for logging and debugging purposes. */ @@ -182,7 +169,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { public BlockReaderFactory(DFSClient.Conf conf) { this.conf = conf; - this.failureInjector = conf.brfFailureInjector; this.remainingCacheTries = conf.nCachedConnRetry; } @@ -532,12 +518,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); SlotId slotId = slot == null ? null : slot.getSlotId(); - new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); DomainSocket sock = peer.getDomainSocket(); - failureInjector.injectRequestFileDescriptorsFailure(); switch (resp.getStatus()) { case SUCCESS: byte buf[] = new byte[1]; @@ -547,13 +532,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { try { ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { - LOG.trace("Sending receipt verification byte for slot " + slot); - sock.getOutputStream().write(0); - } replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, Time.monotonicNow(), slot); - return new ShortCircuitReplicaInfo(replica); } catch (IOException e) { // This indicates an error reading from disk, or a format error. Since // it's not a socket communication problem, we return null rather than @@ -565,6 +545,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); } } + return new ShortCircuitReplicaInfo(replica); case ERROR_UNSUPPORTED: if (!resp.hasShortCircuitAccessVersion()) { LOG.warn("short-circuit read access is disabled for " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f970fef4722..aac7b511433 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -337,8 +337,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final long shortCircuitCacheStaleThresholdMs; final long keyProviderCacheExpiryMs; - public BlockReaderFactory.FailureInjector brfFailureInjector = - new BlockReaderFactory.FailureInjector(); public Conf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 48e931d741a..4be42a8d358 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -138,13 +138,10 @@ public interface DataTransferProtocol { * to use no slot id. * @param maxVersion Maximum version of the block data the client * can understand. - * @param supportsReceiptVerification True if the client supports - * receipt verification. */ public void requestShortCircuitFds(final ExtendedBlock blk, final Token blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException; + SlotId slotId, int maxVersion) throws IOException; /** * Release a pair of short-circuit FDs requested earlier. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 31bdc5e2a52..7994027c6ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -186,7 +186,7 @@ public abstract class Receiver implements DataTransferProtocol { try { requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken()), - slotId, proto.getMaxVersion(), true); + slotId, proto.getMaxVersion()); } finally { if (traceScope != null) traceScope.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index df69125882b..7fea33efc59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -181,8 +181,7 @@ public class Sender implements DataTransferProtocol { @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token blockToken, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException { + SlotId slotId, int maxVersion) throws IOException { OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( @@ -190,7 +189,6 @@ public class Sender implements DataTransferProtocol { if (slotId != null) { builder.setSlotId(PBHelper.convert(slotId)); } - builder.setSupportsReceiptVerification(supportsReceiptVerification); OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 84504fb5993..e9547a84e2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -22,8 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; import static org.apache.hadoop.util.Time.now; @@ -293,83 +291,64 @@ class DataXceiver extends Receiver implements Runnable { @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token token, - SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException { + SlotId slotId, int maxVersion) throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; - SlotId registeredSlotId = null; - boolean success = false; try { - try { - if (peer.getDomainSocket() == null) { - throw new IOException("You cannot pass file descriptors over " + - "anything but a UNIX domain socket."); - } - if (slotId != null) { - boolean isCached = datanode.data. - isCached(blk.getBlockPoolId(), blk.getBlockId()); - datanode.shortCircuitRegistry.registerSlot( - ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); - registeredSlotId = slotId; - } - fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); - Preconditions.checkState(fis != null); - bld.setStatus(SUCCESS); - bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); - } catch (ShortCircuitFdsVersionException e) { - bld.setStatus(ERROR_UNSUPPORTED); - bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); - bld.setMessage(e.getMessage()); - } catch (ShortCircuitFdsUnsupportedException e) { - bld.setStatus(ERROR_UNSUPPORTED); - bld.setMessage(e.getMessage()); - } catch (InvalidToken e) { - bld.setStatus(ERROR_ACCESS_TOKEN); - bld.setMessage(e.getMessage()); - } catch (IOException e) { - bld.setStatus(ERROR); - bld.setMessage(e.getMessage()); + if (peer.getDomainSocket() == null) { + throw new IOException("You cannot pass file descriptors over " + + "anything but a UNIX domain socket."); } + if (slotId != null) { + boolean isCached = datanode.data. + isCached(blk.getBlockPoolId(), blk.getBlockId()); + datanode.shortCircuitRegistry.registerSlot( + ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); + } + try { + fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); + } finally { + if ((fis == null) && (slotId != null)) { + datanode.shortCircuitRegistry.unregisterSlot(slotId); + } + } + bld.setStatus(SUCCESS); + bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); + } catch (ShortCircuitFdsVersionException e) { + bld.setStatus(ERROR_UNSUPPORTED); + bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); + bld.setMessage(e.getMessage()); + } catch (ShortCircuitFdsUnsupportedException e) { + bld.setStatus(ERROR_UNSUPPORTED); + bld.setMessage(e.getMessage()); + } catch (InvalidToken e) { + bld.setStatus(ERROR_ACCESS_TOKEN); + bld.setMessage(e.getMessage()); + } catch (IOException e) { + bld.setStatus(ERROR); + bld.setMessage(e.getMessage()); + } + try { bld.build().writeDelimitedTo(socketOut); if (fis != null) { FileDescriptor fds[] = new FileDescriptor[fis.length]; for (int i = 0; i < fds.length; i++) { fds[i] = fis[i].getFD(); } - byte buf[] = new byte[1]; - if (supportsReceiptVerification) { - buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber(); - } else { - buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber(); - } - DomainSocket sock = peer.getDomainSocket(); - sock.sendFileDescriptors(fds, buf, 0, buf.length); - if (supportsReceiptVerification) { - LOG.trace("Reading receipt verification byte for " + slotId); - int val = sock.getInputStream().read(); - if (val < 0) { - throw new EOFException(); - } - } else { - LOG.trace("Receipt verification is not enabled on the DataNode. " + - "Not verifying " + slotId); - } - success = true; + byte buf[] = new byte[] { (byte)0 }; + peer.getDomainSocket(). + sendFileDescriptors(fds, buf, 0, buf.length); } } finally { - if ((!success) && (registeredSlotId != null)) { - LOG.info("Unregistering " + registeredSlotId + " because the " + - "requestShortCircuitFdsForRead operation failed."); - datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId); - } if (ClientTraceLog.isInfoEnabled()) { DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk .getBlockPoolId()); BlockSender.ClientTraceLog.info(String.format( "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," + " blockid: %s, srvID: %s, success: %b", - blk.getBlockId(), dnR.getDatanodeUuid(), success)); + blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null) + )); } if (fis != null) { IOUtils.cleanup(LOG, fis); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java index b32c0d167c5..32906f4182d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java @@ -30,7 +30,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -84,7 +83,7 @@ public class ShortCircuitRegistry { private static final int SHM_LENGTH = 8192; - public static class RegisteredShm extends ShortCircuitShm + private static class RegisteredShm extends ShortCircuitShm implements DomainSocketWatcher.Handler { private final String clientName; private final ShortCircuitRegistry registry; @@ -384,14 +383,4 @@ public class ShortCircuitRegistry { } IOUtils.closeQuietly(watcher); } - - public static interface Visitor { - void accept(HashMap segments, - HashMultimap slots); - } - - @VisibleForTesting - public synchronized void visit(Visitor visitor) { - visitor.accept(segments, slots); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 8426198feee..d72bb5e150e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -179,12 +179,6 @@ message OpRequestShortCircuitAccessProto { * The shared memory slot to use, if we are using one. */ optional ShortCircuitShmSlotProto slotId = 3; - - /** - * True if the client supports verifying that the file descriptor has been - * sent successfully. - */ - optional bool supportsReceiptVerification = 4 [default = false]; } message ReleaseShortCircuitAccessRequestProto { @@ -236,11 +230,6 @@ enum Status { IN_PROGRESS = 12; } -enum ShortCircuitFdResponse { - DO_NOT_USE_RECEIPT_VERIFICATION = 0; - USE_RECEIPT_VERIFICATION = 1; -} - message PipelineAckProto { required sint64 seqno = 1; repeated uint32 reply = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 7daabd0c4c8..bfa871c40f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -36,16 +36,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import com.google.common.collect.HashMultimap; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderTestUtil; -import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -55,14 +52,11 @@ import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; -import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; @@ -621,61 +615,4 @@ public class TestShortCircuitCache { cluster.shutdown(); sockDir.close(); } - - public static class TestCleanupFailureInjector - extends BlockReaderFactory.FailureInjector { - @Override - public void injectRequestFileDescriptorsFailure() throws IOException { - throw new IOException("injected I/O error"); - } - } - - // Regression test for HDFS-7915 - @Test(timeout=60000) - public void testDataXceiverCleansUpSlotsOnFailure() throws Exception { - BlockReaderTestUtil.enableShortCircuitShmTracing(); - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testDataXceiverCleansUpSlotsOnFailure", sockDir); - conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, - 1000000000L); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - final Path TEST_PATH1 = new Path("/test_file1"); - final Path TEST_PATH2 = new Path("/test_file2"); - final int TEST_FILE_LEN = 4096; - final int SEED = 0xFADE1; - DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN, - (short)1, SEED); - DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN, - (short)1, SEED); - - // The first read should allocate one shared memory segment and slot. - DFSTestUtil.readFileBuffer(fs, TEST_PATH1); - - // The second read should fail, and we should only have 1 segment and 1 slot - // left. - fs.getClient().getConf().brfFailureInjector = - new TestCleanupFailureInjector(); - try { - DFSTestUtil.readFileBuffer(fs, TEST_PATH2); - } catch (Throwable t) { - GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + - "testing, but we failed to do a non-TCP read.", t); - } - ShortCircuitRegistry registry = - cluster.getDataNodes().get(0).getShortCircuitRegistry(); - registry.visit(new ShortCircuitRegistry.Visitor() { - @Override - public void accept(HashMap segments, - HashMultimap slots) { - Assert.assertEquals(1, segments.size()); - Assert.assertEquals(1, slots.size()); - } - }); - cluster.shutdown(); - sockDir.close(); - } }