HDFS-3800. improvements to QJM fault testing. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1373587 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-08-15 19:18:57 +00:00
parent d5abe22844
commit 5789de83a7
3 changed files with 247 additions and 34 deletions

CHANGES.HDFS-3077.txt

@@ -24,3 +24,5 @@ HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segmen
 HDFS-3799. QJM: handle empty log segments during recovery (todd)
 
 HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)
+
+HDFS-3800. improvements to QJM fault testing (todd)

QJMTestUtil.java

@@ -61,7 +61,7 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
   }
 
   public static void writeSegment(MiniJournalCluster cluster,
-      QuorumJournalManager qjm, int startTxId, int numTxns,
+      QuorumJournalManager qjm, long startTxId, int numTxns,
       boolean finalize) throws IOException {
     EditLogOutputStream stm = qjm.startLogSegment(startTxId);
     // Should create in-progress
@@ -81,7 +81,7 @@ public static void writeOp(EditLogOutputStream stm, long txid) throws IOExceptio
     stm.write(op);
   }
 
-  public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+  public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns)
       throws IOException {
     for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
       writeOp(stm, txid);

TestQJMWithFaults.java

@@ -17,26 +17,39 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.SortedSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -46,23 +59,27 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class TestQJMWithFaults {
   private static final Log LOG = LogFactory.getLog(
       TestQJMWithFaults.class);
 
+  private static final String RAND_SEED_PROPERTY =
+      "TestQJMWithFaults.random-seed";
+
+  private static final int NUM_WRITER_ITERS = 500;
+  private static final int SEGMENTS_PER_WRITER = 2;
+
   private static Configuration conf = new Configuration();
   static {
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
-  }
-
-  private static long MAX_IPC_NUMBER;
+
+    // Make tests run faster by avoiding fsync()
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
 
   /**
    * Run through the creation of a log without any faults injected,
@@ -70,10 +87,10 @@ public class TestQJMWithFaults {
    * bounds for the other test cases, so they can exhaustively explore
    * the space of potential failures.
    */
-  @BeforeClass
-  public static void determineMaxIpcNumber() throws Exception {
+  private static long determineMaxIpcNumber() throws Exception {
     Configuration conf = new Configuration();
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    long ret;
     try {
       QuorumJournalManager qjm = createInjectableQJM(cluster);
       qjm.format(FAKE_NSINFO);
@@ -90,11 +107,12 @@ public static void determineMaxIpcNumber() throws Exception {
       // were no failures.
       assertEquals(1, ipcCounts.size());
-      MAX_IPC_NUMBER = ipcCounts.first();
-      LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+      ret = ipcCounts.first();
+      LOG.info("Max IPC count = " + ret);
     } finally {
       cluster.shutdown();
     }
+    return ret;
   }
 
   /**
@@ -106,6 +124,8 @@ public static void determineMaxIpcNumber() throws Exception {
    */
   @Test
   public void testRecoverAfterDoubleFailures() throws Exception {
+    final long MAX_IPC_NUMBER = determineMaxIpcNumber();
+
     for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
       for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
         String injectionStr = "(" + failA + ", " + failB + ")";
@@ -132,17 +152,16 @@ public void testRecoverAfterDoubleFailures() throws Exception {
          }
 
          // Now should be able to recover
-          try {
          qjm = createInjectableQJM(cluster);
-          qjm.recoverUnfinalizedSegments();
-          writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
-          // TODO: verify log segments
+          long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+          assertTrue(lastRecoveredTxn >= lastAckedTxn);
+
+          writeSegment(cluster, qjm, lastRecoveredTxn + 1, 3, true);
        } catch (Throwable t) {
          // Test failure! Rethrow with the test setup info so it can be
          // easily triaged.
          throw new RuntimeException("Test failed with injection: " + injectionStr,
              t);
-        }
        } finally {
          cluster.shutdown();
          cluster = null;
@@ -151,6 +170,115 @@ public void testRecoverAfterDoubleFailures() throws Exception {
     }
   }
 
+  /**
+   * Test case in which three JournalNodes randomly flip flop between
+   * up and down states every time they get an RPC.
+   *
+   * The writer keeps track of the latest ACKed edit, and on every
+   * recovery operation, ensures that it recovers at least to that
+   * point or higher. Since at any given point, a majority of JNs
+   * may be injecting faults, any writer operation is allowed to fail,
+   * so long as the exception message indicates it failed due to injected
+   * faults.
+   *
+   * Given a random seed, the test should be entirely deterministic.
+   */
+  @Test
+  public void testRandomized() throws Exception {
+    long seed;
+    Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY);
+    if (userSpecifiedSeed != null) {
+      LOG.info("Using seed specified in system property");
+      seed = userSpecifiedSeed;
+
+      // If the user specifies a seed, then we should gather all the
+      // IPC trace information so that debugging is easier. This makes
+      // the test run about 25% slower otherwise.
+      ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+    } else {
+      seed = new Random().nextLong();
+    }
+    LOG.info("Random seed: " + seed);
+
+    Random r = new Random(seed);
+
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+
+    // Format the cluster using a non-faulty QJM.
+    QuorumJournalManager qjmForInitialFormat =
+        createInjectableQJM(cluster);
+    qjmForInitialFormat.format(FAKE_NSINFO);
+    qjmForInitialFormat.close();
+
+    try {
+      long txid = 0;
+      long lastAcked = 0;
+
+      for (int i = 0; i < NUM_WRITER_ITERS; i++) {
+        LOG.info("Starting writer " + i + "\n-------------------");
+
+        QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
+        try {
+          if (txid > 100) {
+            qjm.purgeLogsOlderThan(txid - 100);
+          }
+
+          long recovered;
+          try {
+            recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+          } catch (Throwable t) {
+            LOG.info("Failed recovery", t);
+            GenericTestUtils.assertExceptionContains("faking being down", t);
+            continue;
+          }
+          assertTrue("Recovered only up to txnid " + recovered +
+              " but had gotten an ack for " + lastAcked,
+              recovered >= lastAcked);
+
+          txid = recovered + 1;
+
+          Holder<Throwable> thrown = new Holder<Throwable>(null);
+          for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
+            lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
+            if (thrown.held != null) {
+              LOG.info("Failed write", thrown.held);
+              GenericTestUtils.assertExceptionContains("faking being down",
+                  thrown.held);
+              break;
+            }
+            txid += 4;
+          }
+        } finally {
+          qjm.close();
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private long writeSegmentUntilCrash(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
+    long firstTxId = txid;
+    long lastAcked = txid - 1;
+    try {
+      EditLogOutputStream stm = qjm.startLogSegment(txid);
+
+      for (int i = 0; i < numTxns; i++) {
+        QJMTestUtil.writeTxns(stm, txid++, 1);
+        lastAcked++;
+      }
+
+      stm.close();
+      qjm.finalizeLogSegment(firstTxId, lastAcked);
+    } catch (Throwable t) {
+      thrown.held = t;
+    }
+    return lastAcked;
+  }
+
   /**
    * Run a simple workload of becoming the active writer and writing
    * two log segments: 1-3 and 4-6.
@@ -180,6 +308,43 @@ private void failIpcNumber(AsyncLogger logger, int idx) {
     ((InvocationCountingChannel)logger).failIpcNumber(idx);
   }
 
+  private static class RandomFaultyChannel extends IPCLoggerChannel {
+    private final Random random;
+    private float injectionProbability = 0.1f;
+    private boolean isUp = true;
+
+    public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr, long seed) {
+      super(conf, nsInfo, journalId, addr);
+      this.random = new Random(seed);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      QJournalProtocol realProxy = super.createProxy();
+      return mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            @Override
+            void beforeCall(InvocationOnMock invocation) throws Exception {
+              if (random.nextFloat() < injectionProbability) {
+                isUp = !isUp;
+                LOG.info("transitioned " + addr + " to " +
+                    (isUp ? "up" : "down"));
+              }
+
+              if (!isUp) {
+                throw new IOException("Injected - faking being down");
+              }
+            }
+          });
+    }
+
+    @Override
+    protected ExecutorService createExecutor() {
+      return MoreExecutors.sameThreadExecutor();
+    }
+  }
+
   private static class InvocationCountingChannel extends IPCLoggerChannel {
     private int rpcCount = 0;
     private Map<Integer, Callable<Void>> injections = Maps.newHashMap();
@@ -211,10 +376,9 @@ private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
     @Override
     protected QJournalProtocol createProxy() throws IOException {
       final QJournalProtocol realProxy = super.createProxy();
-      QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
-          new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+      QJournalProtocol mock = mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            void beforeCall(InvocationOnMock invocation) throws Exception {
              rpcCount++;
              String callStr = "[" + addr + "] " +
                  invocation.getMethod().getName() + "(" +
@@ -228,15 +392,45 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
              } else {
                LOG.info("IPC call #" + rpcCount + ": " + callStr);
              }
-              return invocation.getMethod().invoke(realProxy,
-                  invocation.getArguments());
            }
          });
       return mock;
     }
   }
 
+  private static QJournalProtocol mockProxy(WrapEveryCall<Object> wrapper)
+      throws IOException {
+    QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+        Mockito.withSettings()
+          .defaultAnswer(wrapper)
+          .extraInterfaces(Closeable.class));
+    Mockito.doNothing().when((Closeable)mock).close();
+    return mock;
+  }
+
+  private static abstract class WrapEveryCall<T> implements Answer<T> {
+    private final Object realObj;
+
+    WrapEveryCall(Object realObj) {
+      this.realObj = realObj;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public T answer(InvocationOnMock invocation) throws Throwable {
+      beforeCall(invocation);
+
+      try {
+        return (T) invocation.getMethod().invoke(realObj,
+          invocation.getArguments());
+      } catch (InvocationTargetException ite) {
+        throw ite.getCause();
+      }
+    }
+
+    abstract void beforeCall(InvocationOnMock invocation) throws Exception;
+  }
+
   private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
       throws IOException, URISyntaxException {
     AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@@ -249,4 +443,21 @@ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
     return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
         FAKE_NSINFO, spyFactory);
   }
+
+  private static QuorumJournalManager createRandomFaultyQJM(
+      MiniJournalCluster cluster, final Random seedGenerator)
+      throws IOException, URISyntaxException {
+
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
+            seedGenerator.nextLong());
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+
 }
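
Note on reproducing a failure: testRandomized reads its seed from the TestQJMWithFaults.random-seed system property (via Long.getLong(RAND_SEED_PROPERTY)), and the javadoc above states that a given seed makes the run fully deterministic. A minimal reproduction driver is sketched below, assuming JUnit 4 is on the classpath; the ReproduceRandomizedRun class and the example seed value are illustrative only and are not part of this commit.

// Illustrative only, not part of this commit: replay testRandomized with a
// fixed seed so the same fault schedule is injected on every run.
import org.junit.runner.JUnitCore;
import org.junit.runner.Request;
import org.junit.runner.Result;

public class ReproduceRandomizedRun {
  public static void main(String[] args) {
    // Use the value printed as "Random seed: ..." by the failing run.
    System.setProperty("TestQJMWithFaults.random-seed", "123456789");
    Result result = new JUnitCore().run(
        Request.method(
            org.apache.hadoop.hdfs.qjournal.client.TestQJMWithFaults.class,
            "testRandomized"));
    System.out.println("Passed: " + result.wasSuccessful());
  }
}

When the property is set, the test also raises ProtobufRpcEngine logging to Level.ALL, so the replay carries full IPC traces for triage at the cost of some extra runtime.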