From 5aa892ed486d42ae6b94c4866b92cd2b382ea640 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Fri, 13 Mar 2015 18:29:49 -0700 Subject: [PATCH] HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/BlockReaderFactory.java | 23 ++++- .../org/apache/hadoop/hdfs/DFSClient.java | 2 + .../datatransfer/DataTransferProtocol.java | 5 +- .../hdfs/protocol/datatransfer/Receiver.java | 2 +- .../hdfs/protocol/datatransfer/Sender.java | 4 +- .../hdfs/server/datanode/DataXceiver.java | 95 +++++++++++-------- .../server/datanode/ShortCircuitRegistry.java | 13 ++- .../src/main/proto/datatransfer.proto | 11 +++ .../shortcircuit/TestShortCircuitCache.java | 63 ++++++++++++ 10 files changed, 178 insertions(+), 43 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c3f9367008f..ff00b0c86c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1177,6 +1177,9 @@ Release 2.7.0 - UNRELEASED HDFS-7722. DataNode#checkDiskError should also remove Storage when error is found. (Lei Xu via Colin P. McCabe) + HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and + fail to tell the DFSClient about it because of a network error (cmccabe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index ba48c797158..1e915b28122 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; + import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -69,12 +71,23 @@ import com.google.common.base.Preconditions; public class BlockReaderFactory implements ShortCircuitReplicaCreator { static final Log LOG = LogFactory.getLog(BlockReaderFactory.class); + public static class FailureInjector { + public void injectRequestFileDescriptorsFailure() throws IOException { + // do nothing + } + } + @VisibleForTesting static ShortCircuitReplicaCreator createShortCircuitReplicaInfoCallback = null; private final DFSClient.Conf conf; + /** + * Injects failures into specific operations during unit tests. + */ + private final FailureInjector failureInjector; + /** * The file name, for logging and debugging purposes. */ @@ -169,6 +182,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { public BlockReaderFactory(DFSClient.Conf conf) { this.conf = conf; + this.failureInjector = conf.brfFailureInjector; this.remainingCacheTries = conf.nCachedConnRetry; } @@ -518,11 +532,12 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); SlotId slotId = slot == null ? null : slot.getSlotId(); - new Sender(out).requestShortCircuitFds(block, token, slotId, 1); + new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true); DataInputStream in = new DataInputStream(peer.getInputStream()); BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( PBHelper.vintPrefixed(in)); DomainSocket sock = peer.getDomainSocket(); + failureInjector.injectRequestFileDescriptorsFailure(); switch (resp.getStatus()) { case SUCCESS: byte buf[] = new byte[1]; @@ -532,8 +547,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { try { ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { + LOG.trace("Sending receipt verification byte for slot " + slot); + sock.getOutputStream().write(0); + } replica = new ShortCircuitReplica(key, fis[0], fis[1], cache, Time.monotonicNow(), slot); + return new ShortCircuitReplicaInfo(replica); } catch (IOException e) { // This indicates an error reading from disk, or a format error. Since // it's not a socket communication problem, we return null rather than @@ -545,7 +565,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); } } - return new ShortCircuitReplicaInfo(replica); case ERROR_UNSUPPORTED: if (!resp.hasShortCircuitAccessVersion()) { LOG.warn("short-circuit read access is disabled for " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index aac7b511433..f970fef4722 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -337,6 +337,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, final long shortCircuitCacheStaleThresholdMs; final long keyProviderCacheExpiryMs; + public BlockReaderFactory.FailureInjector brfFailureInjector = + new BlockReaderFactory.FailureInjector(); public Conf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 4be42a8d358..48e931d741a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -138,10 +138,13 @@ public interface DataTransferProtocol { * to use no slot id. * @param maxVersion Maximum version of the block data the client * can understand. + * @param supportsReceiptVerification True if the client supports + * receipt verification. */ public void requestShortCircuitFds(final ExtendedBlock blk, final Token blockToken, - SlotId slotId, int maxVersion) throws IOException; + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException; /** * Release a pair of short-circuit FDs requested earlier. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 7994027c6ec..31bdc5e2a52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -186,7 +186,7 @@ public abstract class Receiver implements DataTransferProtocol { try { requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken()), - slotId, proto.getMaxVersion()); + slotId, proto.getMaxVersion(), true); } finally { if (traceScope != null) traceScope.close(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 7fea33efc59..df69125882b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -181,7 +181,8 @@ public class Sender implements DataTransferProtocol { @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token blockToken, - SlotId slotId, int maxVersion) throws IOException { + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException { OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( @@ -189,6 +190,7 @@ public class Sender implements DataTransferProtocol { if (slotId != null) { builder.setSlotId(PBHelper.convert(slotId)); } + builder.setSupportsReceiptVerification(supportsReceiptVerification); OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index e9547a84e2e..84504fb5993 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; import static org.apache.hadoop.util.Time.now; @@ -291,64 +293,83 @@ class DataXceiver extends Receiver implements Runnable { @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token token, - SlotId slotId, int maxVersion) throws IOException { + SlotId slotId, int maxVersion, boolean supportsReceiptVerification) + throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; + SlotId registeredSlotId = null; + boolean success = false; try { - if (peer.getDomainSocket() == null) { - throw new IOException("You cannot pass file descriptors over " + - "anything but a UNIX domain socket."); - } - if (slotId != null) { - boolean isCached = datanode.data. - isCached(blk.getBlockPoolId(), blk.getBlockId()); - datanode.shortCircuitRegistry.registerSlot( - ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); - } try { - fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); - } finally { - if ((fis == null) && (slotId != null)) { - datanode.shortCircuitRegistry.unregisterSlot(slotId); + if (peer.getDomainSocket() == null) { + throw new IOException("You cannot pass file descriptors over " + + "anything but a UNIX domain socket."); } + if (slotId != null) { + boolean isCached = datanode.data. + isCached(blk.getBlockPoolId(), blk.getBlockId()); + datanode.shortCircuitRegistry.registerSlot( + ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached); + registeredSlotId = slotId; + } + fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); + Preconditions.checkState(fis != null); + bld.setStatus(SUCCESS); + bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); + } catch (ShortCircuitFdsVersionException e) { + bld.setStatus(ERROR_UNSUPPORTED); + bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); + bld.setMessage(e.getMessage()); + } catch (ShortCircuitFdsUnsupportedException e) { + bld.setStatus(ERROR_UNSUPPORTED); + bld.setMessage(e.getMessage()); + } catch (InvalidToken e) { + bld.setStatus(ERROR_ACCESS_TOKEN); + bld.setMessage(e.getMessage()); + } catch (IOException e) { + bld.setStatus(ERROR); + bld.setMessage(e.getMessage()); } - bld.setStatus(SUCCESS); - bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); - } catch (ShortCircuitFdsVersionException e) { - bld.setStatus(ERROR_UNSUPPORTED); - bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); - bld.setMessage(e.getMessage()); - } catch (ShortCircuitFdsUnsupportedException e) { - bld.setStatus(ERROR_UNSUPPORTED); - bld.setMessage(e.getMessage()); - } catch (InvalidToken e) { - bld.setStatus(ERROR_ACCESS_TOKEN); - bld.setMessage(e.getMessage()); - } catch (IOException e) { - bld.setStatus(ERROR); - bld.setMessage(e.getMessage()); - } - try { bld.build().writeDelimitedTo(socketOut); if (fis != null) { FileDescriptor fds[] = new FileDescriptor[fis.length]; for (int i = 0; i < fds.length; i++) { fds[i] = fis[i].getFD(); } - byte buf[] = new byte[] { (byte)0 }; - peer.getDomainSocket(). - sendFileDescriptors(fds, buf, 0, buf.length); + byte buf[] = new byte[1]; + if (supportsReceiptVerification) { + buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber(); + } else { + buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber(); + } + DomainSocket sock = peer.getDomainSocket(); + sock.sendFileDescriptors(fds, buf, 0, buf.length); + if (supportsReceiptVerification) { + LOG.trace("Reading receipt verification byte for " + slotId); + int val = sock.getInputStream().read(); + if (val < 0) { + throw new EOFException(); + } + } else { + LOG.trace("Receipt verification is not enabled on the DataNode. " + + "Not verifying " + slotId); + } + success = true; } } finally { + if ((!success) && (registeredSlotId != null)) { + LOG.info("Unregistering " + registeredSlotId + " because the " + + "requestShortCircuitFdsForRead operation failed."); + datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId); + } if (ClientTraceLog.isInfoEnabled()) { DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk .getBlockPoolId()); BlockSender.ClientTraceLog.info(String.format( "src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," + " blockid: %s, srvID: %s, success: %b", - blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null) - )); + blk.getBlockId(), dnR.getDatanodeUuid(), success)); } if (fis != null) { IOUtils.cleanup(LOG, fis); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java index 32906f4182d..b32c0d167c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java @@ -30,6 +30,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -83,7 +84,7 @@ public class ShortCircuitRegistry { private static final int SHM_LENGTH = 8192; - private static class RegisteredShm extends ShortCircuitShm + public static class RegisteredShm extends ShortCircuitShm implements DomainSocketWatcher.Handler { private final String clientName; private final ShortCircuitRegistry registry; @@ -383,4 +384,14 @@ public class ShortCircuitRegistry { } IOUtils.closeQuietly(watcher); } + + public static interface Visitor { + void accept(HashMap segments, + HashMultimap slots); + } + + @VisibleForTesting + public synchronized void visit(Visitor visitor) { + visitor.accept(segments, slots); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index d72bb5e150e..8426198feee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -179,6 +179,12 @@ message OpRequestShortCircuitAccessProto { * The shared memory slot to use, if we are using one. */ optional ShortCircuitShmSlotProto slotId = 3; + + /** + * True if the client supports verifying that the file descriptor has been + * sent successfully. + */ + optional bool supportsReceiptVerification = 4 [default = false]; } message ReleaseShortCircuitAccessRequestProto { @@ -230,6 +236,11 @@ enum Status { IN_PROGRESS = 12; } +enum ShortCircuitFdResponse { + DO_NOT_USE_RECEIPT_VERIFICATION = 0; + USE_RECEIPT_VERIFICATION = 1; +} + message PipelineAckProto { required sint64 seqno = 1; repeated uint32 reply = 2; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index bfa871c40f2..7daabd0c4c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -36,13 +36,16 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import com.google.common.collect.HashMultimap; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockReaderFactory; import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -52,11 +55,14 @@ import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; @@ -615,4 +621,61 @@ public class TestShortCircuitCache { cluster.shutdown(); sockDir.close(); } + + public static class TestCleanupFailureInjector + extends BlockReaderFactory.FailureInjector { + @Override + public void injectRequestFileDescriptorsFailure() throws IOException { + throw new IOException("injected I/O error"); + } + } + + // Regression test for HDFS-7915 + @Test(timeout=60000) + public void testDataXceiverCleansUpSlotsOnFailure() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testDataXceiverCleansUpSlotsOnFailure", sockDir); + conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, + 1000000000L); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final Path TEST_PATH1 = new Path("/test_file1"); + final Path TEST_PATH2 = new Path("/test_file2"); + final int TEST_FILE_LEN = 4096; + final int SEED = 0xFADE1; + DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN, + (short)1, SEED); + DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN, + (short)1, SEED); + + // The first read should allocate one shared memory segment and slot. + DFSTestUtil.readFileBuffer(fs, TEST_PATH1); + + // The second read should fail, and we should only have 1 segment and 1 slot + // left. + fs.getClient().getConf().brfFailureInjector = + new TestCleanupFailureInjector(); + try { + DFSTestUtil.readFileBuffer(fs, TEST_PATH2); + } catch (Throwable t) { + GenericTestUtils.assertExceptionContains("TCP reads were disabled for " + + "testing, but we failed to do a non-TCP read.", t); + } + ShortCircuitRegistry registry = + cluster.getDataNodes().get(0).getShortCircuitRegistry(); + registry.visit(new ShortCircuitRegistry.Visitor() { + @Override + public void accept(HashMap segments, + HashMultimap slots) { + Assert.assertEquals(1, segments.size()); + Assert.assertEquals(1, slots.size()); + } + }); + cluster.shutdown(); + sockDir.close(); + } }