diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
index d1096376bcc..cc81fa36076 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
@@ -30,3 +30,6 @@ HDFS-4418. increase default FileInputStreamCache size (todd)
 
 HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
 (Colin Patrick McCabe via todd)
+
+HDFS-4417. Fix case where local reads get disabled incorrectly
+(Colin Patrick McCabe and todd via todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 7dd18436469..89ed7c4859d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -68,7 +68,7 @@ public class BlockReaderFactory {
    *          case.
    * @param allowShortCircuitLocalReads  True if short-circuit local reads
    *          should be allowed.
-   * @return New BlockReader instance, or null on error.
+   * @return New BlockReader instance
    */
   @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(
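The javadoc change above tracks a behavioral contract: BlockReaderFactory.newBlockReader now signals failure only by throwing an IOException, never by returning null. A minimal sketch of what that means for a caller; the try/catch below is hypothetical illustration, not code from this patch, with argument names borrowed from the DFSInputStream changes later in the diff:

    // Hypothetical caller: with the updated contract there is no null check,
    // only exception handling.
    BlockReader reader;
    try {
      reader = BlockReaderFactory.newBlockReader(
          dfsClient.conf, file, block, blockToken, startOffset,
          len, verifyChecksum, clientName, peer, chosenNode,
          dsFactory, allowShortCircuitLocalReads);
    } catch (IOException e) {
      IOUtils.closeQuietly(peer);   // the peer is no good; clean it up
      throw e;
    }
    // reader is guaranteed to be non-null here
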
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0f4b7e350fc..69198ff6ddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -35,8 +35,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.net.DomainPeer;
@@ -56,7 +56,8 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hdfs.FileInputStreamCache;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles
@@ -64,11 +65,11 @@ import org.apache.hadoop.hdfs.FileInputStreamCache;
 ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+  @VisibleForTesting
+  static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
-
   private final DFSClient dfsClient;
   private boolean closed = false;
-
   private final String src;
   private final long prefetchSize;
   private BlockReader blockReader = null;
@@ -853,33 +854,23 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     }
   }
 
-  private Peer newPeer(InetSocketAddress addr) throws IOException {
+  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
-    DomainSocket domSock = null;
     try {
-      domSock = dfsClient.getDomainSocketFactory().create(addr, this);
-      if (domSock != null) {
-        // Create a UNIX Domain peer.
-        peer = new DomainPeer(domSock);
-      } else {
-        // Create a conventional TCP-based Peer.
-        sock = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(sock, addr,
-            dfsClient.getRandomLocalInterfaceAddr(),
-            dfsClient.getConf().socketTimeout);
-        peer = TcpPeerServer.peerFromSocketAndKey(sock,
-            dfsClient.getDataEncryptionKey());
-      }
+      sock = dfsClient.socketFactory.createSocket();
+      NetUtils.connect(sock, addr,
+          dfsClient.getRandomLocalInterfaceAddr(),
+          dfsClient.getConf().socketTimeout);
+      peer = TcpPeerServer.peerFromSocketAndKey(sock,
+          dfsClient.getDataEncryptionKey());
       success = true;
       return peer;
     } finally {
       if (!success) {
         IOUtils.closeQuietly(peer);
         IOUtils.closeQuietly(sock);
-        IOUtils.closeQuietly(domSock);
       }
     }
   }
@@ -888,6 +879,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
    * Retrieve a BlockReader suitable for reading.
    * This method will reuse the cached connection to the DN if appropriate.
    * Otherwise, it will create a new connection.
+   * Throwing an IOException from this method is basically equivalent to
+   * declaring the DataNode bad, so we try to connect a lot of different ways
+   * before doing that.
    *
    * @param dnAddr Address of the datanode
    * @param chosenNode Chosen datanode information
@@ -912,9 +906,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
       boolean verifyChecksum,
       String clientName)
       throws IOException {
-
-    IOException err = null;
-
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.
     FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
@@ -927,67 +918,84 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
           block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
     }
 
-    // We retry several times here.
-    // On the first nCachedConnRetry times, we try to fetch a socket from
-    // the socketCache and use it. This may fail, since the old socket may
-    // have been closed by the peer.
-    // After that, we try to create a new socket using newPeer().
-    // This may create either a TCP socket or a UNIX domain socket, depending
-    // on the configuration and whether the peer is remote.
-    // If we try to create a UNIX domain socket and fail, we will not try that
-    // again.  Instead, we'll try to create a TCP socket.  Only after we've
-    // failed to create a TCP-based BlockReader will we throw an IOException
-    // from this function.  Throwing an IOException from here is basically
-    // equivalent to declaring the DataNode bad.
-    boolean triedNonDomainSocketReader = false;
-    for (int retries = 0;
-        retries < nCachedConnRetry || (!triedNonDomainSocketReader);
-        ++retries) {
-      Peer peer = null;
-      if (retries < nCachedConnRetry) {
-        peer = peerCache.get(chosenNode);
-      }
-      if (peer == null) {
-        peer = newPeer(dnAddr);
-        if (peer.getDomainSocket() == null) {
-          triedNonDomainSocketReader = true;
-        }
-      }
-      boolean success = false;
+    // Look for cached domain peers.
+    int cacheTries = 0;
+    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
+    BlockReader reader = null;
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, true);
+      if (peer == null) break;
       try {
-        boolean allowShortCircuitLocalReads =
-          (peer.getDomainSocket() != null) &&
-          dfsClient.getConf().shortCircuitLocalReads &&
-          (!shortCircuitForbidden());
-        // Here we will try to send either an OP_READ_BLOCK request or an
-        // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader
-        // we're trying to create.
-        BlockReader blockReader = BlockReaderFactory.newBlockReader(
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
             dfsClient.conf, file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
-            dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
-        success = true;
-        return blockReader;
-      } catch (IOException ex) {
-        // Our socket is no good.
-        DFSClient.LOG.debug("Error making BlockReader. " +
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException ex) {
+        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
            "Closing stale " + peer, ex);
-        if (peer.getDomainSocket() != null) {
-          // If the Peer that we got the error from was a DomainPeer,
-          // mark the socket path as bad, so that newDataSocket will not try
-          // to re-open this socket for a while.
-          dfsClient.getDomainSocketFactory().
-              disableDomainSocketPath(peer.getDomainSocket().getPath());
-        }
-        err = ex;
       } finally {
-        if (!success) {
+        if (reader == null) {
           IOUtils.closeQuietly(peer);
         }
       }
     }
-    throw err;
+    // Try to create a DomainPeer.
+    DomainSocket domSock = dsFactory.create(dnAddr, this);
+    if (domSock != null) {
+      Peer peer = new DomainPeer(domSock);
+      try {
+        boolean allowShortCircuitLocalReads = dfsClient.getConf().
+            shortCircuitLocalReads && (!shortCircuitForbidden());
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode,
+            dsFactory, allowShortCircuitLocalReads);
+        return reader;
+      } catch (IOException e) {
+        DFSClient.LOG.warn("failed to connect to " + domSock, e);
+      } finally {
+        if (reader == null) {
+          // If the Peer that we got the error from was a DomainPeer,
+          // mark the socket path as bad, so that newDataSocket will not try
+          // to re-open this socket for a while.
+          dsFactory.disableDomainSocketPath(domSock.getPath());
+          IOUtils.closeQuietly(peer);
+        }
+      }
+    }
+
+    // Look for cached peers.
+    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
+      Peer peer = peerCache.get(chosenNode, false);
+      if (peer == null) break;
+      try {
+        reader = BlockReaderFactory.newBlockReader(
+            dfsClient.conf, file, block, blockToken, startOffset,
+            len, verifyChecksum, clientName, peer, chosenNode,
+            dsFactory, false);
+        return reader;
+      } catch (IOException ex) {
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
+          peer, ex);
+      } finally {
+        if (reader == null) {
+          IOUtils.closeQuietly(peer);
+        }
+      }
+    }
+    if (tcpReadsDisabledForTesting) {
+      throw new IOException("TCP reads are disabled.");
+    }
+    // Try to create a new remote peer.
+    Peer peer = newTcpPeer(dnAddr);
+    return BlockReaderFactory.newBlockReader(
+        dfsClient.conf, file, block, blockToken, startOffset,
+        len, verifyChecksum, clientName, peer, chosenNode,
+        dsFactory, false);
   }
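Taken together, the getBlockReader() rewrite above is the heart of HDFS-4417: the old retry loop called disableDomainSocketPath() whenever any domain peer failed, so a single stale cached socket could make the client stop using short-circuit local reads for a while. The new code walks a fixed ladder of connection sources and only penalizes the domain socket path when a freshly created domain socket fails. A condensed sketch of that ladder follows, assuming it lives inside DFSInputStream; tryPeersInOrder() and newBlockReaderFrom() are hypothetical helper names standing in for the inline BlockReaderFactory calls, and the real method also consults the FileInputStreamCache first and bounds the cache loops by nCachedConnRetry:

    private BlockReader tryPeersInOrder() throws IOException {
      Peer p;
      // 1. Cached domain peers: a failure just closes the stale socket.
      while ((p = peerCache.get(chosenNode, true)) != null) {
        try {
          return newBlockReaderFrom(p, true);
        } catch (IOException e) {
          IOUtils.closeQuietly(p);   // do NOT disable the domain socket path
        }
      }
      // 2. A freshly created domain socket: a failure here is a real signal,
      //    so back off from the path for a while.
      DomainSocket sock = dsFactory.create(dnAddr, this);
      if (sock != null) {
        Peer peer = new DomainPeer(sock);
        try {
          return newBlockReaderFrom(peer, true);
        } catch (IOException e) {
          dsFactory.disableDomainSocketPath(sock.getPath());
          IOUtils.closeQuietly(peer);
        }
      }
      // 3. Cached TCP peers.
      while ((p = peerCache.get(chosenNode, false)) != null) {
        try {
          return newBlockReaderFrom(p, false);
        } catch (IOException e) {
          IOUtils.closeQuietly(p);
        }
      }
      // 4. Last resort: a brand-new TCP connection.
      return newBlockReaderFrom(newTcpPeer(dnAddr), false);
    }
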
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
index 09b2ef70b14..dcb4bc18331 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -39,6 +39,30 @@ import org.apache.hadoop.util.Time;
 class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
 
+  private static class Key {
+    final DatanodeID dnID;
+    final boolean isDomain;
+
+    Key(DatanodeID dnID, boolean isDomain) {
+      this.dnID = dnID;
+      this.isDomain = isDomain;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Key)) {
+        return false;
+      }
+      Key other = (Key)o;
+      return dnID.equals(other.dnID) && isDomain == other.isDomain;
+    }
+
+    @Override
+    public int hashCode() {
+      return dnID.hashCode() ^ (isDomain ? 1 : 0);
+    }
+  }
+
   private static class Value {
     private final Peer peer;
     private final long time;
@@ -59,7 +83,7 @@ class PeerCache {
 
   private Daemon daemon;
   /** A map for per user per datanode. */
-  private static LinkedListMultimap<DatanodeID, Value> multimap =
+  private static LinkedListMultimap<Key, Value> multimap =
     LinkedListMultimap.create();
   private static int capacity;
   private static long expiryPeriod;
@@ -124,16 +148,18 @@ class PeerCache {
   /**
    * Get a cached peer connected to the given DataNode.
    * @param dnId         The DataNode to get a Peer for.
+   * @param isDomain     Whether to retrieve a DomainPeer or not.
+   *
    * @return             An open Peer connected to the DN, or null if none
    *                     was found.
    */
-  public synchronized Peer get(DatanodeID dnId) {
+  public synchronized Peer get(DatanodeID dnId, boolean isDomain) {
 
     if (capacity <= 0) { // disabled
       return null;
     }
 
-    List<Value> sockStreamList = multimap.get(dnId);
+    List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
     if (sockStreamList == null) {
       return null;
     }
@@ -168,7 +194,8 @@ class PeerCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(dnId, new Value(peer, Time.monotonicNow()));
+    multimap.put(new Key(dnId, peer.getDomainSocket() != null),
+        new Value(peer, Time.monotonicNow()));
   }
 
   public synchronized int size() {
@@ -180,9 +207,9 @@
    */
   private synchronized void evictExpired(long expiryPeriod) {
     while (multimap.size() != 0) {
-      Iterator<Entry<DatanodeID, Value>> iter =
+      Iterator<Entry<Key, Value>> iter =
         multimap.entries().iterator();
-      Entry<DatanodeID, Value> entry = iter.next();
+      Entry<Key, Value> entry = iter.next();
       // if oldest socket expired, remove it
       if (entry == null ||
         Time.monotonicNow() - entry.getValue().getTime() <
@@ -201,13 +228,13 @@
     // We can get the oldest element immediately, because of an interesting
     // property of LinkedListMultimap: its iterator traverses entries in the
     // order that they were added.
-    Iterator<Entry<DatanodeID, Value>> iter =
+    Iterator<Entry<Key, Value>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache! " +
         "capacity: " + capacity);
     }
-    Entry<DatanodeID, Value> entry = iter.next();
+    Entry<Key, Value> entry = iter.next();
     IOUtils.cleanup(LOG, entry.getValue().getPeer());
     iter.remove();
   }
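The PeerCache change above swaps the bare DatanodeID key for a small (DatanodeID, isDomain) Key, so domain-socket peers and plain TCP peers to the same DataNode live in separate buckets and get(dnId, true) can never hand back a TCP connection. A short usage sketch of the new semantics, assuming dnId, tcpPeer and domainPeer already exist and that domainPeer.getDomainSocket() is non-null:

    PeerCache cache = PeerCache.getInstance(3, 100000);
    cache.put(dnId, tcpPeer);        // stored under (dnId, false)
    cache.put(dnId, domainPeer);     // stored under (dnId, true)

    Peer a = cache.get(dnId, true);  // returns domainPeer
    Peer b = cache.get(dnId, false); // returns tcpPeer
    Peer c = cache.get(dnId, true);  // null: no domain peers left for dnId
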
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 54433e0463c..466635b5e88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
index 9ef0f093ffa..4e785110476 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
@@ -114,7 +114,7 @@ public class TestDataTransferKeepalive {
 
     // Take it out of the cache - reading should
     // give an EOF.
-    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId());
+    Peer peer = dfsClient.peerCache.get(dn.getDatanodeId(), false);
     assertNotNull(peer);
    assertEquals(-1, peer.getInputStream().read());
   }
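For context on the new regression test that follows: only two client settings are actually needed to turn on short-circuit local reads over a domain socket; the rest of its setupCluster() exists to force reads through stale cached sockets and to bypass the FileInputStreamCache. A minimal sketch using the same DFSConfigKeys constants that appear below; the socket path shown here is an arbitrary example, not taken from the patch:

    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        "/tmp/example-dn-socket._PORT.sock");   // example path only
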
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
new file mode 100644
index 00000000000..634eef8a7e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadUnCached.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import static org.hamcrest.CoreMatchers.*;
+
+/**
+ * This class tests short-circuit local reads without any FileInputStream or
+ * Socket caching.  This is a regression test for HDFS-4417.
+ */
+public class TestParallelShortCircuitReadUnCached extends TestParallelReadUtil {
+  private static TemporarySocketDirectory sockDir;
+
+  @BeforeClass
+  static public void setupCluster() throws Exception {
+    if (DomainSocket.getLoadingFailureReason() != null) return;
+    sockDir = new TemporarySocketDirectory();
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+      new File(sockDir.getDir(),
+        "TestParallelShortCircuitReadUnCached._PORT.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
+    // We want to test reading from stale sockets.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        5 * 60 * 1000);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
+    // Avoid using the FileInputStreamCache.
+    conf.setInt(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 0);
+    DomainSocket.disableBindPathValidation();
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    setupCluster(1, conf);
+  }
+
+  @Before
+  public void before() {
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+
+  @AfterClass
+  static public void teardownCluster() throws Exception {
+    if (DomainSocket.getLoadingFailureReason() != null) return;
+    sockDir.close();
+    TestParallelReadUtil.teardownCluster();
+  }
+}
\ No newline at end of file
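The tcpReadsDisabledForTesting flag set in setupCluster() above is what makes this an effective regression test: with TCP fallback forbidden, a client that wrongly disables its domain socket path cannot quietly degrade to remote reads, and getBlockReader() throws "TCP reads are disabled." instead. A hypothetical, condensed test showing the pattern; fs and path are assumed to come from a running MiniDFSCluster, while the real coverage is driven through the TestParallelReadUtil workers:

    @Test
    public void readsStayShortCircuited() throws Exception {
      DFSInputStream.tcpReadsDisabledForTesting = true;
      FSDataInputStream in = fs.open(path);
      byte[] buf = new byte[8192];
      in.readFully(0, buf);   // throws if the client had to fall back to TCP
      in.close();
    }
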
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
index bb580bc9534..3c8db549ab9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java
@@ -33,22 +33,25 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestPeerCache {
   static final Log LOG = LogFactory.getLog(TestPeerCache.class);
 
   private static final int CAPACITY = 3;
   private static final int EXPIRY_PERIOD = 20;
-  private static PeerCache cache =
-      PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
 
   private static class FakePeer implements Peer {
     private boolean closed = false;
+    private final boolean hasDomain;
 
     private DatanodeID dnId;
-    public FakePeer(DatanodeID dnId) {
+    public FakePeer(DatanodeID dnId, boolean hasDomain) {
       this.dnId = dnId;
+      this.hasDomain = hasDomain;
     }
 
     @Override
@@ -118,39 +121,50 @@ public class TestPeerCache {
 
     @Override
     public DomainSocket getDomainSocket() {
-      return null;
+      if (!hasDomain) return null;
+      // Return a mock which throws an exception whenever any function is
+      // called.
+      return Mockito.mock(DomainSocket.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation)
+                throws Throwable {
+              throw new RuntimeException("injected fault.");
+          } });
     }
   }
 
   @Test
   public void testAddAndRetrieve() throws Exception {
+    PeerCache cache = PeerCache.getInstance(3, 100000);
     DatanodeID dnId = new DatanodeID("192.168.0.1",
         "fakehostname", "fake_storage_id", 100, 101, 102);
-    FakePeer peer = new FakePeer(dnId);
+    FakePeer peer = new FakePeer(dnId, false);
     cache.put(dnId, peer);
     assertTrue(!peer.isClosed());
     assertEquals(1, cache.size());
-    assertEquals(peer, cache.get(dnId));
+    assertEquals(peer, cache.get(dnId, false));
     assertEquals(0, cache.size());
     cache.clear();
   }
 
   @Test
   public void testExpiry() throws Exception {
+    final int CAPACITY = 3;
+    final int EXPIRY_PERIOD = 10;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD);
     DatanodeID dnIds[] = new DatanodeID[CAPACITY];
     FakePeer peers[] = new FakePeer[CAPACITY];
     for (int i = 0; i < CAPACITY; ++i) {
       dnIds[i] = new DatanodeID("192.168.0.1", "fakehostname_" + i,
           "fake_storage_id", 100, 101, 102);
-      peers[i] = new FakePeer(dnIds[i]);
+      peers[i] = new FakePeer(dnIds[i], false);
     }
     for (int i = 0; i < CAPACITY; ++i) {
       cache.put(dnIds[i], peers[i]);
     }
-    // Check that the peers are cached
-    assertEquals(CAPACITY, cache.size());
 
     // Wait for the peers to expire
     Thread.sleep(EXPIRY_PERIOD * 50);
@@ -169,13 +183,15 @@ public class TestPeerCache {
 
   @Test
   public void testEviction() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
     DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
     FakePeer peers[] = new FakePeer[CAPACITY + 1];
     for (int i = 0; i < dnIds.length; ++i) {
       dnIds[i] = new DatanodeID("192.168.0.1", "fakehostname_" + i,
           "fake_storage_id_" + i, 100, 101, 102);
-      peers[i] = new FakePeer(dnIds[i]);
+      peers[i] = new FakePeer(dnIds[i], false);
     }
     for (int i = 0; i < CAPACITY; ++i) {
       cache.put(dnIds[i], peers[i]);
@@ -186,11 +202,11 @@ public class TestPeerCache {
 
     // Add another entry and check that the first entry was evicted
     cache.put(dnIds[CAPACITY], peers[CAPACITY]);
     assertEquals(CAPACITY, cache.size());
-    assertSame(null, cache.get(dnIds[0]));
+    assertSame(null, cache.get(dnIds[0], false));
     // Make sure that the other entries are still there
     for (int i = 1; i < CAPACITY; ++i) {
-      Peer peer = cache.get(dnIds[i]);
+      Peer peer = cache.get(dnIds[i], false);
       assertSame(peers[i], peer);
       assertTrue(!peer.isClosed());
       peer.close();
@@ -201,19 +217,56 @@ public class TestPeerCache {
 
   @Test
   public void testMultiplePeersWithSameDnId() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
     DatanodeID dnId = new DatanodeID("192.168.0.1",
         "fakehostname", "fake_storage_id",
         100, 101, 102);
     HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
     for (int i = 0; i < CAPACITY; ++i) {
-      FakePeer peer = new FakePeer(dnId);
+      FakePeer peer = new FakePeer(dnId, false);
       peers.add(peer);
       cache.put(dnId, peer);
     }
     // Check that all of the peers ended up in the cache
     assertEquals(CAPACITY, cache.size());
     while (!peers.isEmpty()) {
-      Peer peer = cache.get(dnId);
+      Peer peer = cache.get(dnId, false);
+      assertTrue(peer != null);
+      assertTrue(!peer.isClosed());
+      peers.remove(peer);
+    }
+    assertEquals(0, cache.size());
+    cache.clear();
+  }
+
+  @Test
+  public void testDomainSocketPeers() throws Exception {
+    final int CAPACITY = 3;
+    PeerCache cache = PeerCache.getInstance(CAPACITY, 100000);
+    DatanodeID dnId = new DatanodeID("192.168.0.1",
+        "fakehostname", "fake_storage_id",
+        100, 101, 102);
+    HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY);
+    for (int i = 0; i < CAPACITY; ++i) {
+      FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
+      peers.add(peer);
+      cache.put(dnId, peer);
+    }
+    // Check that all of the peers ended up in the cache
+    assertEquals(CAPACITY, cache.size());
+    // Test that get(requireDomainPeer=true) finds the peer with the
+    // domain socket.
+    Peer peer = cache.get(dnId, true);
+    assertTrue(peer.getDomainSocket() != null);
+    peers.remove(peer);
+    // Test that get(requireDomainPeer=true) returns null when there are
+    // no more peers with domain sockets.
+    peer = cache.get(dnId, true);
+    assertTrue(peer == null);
+    // Check that all of the other peers ended up in the cache.
+    while (!peers.isEmpty()) {
+      peer = cache.get(dnId, false);
       assertTrue(peer != null);
       assertTrue(!peer.isClosed());
       peers.remove(peer);
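One detail of the TestPeerCache changes worth calling out: FakePeer.getDomainSocket() above hands back a Mockito mock whose default Answer always throws, so a peer can be flagged as "domain" without ever opening a real UNIX socket, and any accidental call on the mock fails loudly. The same pattern in isolation, as a restatement of the code above rather than new test logic:

    DomainSocket sock = Mockito.mock(DomainSocket.class,
        new Answer<Object>() {
          @Override
          public Object answer(InvocationOnMock invocation) throws Throwable {
            throw new RuntimeException("injected fault.");
          }
        });
    // Any method call on sock throws; only null and identity checks are safe,
    // which is all PeerCache needs from it.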