HDFS-3725. Fix QJM startup when individual JNs have gaps. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1367366 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-30 23:35:22 +00:00
parent e1dff3df99
commit e08604907c
3 changed files with 88 additions and 9 deletions

View File

@ -8,3 +8,5 @@ HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd) HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd) HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)

View File

@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
/** /**
* Check that the logs are contiguous and non-overlapping * Check that the logs are non-overlapping sequences of transactions,
* sequences of transactions, in sorted order * in sorted order. They do not need to be contiguous.
* @throws IllegalStateException if incorrect * @throws IllegalStateException if incorrect
*/ */
private void checkState() { private void checkState() {
@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
RemoteEditLog prev = null; RemoteEditLog prev = null;
for (RemoteEditLog log : logs) { for (RemoteEditLog log : logs) {
if (prev != null) { if (prev != null) {
if (log.getStartTxId() != prev.getEndTxId() + 1) { if (log.getStartTxId() <= prev.getEndTxId()) {
throw new IllegalStateException("Invalid log manifest:" + this); throw new IllegalStateException(
"Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+ this);
} }
} }

View File

@ -23,12 +23,15 @@ import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.SortedSet; import java.util.SortedSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
@ -43,7 +46,9 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -68,9 +73,16 @@ public class TestQuorumJournalManager {
private QuorumJournalManager qjm; private QuorumJournalManager qjm;
private List<AsyncLogger> spies; private List<AsyncLogger> spies;
static {
((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
conf = new Configuration(); conf = new Configuration();
// Don't retry connections - it just slows down the tests.
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
cluster = new MiniJournalCluster.Builder(conf) cluster = new MiniJournalCluster.Builder(conf)
.build(); .build();
@ -117,11 +129,7 @@ public class TestQuorumJournalManager {
assertEquals(1, stream.getFirstTxId()); assertEquals(1, stream.getFirstTxId());
assertEquals(3, stream.getLastTxId()); assertEquals(3, stream.getLastTxId());
for (int i = 1; i <= 3; i++) { verifyEdits(streams, 1, 3);
FSEditLogOp op = stream.readOp();
assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
assertEquals(i, op.getTransactionId());
}
assertNull(stream.readOp()); assertNull(stream.readOp());
} finally { } finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
@ -137,6 +145,7 @@ public class TestQuorumJournalManager {
EditLogInputStream stream = streams.get(0); EditLogInputStream stream = streams.get(0);
assertEquals(1, stream.getFirstTxId()); assertEquals(1, stream.getFirstTxId());
assertEquals(3, stream.getLastTxId()); assertEquals(3, stream.getLastTxId());
verifyEdits(streams, 1, 3);
} finally { } finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
streams.clear(); streams.clear();
@ -153,12 +162,43 @@ public class TestQuorumJournalManager {
assertEquals(2, streams.size()); assertEquals(2, streams.size());
assertEquals(4, streams.get(1).getFirstTxId()); assertEquals(4, streams.get(1).getFirstTxId());
assertEquals(6, streams.get(1).getLastTxId()); assertEquals(6, streams.get(1).getLastTxId());
verifyEdits(streams, 1, 6);
} finally { } finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0])); IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
streams.clear(); streams.clear();
} }
} }
/**
* Regression test for HDFS-3725. One of the journal nodes is down
* during the writing of one segment, then comes back up later to
* take part in a later segment. Thus, its local edits are
* not a contiguous sequence. This should be handled correctly.
*/
@Test
public void testOneJNMissingSegments() throws Exception {
writeSegment(qjm, 1, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.getJournalNode(0).stopAndJoin(0);
writeSegment(qjm, 4, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.restartJournalNode(0);
writeSegment(qjm, 7, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.getJournalNode(1).stopAndJoin(0);
QuorumJournalManager readerQjm = createSpyingQJM();
List<EditLogInputStream> streams = Lists.newArrayList();
try {
readerQjm.selectInputStreams(streams, 1, false);
verifyEdits(streams, 1, 9);
} finally {
IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
readerQjm.close();
}
}
/** /**
* TODO: this test needs to be fleshed out to be an exhaustive failure test * TODO: this test needs to be fleshed out to be an exhaustive failure test
* @throws Exception * @throws Exception
@ -425,6 +465,41 @@ public class TestQuorumJournalManager {
stm.flush(); stm.flush();
} }
/**
* Verify that the given list of streams contains exactly the range of
* transactions specified, inclusive.
*/
private void verifyEdits(List<EditLogInputStream> streams,
int firstTxnId, int lastTxnId) throws IOException {
Iterator<EditLogInputStream> iter = streams.iterator();
assertTrue(iter.hasNext());
EditLogInputStream stream = iter.next();
for (int expected = firstTxnId;
expected <= lastTxnId;
expected++) {
FSEditLogOp op = stream.readOp();
while (op == null) {
assertTrue("Expected to find txid " + expected + ", " +
"but no more streams available to read from",
iter.hasNext());
stream = iter.next();
op = stream.readOp();
}
assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
assertEquals(expected, op.getTransactionId());
}
assertNull(stream.readOp());
assertFalse("Expected no more txns after " + lastTxnId +
" but more streams are available", iter.hasNext());
}
private static void waitForAllPendingCalls(AsyncLoggerSet als) private static void waitForAllPendingCalls(AsyncLoggerSet als)
throws InterruptedException { throws InterruptedException {
for (AsyncLogger l : als.getLoggersForTests()) { for (AsyncLogger l : als.getLoggersForTests()) {