HDFS-4433. Make TestPeerCache not flaky. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-347@1437680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-23 20:01:33 +00:00
parent 797465820b
commit 4e74c52e60
3 changed files with 68 additions and 39 deletions

View File

@ -33,3 +33,5 @@ HDFS-4416. Rename dfs.datanode.domain.socket.path to dfs.domain.socket.path
HDFS-4417. Fix case where local reads get disabled incorrectly HDFS-4417. Fix case where local reads get disabled incorrectly
(Colin Patrick McCabe and todd via todd) (Colin Patrick McCabe and todd via todd)
HDFS-4433. Make TestPeerCache not flaky (Colin Patrick McCabe via todd)

View File

@ -83,32 +83,35 @@ long getTime() {
private Daemon daemon; private Daemon daemon;
/** A map for per user per datanode. */ /** A map for per user per datanode. */
private static LinkedListMultimap<Key, Value> multimap = private final LinkedListMultimap<Key, Value> multimap =
LinkedListMultimap.create(); LinkedListMultimap.create();
private static int capacity; private final int capacity;
private static long expiryPeriod; private final long expiryPeriod;
private static PeerCache instance = new PeerCache(); private static PeerCache instance = null;
private static boolean isInitedOnce = false;
@VisibleForTesting
PeerCache(int c, long e) {
this.capacity = c;
this.expiryPeriod = e;
if (capacity == 0 ) {
LOG.info("SocketCache disabled.");
}
else if (expiryPeriod == 0) {
throw new IllegalStateException("Cannot initialize expiryPeriod to " +
expiryPeriod + "when cache is enabled.");
}
}
public static synchronized PeerCache getInstance(int c, long e) { public static synchronized PeerCache getInstance(int c, long e) {
// capacity is only initialized once // capacity is only initialized once
if (isInitedOnce == false) { if (instance == null) {
capacity = c; instance = new PeerCache(c, e);
expiryPeriod = e;
if (capacity == 0 ) {
LOG.info("SocketCache disabled.");
}
else if (expiryPeriod == 0) {
throw new IllegalStateException("Cannot initialize expiryPeriod to " +
expiryPeriod + "when cache is enabled.");
}
isInitedOnce = true;
} else { //already initialized once } else { //already initialized once
if (capacity != c || expiryPeriod != e) { if (instance.capacity != c || instance.expiryPeriod != e) {
LOG.info("capacity and expiry periods already set to " + capacity + LOG.info("capacity and expiry periods already set to " +
" and " + expiryPeriod + " respectively. Cannot set it to " + c + instance.capacity + " and " + instance.expiryPeriod +
" and " + e); " respectively. Cannot set it to " + c + " and " + e);
} }
} }
@ -267,5 +270,18 @@ synchronized void clear() {
} }
multimap.clear(); multimap.clear();
} }
@VisibleForTesting
void close() {
clear();
if (daemon != null) {
daemon.interrupt();
try {
daemon.join();
} catch (InterruptedException e) {
throw new RuntimeException("failed to join thread");
}
}
daemon = null;
}
} }

View File

