diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 74e443d74d6..4544210a7a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -20,3 +20,5 @@ HDFS-3793. Implement genericized format() in QJM (todd) HDFS-3795. QJM: validate journal dir at startup (todd) HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd) + +HDFS-3799. QJM: handle empty log segments during recovery (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index f17ad5e5ef2..89d2858c1ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -23,7 +23,9 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -111,15 +113,8 @@ public class IPCLoggerChannel implements AsyncLogger { DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT); executor = MoreExecutors.listeningDecorator( - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Logger channel to " + addr) - .setUncaughtExceptionHandler( - UncaughtExceptionHandlers.systemExit()) - .build())); + createExecutor()); } - @Override public synchronized void setEpoch(long epoch) { this.epoch = epoch; @@ -154,6 +149,21 @@ public class IPCLoggerChannel implements AsyncLogger { return new QJournalProtocolTranslatorPB(pbproxy); } + + /** + * Separated out for easy overriding in tests. + */ + @VisibleForTesting + protected ExecutorService createExecutor() { + return Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Logger channel to " + addr) + .setUncaughtExceptionHandler( + UncaughtExceptionHandlers.systemExit()) + .build()); + } + @Override public URL buildURLToFetchLogs(long segmentTxId) { Preconditions.checkArgument(segmentTxId > 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index 1692771d452..c5ae384d94c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -235,15 +235,30 @@ public class QuorumJournalManager implements JournalManager { } else if (bestResponse.hasSegmentState()) { LOG.info("Using longest log: " + bestEntry); } else { - // TODO: can we get here? what about the following case: - // - 3 JNs, JN1, JN2, JN3 - // - writer starts segment 101 on JN1, then crashes - // - during newEpoch(), we saw the segment on JN1 and decide to recover segment 101 - // - during prepare(), JN1 has actually crashed, and we only talk to JN2 and JN3, + // None of the responses to prepareRecovery() had a segment at the given + // txid. This can happen for example in the following situation: + // - 3 JNs: JN1, JN2, JN3 + // - writer starts segment 101 on JN1, then crashes before + // writing to JN2 and JN3 + // - during newEpoch(), we saw the segment on JN1 and decide to + // recover segment 101 + // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3, // neither of which has any entry for this log. - // Write a test case. - throw new AssertionError("None of the responses " + - "had a log to recover: " + QuorumCall.mapToString(prepareResponses)); + // In this case, it is allowed to do nothing for recovery, since the + // segment wasn't started on a quorum of nodes. + + // Sanity check: we should only get here if none of the responses had + // a log. This should be a postcondition of the recovery comparator, + // but a bug in the comparator might cause us to get here. + for (PrepareRecoveryResponseProto resp : prepareResponses.values()) { + assert !resp.hasSegmentState() : + "One of the loggers had a response, but no best logger " + + "was found."; + } + + LOG.info("None of the responders had a log to recover: " + + QuorumCall.mapToString(prepareResponses)); + return; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index cf2c11d885d..2014267d812 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -101,9 +101,21 @@ class Journal implements Closeable { } LOG.info("Scanning storage " + fjm); List files = fjm.getLogFiles(0); - if (!files.isEmpty()) { - EditLogFile latestLog = files.get(files.size() - 1); - LOG.info("Latest log is " + latestLog); + if (files.isEmpty()) { + curSegmentTxId = HdfsConstants.INVALID_TXID; + return; + } + + EditLogFile latestLog = files.get(files.size() - 1); + latestLog.validateLog(); + LOG.info("Latest log is " + latestLog); + if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { + // the log contains no transactions + LOG.warn("Latest log " + latestLog + " has no transactions. " + + "moving it aside"); + latestLog.moveAsideEmptyFile(); + curSegmentTxId = HdfsConstants.INVALID_TXID; + } else { curSegmentTxId = latestLog.getFirstTxId(); } } @@ -166,6 +178,7 @@ class Journal implements Closeable { if (curSegment != null) { curSegment.close(); curSegment = null; + curSegmentTxId = HdfsConstants.INVALID_TXID; } NewEpochResponseProto.Builder builder = @@ -248,10 +261,37 @@ class Journal implements Closeable { checkRequest(reqInfo); checkFormatted(); - Preconditions.checkState(curSegment == null, - "Can't start a log segment, already writing " + curSegment); - Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID, - "Can't start log segment " + txid + " expecting nextTxId=" + nextTxId); + if (curSegment != null) { + LOG.warn("Client is requesting a new log segment " + txid + + " though we are already writing " + curSegment + ". " + + "Aborting the current segment in order to begin the new one."); + // The writer may have lost a connection to us and is now + // re-connecting after the connection came back. + // We should abort our own old segment. + curSegment.abort(); + curSegment = null; + } + + // Paranoid sanity check: we should never overwrite a finalized log file. + // Additionally, if it's in-progress, it should have at most 1 transaction. + // This can happen if the writer crashes exactly at the start of a segment. + EditLogFile existing = fjm.getLogFile(txid); + if (existing != null) { + if (!existing.isInProgress()) { + throw new IllegalStateException("Already have a finalized segment " + + existing + " beginning at " + txid); + } + + // If it's in-progress, it should only contain one transaction, + // because the "startLogSegment" transaction is written alone at the + // start of each segment. + existing.validateLog(); + if (existing.getLastTxId() != existing.getFirstTxId()) { + throw new IllegalStateException("The log file " + + existing + " seems to contain valid transactions"); + } + } + curSegment = fjm.startLogSegment(txid); curSegmentTxId = txid; nextTxId = txid; @@ -360,9 +400,10 @@ class Journal implements Closeable { elf.validateLog(); } if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { - // no transactions in file - throw new AssertionError("TODO: no transactions in file " + - elf); + LOG.info("Edit log file " + elf + " appears to be empty. " + + "Moving it aside..."); + elf.moveAsideEmptyFile(); + return null; } SegmentStateProto ret = SegmentStateProto.newBuilder() .setStartTxId(segmentTxId) @@ -433,13 +474,16 @@ class Journal implements Closeable { } SegmentStateProto currentSegment = getSegmentInfo(segmentTxId); - // TODO: this can be null, in the case that one of the loggers started - // the next segment, but others did not! add regression test and null - // check in next condition below. - - // TODO: what if they have the same length but one is finalized and the - // other isn't! cover that case. - if (currentSegment.getEndTxId() != segment.getEndTxId()) { + if (currentSegment == null || + currentSegment.getEndTxId() != segment.getEndTxId()) { + if (currentSegment == null) { + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + + ": no current segment in place"); + } else { + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + + ": old segment " + TextFormat.shortDebugString(segment) + " is " + + "not the right length"); + } syncLog(reqInfo, segment, fromUrl); } else { LOG.info("Skipping download of log " + @@ -481,14 +525,6 @@ class Journal implements Closeable { try { success = tmpFile.renameTo(storage.getInProgressEditLog( segment.getStartTxId())); - if (success) { - // If we're synchronizing the latest segment, update our cached - // info. - // TODO: can this be done more generally? - if (curSegmentTxId == segment.getStartTxId()) { - nextTxId = segment.getEndTxId() + 1; - } - } } finally { if (!success) { if (!tmpFile.delete()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index c60d895166e..a28985293a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -460,7 +460,7 @@ public class FileJournalManager implements JournalManager { renameSelf(".corrupt"); } - void moveAsideEmptyFile() throws IOException { + public void moveAsideEmptyFile() throws IOException { assert lastTxId == HdfsConstants.INVALID_TXID; renameSelf(".empty"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index 891bd42bfe2..afe851388a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,6 +56,7 @@ import org.junit.Test; import org.mockito.Mockito; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; /** * Functional tests for QuorumJournalManager. @@ -206,6 +209,124 @@ public class TestQuorumJournalManager { } } + /** + * Test the case where the NN crashes after starting a new segment + * on all nodes, but before writing the first transaction to it. + */ + @Test + public void testCrashAtBeginningOfSegment() throws Exception { + writeSegment(cluster, qjm, 1, 3, true); + waitForAllPendingCalls(qjm.getLoggerSetForTests()); + + EditLogOutputStream stm = qjm.startLogSegment(4); + try { + waitForAllPendingCalls(qjm.getLoggerSetForTests()); + } finally { + stm.abort(); + } + + + // Make a new QJM + qjm = new QuorumJournalManager( + conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO); + qjm.recoverUnfinalizedSegments(); + checkRecovery(cluster, 1, 3); + + writeSegment(cluster, qjm, 4, 3, true); + } + + @Test + public void testOutOfSyncAtBeginningOfSegment0() throws Exception { + doTestOutOfSyncAtBeginningOfSegment(0); + } + + @Test + public void testOutOfSyncAtBeginningOfSegment1() throws Exception { + doTestOutOfSyncAtBeginningOfSegment(1); + } + + @Test + public void testOutOfSyncAtBeginningOfSegment2() throws Exception { + doTestOutOfSyncAtBeginningOfSegment(2); + } + + /** + * Test the case where, at the beginning of a segment, transactions + * have been written to one JN but not others. + */ + public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn) + throws Exception { + + int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3; + int nodeMissingSegment = (nodeWithOneTxn + 2) % 3; + + writeSegment(cluster, qjm, 1, 3, true); + waitForAllPendingCalls(qjm.getLoggerSetForTests()); + cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0); + + // Open segment on 2/3 nodes + EditLogOutputStream stm = qjm.startLogSegment(4); + try { + waitForAllPendingCalls(qjm.getLoggerSetForTests()); + + // Write transactions to only 1/3 nodes + failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4); + try { + writeTxns(stm, 4, 1); + fail("Did not fail even though 2/3 failed"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains("mock failure", qe); + } + } finally { + stm.abort(); + } + + // Bring back the down JN. + cluster.restartJournalNode(nodeMissingSegment); + + // Make a new QJM. At this point, the state is as follows: + // A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty) + // B: nodeWithOneTxn: 1-3 finalized, 4_inprogress (1 txn) + // C: nodeMissingSegment: 1-3 finalized + GenericTestUtils.assertGlobEquals( + cluster.getCurrentDir(nodeWithEmptySegment, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 3), + NNStorage.getInProgressEditsFileName(4)); + GenericTestUtils.assertGlobEquals( + cluster.getCurrentDir(nodeWithOneTxn, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 3), + NNStorage.getInProgressEditsFileName(4)); + GenericTestUtils.assertGlobEquals( + cluster.getCurrentDir(nodeMissingSegment, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 3)); + + + // Stop one of the nodes. Since we run this test three + // times, rotating the roles of the nodes, we'll test + // all the permutations. + cluster.getJournalNode(2).stopAndJoin(0); + + qjm = createSpyingQJM(); + qjm.recoverUnfinalizedSegments(); + + if (nodeWithOneTxn == 0 || + nodeWithOneTxn == 1) { + // If the node that had the transaction committed was one of the nodes + // that responded during recovery, then we should have recovered txid + // 4. + checkRecovery(cluster, 4, 4); + writeSegment(cluster, qjm, 5, 3, true); + } else { + // Otherwise, we should have recovered only 1-3 and should be able to + // start a segment at 4. + checkRecovery(cluster, 1, 3); + writeSegment(cluster, qjm, 4, 3, true); + } + } + /** * Test case where a new writer picks up from an old one with no failures @@ -408,8 +529,15 @@ public class TestQuorumJournalManager { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) { - return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger( - conf, nsInfo, journalId, addr)); + AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) { + protected ExecutorService createExecutor() { + // Don't parallelize calls to the quorum in the tests. + // This makes the tests more deterministic. + return MoreExecutors.sameThreadExecutor(); + } + }; + + return Mockito.spy(logger); } }; return new QuorumJournalManager( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java index f9539d9bb61..1d52f987f9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java @@ -204,6 +204,89 @@ public class TestJournal { "No log file to finalize at transaction ID 1000", ise); } } + + /** + * Assume that a client is writing to a journal, but loses its connection + * in the middle of a segment. Thus, any future journal() calls in that + * segment may fail, because some txns were missed while the connection was + * down. + * + * Eventually, the connection comes back, and the NN tries to start a new + * segment at a higher txid. This should abort the old one and succeed. + */ + @Test + public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + + // Start a segment at txid 1, and write a batch of 3 txns. + journal.startLogSegment(makeRI(1), 1); + journal.journal(makeRI(2), 1, 3, + QJMTestUtil.createTxnData(1, 3)); + + GenericTestUtils.assertExists( + journal.getStorage().getInProgressEditLog(1)); + + // Try to start new segment at txid 6, this should abort old segment and + // then succeed, allowing us to write txid 6-9. + journal.startLogSegment(makeRI(3), 6); + journal.journal(makeRI(4), 6, 3, + QJMTestUtil.createTxnData(6, 3)); + + // The old segment should *not* be finalized. + GenericTestUtils.assertExists( + journal.getStorage().getInProgressEditLog(1)); + GenericTestUtils.assertExists( + journal.getStorage().getInProgressEditLog(6)); + } + + /** + * Test behavior of startLogSegment() when a segment with the + * same transaction ID already exists. + */ + @Test + public void testStartLogSegmentWhenAlreadyExists() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + + // Start a segment at txid 1, and write just 1 transaction. This + // would normally be the START_LOG_SEGMENT transaction. + journal.startLogSegment(makeRI(1), 1); + journal.journal(makeRI(2), 1, 1, + QJMTestUtil.createTxnData(1, 1)); + + // Try to start new segment at txid 1, this should succeed, because + // we are allowed to re-start a segment if we only ever had the + // START_LOG_SEGMENT transaction logged. + journal.startLogSegment(makeRI(3), 1); + journal.journal(makeRI(4), 1, 1, + QJMTestUtil.createTxnData(1, 1)); + + // This time through, write more transactions afterwards, simulating + // real user transactions. + journal.journal(makeRI(5), 2, 3, + QJMTestUtil.createTxnData(2, 3)); + + try { + journal.startLogSegment(makeRI(6), 1); + fail("Did not fail to start log segment which would overwrite " + + "an existing one"); + } catch (IllegalStateException ise) { + GenericTestUtils.assertExceptionContains( + "seems to contain valid transactions", ise); + } + + journal.finalizeLogSegment(makeRI(7), 1, 4); + + // Ensure that we cannot overwrite a finalized segment + try { + journal.startLogSegment(makeRI(8), 1); + fail("Did not fail to start log segment which would overwrite " + + "an existing one"); + } catch (IllegalStateException ise) { + GenericTestUtils.assertExceptionContains( + "have a finalized segment", ise); + } + + } private static RequestInfo makeRI(int serial) { return new RequestInfo(JID, 1, serial); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index dee1e20b28e..5a07f02d7fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -118,7 +118,9 @@ public class TestJournalNode { ch.startLogSegment(3).get(); response = ch.newEpoch(4).get(); ch.setEpoch(4); - assertEquals(3, response.getLastSegmentTxId()); + // Because the new segment is empty, it is equivalent to not having + // started writing it. + assertEquals(0, response.getLastSegmentTxId()); } @Test