HDFS-11660. TestFsDatasetCache#testPageRounder fails intermittently with AssertionError.

Andrew Wang 2017-04-19 18:10:04 -07:00
parent c1549352cf
commit 74a723852d
5 changed files with 67 additions and 19 deletions
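The failure mode: TestFsDatasetCache stubs heartbeat responses on a Mockito spy of the NameNode protocol, and a DataNode BPServiceActor heartbeat could run against a half-configured stub. The fix threads startOfferService()/endOfferService() fault-injector hooks around each offerService iteration; the test installs an injector whose hooks hold the shared side of a fair ReadWriteLock, and setHeartbeatResponse takes the write lock, so stubs change only while no heartbeat iteration is in flight. Below is a minimal, self-contained sketch of that pause pattern (hypothetical names such as PausableWorkerSketch; not code from this patch):

// Standalone sketch of the pause pattern this commit applies
// (illustrative names; not part of the Hadoop code below).
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PausableWorkerSketch {
  // Fair mode gives a waiting writer priority over newly arriving readers.
  private static final ReadWriteLock LOCK = new ReentrantReadWriteLock(true);
  private static volatile boolean running = true;

  // Stand-in for BPServiceActor#offerService(): each iteration holds the
  // read lock, as startOfferService()/endOfferService() do in the patch.
  static void workerLoop() {
    while (running) {
      LOCK.readLock().lock();          // startOfferService()
      try {
        doOneHeartbeatIteration();
      } finally {
        LOCK.readLock().unlock();      // endOfferService()
      }
    }
  }

  // Stand-in for setHeartbeatResponse(): mutate shared test state only
  // while no worker iteration is in flight.
  static void reconfigureUnderWriteLock(Runnable mutation) {
    LOCK.writeLock().lock();
    try {
      mutation.run();
    } finally {
      LOCK.writeLock().unlock();
    }
  }

  private static void doOneHeartbeatIteration() { /* ... */ }

  public static void main(String[] args) throws Exception {
    Thread worker = new Thread(PausableWorkerSketch::workerLoop);
    worker.start();
    reconfigureUnderWriteLock(() -> System.out.println("swapped stub"));
    running = false;
    worker.join();
  }
}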

BPServiceActor.java

@@ -623,6 +623,7 @@ class BPServiceActor implements Runnable {
     //
     while (shouldRun()) {
       try {
+        DataNodeFaultInjector.get().startOfferService();
         final long startTime = scheduler.monotonicNow();
         //
@@ -725,6 +726,8 @@ class BPServiceActor implements Runnable {
       } catch (IOException e) {
         LOG.warn("IOException in offerService", e);
         sleepAfterException();
+      } finally {
+        DataNodeFaultInjector.get().endOfferService();
       }
       processQueueMessages();
     } // while (shouldRun())

DataNodeFaultInjector.java

@@ -30,7 +30,7 @@ import java.io.IOException;
 @VisibleForTesting
 @InterfaceAudience.Private
 public class DataNodeFaultInjector {
-  public static DataNodeFaultInjector instance = new DataNodeFaultInjector();
+  private static DataNodeFaultInjector instance = new DataNodeFaultInjector();
   public static DataNodeFaultInjector get() {
     return instance;
@@ -81,4 +81,8 @@ public class DataNodeFaultInjector {
   public void failPipeline(ReplicaInPipeline replicaInfo,
       String mirrorAddr) throws IOException { }
+  public void startOfferService() throws Exception {}
+  public void endOfferService() throws Exception {}
 }
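With the instance field now private, callers go through get() and a setter (DataNodeFaultInjector.set, used by the tests below). The idiom the patched tests adopt is save/override/restore; a hypothetical JUnit sketch (FaultInjectorOverrideSketch and its body are illustrative, not from the patch):

import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.junit.Test;

public class FaultInjectorOverrideSketch {
  @Test
  public void testWithInjectorOverride() throws Exception {
    DataNodeFaultInjector saved = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void startOfferService() throws Exception {
        // test-specific behavior, e.g. block on a latch or count calls
      }
    });
    try {
      // ... exercise the DataNode code path under test ...
    } finally {
      DataNodeFaultInjector.set(saved); // never leak the injector to other tests
    }
  }
}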

TestDataNodeMetrics.java

@@ -32,6 +32,7 @@ import java.util.List;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,6 +59,7 @@ import org.mockito.Mockito;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+@NotThreadSafe
 public class TestDataNodeMetrics {
   private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class);
@@ -216,6 +218,7 @@ public class TestDataNodeMetrics {
         new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     final List<FSDataOutputStream> streams = Lists.newArrayList();
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
     try {
       final FSDataOutputStream out =
           cluster.getFileSystem().create(path, (short) 2);
@@ -224,7 +227,7 @@ public class TestDataNodeMetrics {
       Mockito.doThrow(new IOException("mock IOException")).
           when(injector).
           writeBlockAfterFlush();
-      DataNodeFaultInjector.instance = injector;
+      DataNodeFaultInjector.set(injector);
       streams.add(out);
       out.writeBytes("old gs data\n");
       out.hflush();
@@ -250,7 +253,7 @@ public class TestDataNodeMetrics {
       if (cluster != null) {
         cluster.shutdown();
       }
-      DataNodeFaultInjector.instance = new DataNodeFaultInjector();
+      DataNodeFaultInjector.set(oldInjector);
     }
   }

TestFsDatasetCache.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -34,6 +35,8 @@ import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -80,8 +83,10 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.log4j.Logger;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -91,6 +96,7 @@ import com.google.common.primitives.Ints;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
+@NotThreadSafe
 public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
@@ -110,13 +116,39 @@ public class TestFsDatasetCache {
   private static DataNode dn;
   private static FsDatasetSpi<?> fsd;
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
+  /**
+   * Used to pause DN BPServiceActor threads. BPSA threads acquire the
+   * shared read lock. The test acquires the write lock for exclusive access.
+   */
+  private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
   private static final PageRounder rounder = new PageRounder();
   private static CacheManipulator prevCacheManipulator;
+  private static DataNodeFaultInjector oldInjector;
   static {
     LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
   }
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+      @Override
+      public void startOfferService() throws Exception {
+        lock.readLock().lock();
+      }
+      @Override
+      public void endOfferService() throws Exception {
+        lock.readLock().unlock();
+      }
+    });
+  }
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    DataNodeFaultInjector.set(oldInjector);
+  }
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
@@ -143,7 +175,6 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
     spyNN = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn);
   }
   @After
@@ -164,18 +195,23 @@ public class TestFsDatasetCache {
   }
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
-      throws IOException {
-    NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
-        fsImage.getLastAppliedOrWrittenTxId());
-    HeartbeatResponse response =
-        new HeartbeatResponse(cmds, ha, null,
-            ThreadLocalRandom.current().nextLong() | 1L);
-    doReturn(response).when(spyNN).sendHeartbeat(
-        (DatanodeRegistration) any(),
-        (StorageReport[]) any(), anyLong(), anyLong(),
-        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean(), any(SlowPeerReports.class),
-        any(SlowDiskReports.class));
+      throws Exception {
+    lock.writeLock().lock();
+    try {
+      NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
+          fsImage.getLastAppliedOrWrittenTxId());
+      HeartbeatResponse response =
+          new HeartbeatResponse(cmds, ha, null,
+              ThreadLocalRandom.current().nextLong() | 1L);
+      doReturn(response).when(spyNN).sendHeartbeat(
+          (DatanodeRegistration) any(),
+          (StorageReport[]) any(), anyLong(), anyLong(),
+          anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
+          anyBoolean(), any(SlowPeerReports.class),
+          any(SlowDiskReports.class));
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
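setHeartbeatResponse wraps the Mockito stubbing in the write lock, and the lock above is constructed fair (new ReentrantReadWriteLock(true)): once the test thread queues for the write lock, newly arriving BPSA read-lock requests wait behind it, so the stub swap can neither starve nor interleave with a heartbeat iteration. A small standalone demo of that fairness behavior (FairRwLockDemo is an illustration, not part of the patch):

// Demo: with a fair ReentrantReadWriteLock, a queued writer gets in
// even while a reader thread keeps grabbing and releasing the read lock.
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FairRwLockDemo {
  public static void main(String[] args) throws Exception {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    Thread reader = new Thread(() -> {
      // Simulates BPServiceActor iterations: repeated read-lock cycles.
      for (int i = 0; i < 1_000_000; i++) {
        lock.readLock().lock();
        try { /* heartbeat iteration */ } finally { lock.readLock().unlock(); }
      }
    });
    reader.start();
    lock.writeLock().lock();   // acquired promptly despite the reader loop
    try {
      System.out.println("writer acquired; no reader holds the lock now");
    } finally {
      lock.writeLock().unlock();
    }
    reader.join();
  }
}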

TestShortCircuitCache.java

@@ -34,6 +34,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
@@ -81,6 +82,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.HashMultimap;
+@NotThreadSafe
 public class TestShortCircuitCache {
   static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
@@ -723,8 +725,8 @@ public class TestShortCircuitCache {
           throw new IOException("injected error into sendShmResponse");
         }
       }).when(failureInjector).sendShortCircuitShmResponse();
-    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
-    DataNodeFaultInjector.instance = failureInjector;
+    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(failureInjector);
     try {
       // The first read will try to allocate a shared memory segment and slot.
@@ -741,7 +743,7 @@ public class TestShortCircuitCache {
         cluster.getDataNodes().get(0).getShortCircuitRegistry());
     LOG.info("Clearing failure injector and performing another read...");
-    DataNodeFaultInjector.instance = prevInjector;
+    DataNodeFaultInjector.set(prevInjector);
     fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();