@ -25,7 +25,6 @@
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.HashSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -37,12 +36,11 @@
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.collect.HashMultiset;
public class TestPeerCache { public class TestPeerCache {
static final Log LOG = LogFactory.getLog(TestPeerCache.class); static final Log LOG = LogFactory.getLog(TestPeerCache.class);
private static final int CAPACITY = 3;
private static final int EXPIRY_PERIOD = 20;
private static class FakePeer implements Peer { private static class FakePeer implements Peer {
private boolean closed = false; private boolean closed = false;
private final boolean hasDomain; private final boolean hasDomain;
@ -132,11 +130,24 @@ public Object answer(InvocationOnMock invocation)
throw new RuntimeException("injected fault."); throw new RuntimeException("injected fault.");
} }); } });
} }
@Override
public boolean equals(Object o) {
if (!(o instanceof FakePeer)) return false;
FakePeer other = (FakePeer)o;
return hasDomain == other.hasDomain &&
dnId.equals(other.dnId);
}
@Override
public int hashCode() {
return dnId.hashCode() ^ (hasDomain ? 1 : 0);
}
} }
@Test @Test
public void testAddAndRetrieve() throws Exception { public void testAddAndRetrieve() throws Exception {
PeerCache cache = PeerCache.getInstance(3, 100000); PeerCache cache = new PeerCache(3, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1", DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id", "fakehostname", "fake_storage_id",
100, 101, 102); 100, 101, 102);
@ -146,14 +157,14 @@ public void testAddAndRetrieve() throws Exception {
assertEquals(1, cache.size()); assertEquals(1, cache.size());
assertEquals(peer, cache.get(dnId, false)); assertEquals(peer, cache.get(dnId, false));
assertEquals(0, cache.size()); assertEquals(0, cache.size());
cache.clear(); cache.close();
} }
@Test @Test
public void testExpiry() throws Exception { public void testExpiry() throws Exception {
final int CAPACITY = 3; final int CAPACITY = 3;
final int EXPIRY_PERIOD = 10; final int EXPIRY_PERIOD = 10;
PeerCache cache = PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD); PeerCache cache = new PeerCache(CAPACITY, EXPIRY_PERIOD);
DatanodeID dnIds[] = new DatanodeID[CAPACITY]; DatanodeID dnIds[] = new DatanodeID[CAPACITY];
FakePeer peers[] = new FakePeer[CAPACITY]; FakePeer peers[] = new FakePeer[CAPACITY];
for (int i = 0; i < CAPACITY; ++i) { for (int i = 0; i < CAPACITY; ++i) {
@ -178,13 +189,13 @@ public void testExpiry() throws Exception {
// sleep for another second and see if // sleep for another second and see if
// the daemon thread runs fine on empty cache // the daemon thread runs fine on empty cache
Thread.sleep(EXPIRY_PERIOD * 50); Thread.sleep(EXPIRY_PERIOD * 50);
cache.clear(); cache.close();
} }
@Test @Test
public void testEviction() throws Exception { public void testEviction() throws Exception {
final int CAPACITY = 3; final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000); PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1]; DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
FakePeer peers[] = new FakePeer[CAPACITY + 1]; FakePeer peers[] = new FakePeer[CAPACITY + 1];
for (int i = 0; i < dnIds.length; ++i) { for (int i = 0; i < dnIds.length; ++i) {
@ -212,17 +223,17 @@ public void testEviction() throws Exception {
peer.close(); peer.close();
} }
assertEquals(1, cache.size()); assertEquals(1, cache.size());
cache.clear(); cache.close();
} }
@Test @Test
public void testMultiplePeersWithSameDnId() throws Exception { public void testMultiplePeersWithSameKey() throws Exception {
final int CAPACITY = 3; final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000); PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1", DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id", "fakehostname", "fake_storage_id",
100, 101, 102); 100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY); HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) { for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId, false); FakePeer peer = new FakePeer(dnId, false);
peers.add(peer); peers.add(peer);
@ -237,17 +248,17 @@ public void testMultiplePeersWithSameDnId() throws Exception {
peers.remove(peer); peers.remove(peer);
} }
assertEquals(0, cache.size()); assertEquals(0, cache.size());
cache.clear(); cache.close();
} }
@Test @Test
public void testDomainSocketPeers() throws Exception { public void testDomainSocketPeers() throws Exception {
final int CAPACITY = 3; final int CAPACITY = 3;
PeerCache cache = PeerCache.getInstance(CAPACITY, 100000); PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1", DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_storage_id", "fakehostname", "fake_storage_id",
100, 101, 102); 100, 101, 102);
HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY); HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) { for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1); FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
peers.add(peer); peers.add(peer);
@ -272,6 +283,6 @@ public void testDomainSocketPeers() throws Exception {
peers.remove(peer); peers.remove(peer);
} }
assertEquals(0, cache.size()); assertEquals(0, cache.size());
cache.clear(); cache.close();
} }
} }