HDFS-3799. QJM: handle empty log segments during recovery. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1373183 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-08-15 00:57:24 +00:00
parent 4a9b3c693d
commit c95a1674b6
8 changed files with 321 additions and 45 deletions

View File

@ -20,3 +20,5 @@ HDFS-3793. Implement genericized format() in QJM (todd)
HDFS-3795. QJM: validate journal dir at startup (todd)
HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd)
HDFS-3799. QJM: handle empty log segments during recovery (todd)

View File

@ -23,7 +23,9 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -111,15 +113,8 @@ public class IPCLoggerChannel implements AsyncLogger {
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
executor = MoreExecutors.listeningDecorator(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Logger channel to " + addr)
.setUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit())
.build()));
createExecutor());
}
@Override
public synchronized void setEpoch(long epoch) {
this.epoch = epoch;
@ -154,6 +149,21 @@ public class IPCLoggerChannel implements AsyncLogger {
return new QJournalProtocolTranslatorPB(pbproxy);
}
/**
* Separated out for easy overriding in tests.
*/
@VisibleForTesting
protected ExecutorService createExecutor() {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Logger channel to " + addr)
.setUncaughtExceptionHandler(
UncaughtExceptionHandlers.systemExit())
.build());
}
@Override
public URL buildURLToFetchLogs(long segmentTxId) {
Preconditions.checkArgument(segmentTxId > 0,

View File

@ -235,15 +235,30 @@ public class QuorumJournalManager implements JournalManager {
} else if (bestResponse.hasSegmentState()) {
LOG.info("Using longest log: " + bestEntry);
} else {
// TODO: can we get here? what about the following case:
// - 3 JNs, JN1, JN2, JN3
// - writer starts segment 101 on JN1, then crashes
// - during newEpoch(), we saw the segment on JN1 and decide to recover segment 101
// - during prepare(), JN1 has actually crashed, and we only talk to JN2 and JN3,
// None of the responses to prepareRecovery() had a segment at the given
// txid. This can happen for example in the following situation:
// - 3 JNs: JN1, JN2, JN3
// - writer starts segment 101 on JN1, then crashes before
// writing to JN2 and JN3
// - during newEpoch(), we saw the segment on JN1 and decide to
// recover segment 101
// - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
// neither of which has any entry for this log.
// Write a test case.
throw new AssertionError("None of the responses " +
"had a log to recover: " + QuorumCall.mapToString(prepareResponses));
// In this case, it is allowed to do nothing for recovery, since the
// segment wasn't started on a quorum of nodes.
// Sanity check: we should only get here if none of the responses had
// a log. This should be a postcondition of the recovery comparator,
// but a bug in the comparator might cause us to get here.
for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
assert !resp.hasSegmentState() :
"One of the loggers had a response, but no best logger " +
"was found.";
}
LOG.info("None of the responders had a log to recover: " +
QuorumCall.mapToString(prepareResponses));
return;
}

View File

@ -101,9 +101,21 @@ class Journal implements Closeable {
}
LOG.info("Scanning storage " + fjm);
List<EditLogFile> files = fjm.getLogFiles(0);
if (!files.isEmpty()) {
EditLogFile latestLog = files.get(files.size() - 1);
LOG.info("Latest log is " + latestLog);
if (files.isEmpty()) {
curSegmentTxId = HdfsConstants.INVALID_TXID;
return;
}
EditLogFile latestLog = files.get(files.size() - 1);
latestLog.validateLog();
LOG.info("Latest log is " + latestLog);
if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
// the log contains no transactions
LOG.warn("Latest log " + latestLog + " has no transactions. " +
"moving it aside");
latestLog.moveAsideEmptyFile();
curSegmentTxId = HdfsConstants.INVALID_TXID;
} else {
curSegmentTxId = latestLog.getFirstTxId();
}
}
@ -166,6 +178,7 @@ class Journal implements Closeable {
if (curSegment != null) {
curSegment.close();
curSegment = null;
curSegmentTxId = HdfsConstants.INVALID_TXID;
}
NewEpochResponseProto.Builder builder =
@ -248,10 +261,37 @@ class Journal implements Closeable {
checkRequest(reqInfo);
checkFormatted();
Preconditions.checkState(curSegment == null,
"Can't start a log segment, already writing " + curSegment);
Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID,
"Can't start log segment " + txid + " expecting nextTxId=" + nextTxId);
if (curSegment != null) {
LOG.warn("Client is requesting a new log segment " + txid +
" though we are already writing " + curSegment + ". " +
"Aborting the current segment in order to begin the new one.");
// The writer may have lost a connection to us and is now
// re-connecting after the connection came back.
// We should abort our own old segment.
curSegment.abort();
curSegment = null;
}
// Paranoid sanity check: we should never overwrite a finalized log file.
// Additionally, if it's in-progress, it should have at most 1 transaction.
// This can happen if the writer crashes exactly at the start of a segment.
EditLogFile existing = fjm.getLogFile(txid);
if (existing != null) {
if (!existing.isInProgress()) {
throw new IllegalStateException("Already have a finalized segment " +
existing + " beginning at " + txid);
}
// If it's in-progress, it should only contain one transaction,
// because the "startLogSegment" transaction is written alone at the
// start of each segment.
existing.validateLog();
if (existing.getLastTxId() != existing.getFirstTxId()) {
throw new IllegalStateException("The log file " +
existing + " seems to contain valid transactions");
}
}
curSegment = fjm.startLogSegment(txid);
curSegmentTxId = txid;
nextTxId = txid;
@ -360,9 +400,10 @@ class Journal implements Closeable {
elf.validateLog();
}
if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
// no transactions in file
throw new AssertionError("TODO: no transactions in file " +
elf);
LOG.info("Edit log file " + elf + " appears to be empty. " +
"Moving it aside...");
elf.moveAsideEmptyFile();
return null;
}
SegmentStateProto ret = SegmentStateProto.newBuilder()
.setStartTxId(segmentTxId)
@ -433,13 +474,16 @@ class Journal implements Closeable {
}
SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
// TODO: this can be null, in the case that one of the loggers started
// the next segment, but others did not! add regression test and null
// check in next condition below.
// TODO: what if they have the same length but one is finalized and the
// other isn't! cover that case.
if (currentSegment.getEndTxId() != segment.getEndTxId()) {
if (currentSegment == null ||
currentSegment.getEndTxId() != segment.getEndTxId()) {
if (currentSegment == null) {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": no current segment in place");
} else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(segment) + " is " +
"not the right length");
}
syncLog(reqInfo, segment, fromUrl);
} else {
LOG.info("Skipping download of log " +
@ -481,14 +525,6 @@ class Journal implements Closeable {
try {
success = tmpFile.renameTo(storage.getInProgressEditLog(
segment.getStartTxId()));
if (success) {
// If we're synchronizing the latest segment, update our cached
// info.
// TODO: can this be done more generally?
if (curSegmentTxId == segment.getStartTxId()) {
nextTxId = segment.getEndTxId() + 1;
}
}
} finally {
if (!success) {
if (!tmpFile.delete()) {

View File

@ -460,7 +460,7 @@ public class FileJournalManager implements JournalManager {
renameSelf(".corrupt");
}
void moveAsideEmptyFile() throws IOException {
public void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}

View File

@ -30,6 +30,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -54,6 +56,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
/**
* Functional tests for QuorumJournalManager.
@ -206,6 +209,124 @@ public class TestQuorumJournalManager {
}
}
/**
* Test the case where the NN crashes after starting a new segment
* on all nodes, but before writing the first transaction to it.
*/
@Test
public void testCrashAtBeginningOfSegment() throws Exception {
writeSegment(cluster, qjm, 1, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
EditLogOutputStream stm = qjm.startLogSegment(4);
try {
waitForAllPendingCalls(qjm.getLoggerSetForTests());
} finally {
stm.abort();
}
// Make a new QJM
qjm = new QuorumJournalManager(
conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
qjm.recoverUnfinalizedSegments();
checkRecovery(cluster, 1, 3);
writeSegment(cluster, qjm, 4, 3, true);
}
@Test
public void testOutOfSyncAtBeginningOfSegment0() throws Exception {
doTestOutOfSyncAtBeginningOfSegment(0);
}
@Test
public void testOutOfSyncAtBeginningOfSegment1() throws Exception {
doTestOutOfSyncAtBeginningOfSegment(1);
}
@Test
public void testOutOfSyncAtBeginningOfSegment2() throws Exception {
doTestOutOfSyncAtBeginningOfSegment(2);
}
/**
* Test the case where, at the beginning of a segment, transactions
* have been written to one JN but not others.
*/
public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
throws Exception {
int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3;
int nodeMissingSegment = (nodeWithOneTxn + 2) % 3;
writeSegment(cluster, qjm, 1, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
// Open segment on 2/3 nodes
EditLogOutputStream stm = qjm.startLogSegment(4);
try {
waitForAllPendingCalls(qjm.getLoggerSetForTests());
// Write transactions to only 1/3 nodes
failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4);
try {
writeTxns(stm, 4, 1);
fail("Did not fail even though 2/3 failed");
} catch (QuorumException qe) {
GenericTestUtils.assertExceptionContains("mock failure", qe);
}
} finally {
stm.abort();
}
// Bring back the down JN.
cluster.restartJournalNode(nodeMissingSegment);
// Make a new QJM. At this point, the state is as follows:
// A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty)
// B: nodeWithOneTxn: 1-3 finalized, 4_inprogress (1 txn)
// C: nodeMissingSegment: 1-3 finalized
GenericTestUtils.assertGlobEquals(
cluster.getCurrentDir(nodeWithEmptySegment, JID),
"edits_.*",
NNStorage.getFinalizedEditsFileName(1, 3),
NNStorage.getInProgressEditsFileName(4));
GenericTestUtils.assertGlobEquals(
cluster.getCurrentDir(nodeWithOneTxn, JID),
"edits_.*",
NNStorage.getFinalizedEditsFileName(1, 3),
NNStorage.getInProgressEditsFileName(4));
GenericTestUtils.assertGlobEquals(
cluster.getCurrentDir(nodeMissingSegment, JID),
"edits_.*",
NNStorage.getFinalizedEditsFileName(1, 3));
// Stop one of the nodes. Since we run this test three
// times, rotating the roles of the nodes, we'll test
// all the permutations.
cluster.getJournalNode(2).stopAndJoin(0);
qjm = createSpyingQJM();
qjm.recoverUnfinalizedSegments();
if (nodeWithOneTxn == 0 ||
nodeWithOneTxn == 1) {
// If the node that had the transaction committed was one of the nodes
// that responded during recovery, then we should have recovered txid
// 4.
checkRecovery(cluster, 4, 4);
writeSegment(cluster, qjm, 5, 3, true);
} else {
// Otherwise, we should have recovered only 1-3 and should be able to
// start a segment at 4.
checkRecovery(cluster, 1, 3);
writeSegment(cluster, qjm, 4, 3, true);
}
}
/**
* Test case where a new writer picks up from an old one with no failures
@ -408,8 +529,15 @@ public class TestQuorumJournalManager {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
conf, nsInfo, journalId, addr));
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
protected ExecutorService createExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.
return MoreExecutors.sameThreadExecutor();
}
};
return Mockito.spy(logger);
}
};
return new QuorumJournalManager(

View File

@ -204,6 +204,89 @@ public class TestJournal {
"No log file to finalize at transaction ID 1000", ise);
}
}
/**
* Assume that a client is writing to a journal, but loses its connection
* in the middle of a segment. Thus, any future journal() calls in that
* segment may fail, because some txns were missed while the connection was
* down.
*
* Eventually, the connection comes back, and the NN tries to start a new
* segment at a higher txid. This should abort the old one and succeed.
*/
@Test
public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
// Start a segment at txid 1, and write a batch of 3 txns.
journal.startLogSegment(makeRI(1), 1);
journal.journal(makeRI(2), 1, 3,
QJMTestUtil.createTxnData(1, 3));
GenericTestUtils.assertExists(
journal.getStorage().getInProgressEditLog(1));
// Try to start new segment at txid 6, this should abort old segment and
// then succeed, allowing us to write txid 6-9.
journal.startLogSegment(makeRI(3), 6);
journal.journal(makeRI(4), 6, 3,
QJMTestUtil.createTxnData(6, 3));
// The old segment should *not* be finalized.
GenericTestUtils.assertExists(
journal.getStorage().getInProgressEditLog(1));
GenericTestUtils.assertExists(
journal.getStorage().getInProgressEditLog(6));
}
/**
* Test behavior of startLogSegment() when a segment with the
* same transaction ID already exists.
*/
@Test
public void testStartLogSegmentWhenAlreadyExists() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
// Start a segment at txid 1, and write just 1 transaction. This
// would normally be the START_LOG_SEGMENT transaction.
journal.startLogSegment(makeRI(1), 1);
journal.journal(makeRI(2), 1, 1,
QJMTestUtil.createTxnData(1, 1));
// Try to start new segment at txid 1, this should succeed, because
// we are allowed to re-start a segment if we only ever had the
// START_LOG_SEGMENT transaction logged.
journal.startLogSegment(makeRI(3), 1);
journal.journal(makeRI(4), 1, 1,
QJMTestUtil.createTxnData(1, 1));
// This time through, write more transactions afterwards, simulating
// real user transactions.
journal.journal(makeRI(5), 2, 3,
QJMTestUtil.createTxnData(2, 3));
try {
journal.startLogSegment(makeRI(6), 1);
fail("Did not fail to start log segment which would overwrite " +
"an existing one");
} catch (IllegalStateException ise) {
GenericTestUtils.assertExceptionContains(
"seems to contain valid transactions", ise);
}
journal.finalizeLogSegment(makeRI(7), 1, 4);
// Ensure that we cannot overwrite a finalized segment
try {
journal.startLogSegment(makeRI(8), 1);
fail("Did not fail to start log segment which would overwrite " +
"an existing one");
} catch (IllegalStateException ise) {
GenericTestUtils.assertExceptionContains(
"have a finalized segment", ise);
}
}
private static RequestInfo makeRI(int serial) {
return new RequestInfo(JID, 1, serial);

View File

@ -118,7 +118,9 @@ public class TestJournalNode {
ch.startLogSegment(3).get();
response = ch.newEpoch(4).get();
ch.setEpoch(4);
assertEquals(3, response.getLastSegmentTxId());
// Because the new segment is empty, it is equivalent to not having
// started writing it.
assertEquals(0, response.getLastSegmentTxId());
}
@Test