diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index e499b3a6444..8dd9daa8317 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -32,3 +32,5 @@ HDFS-3823. QJM: TestQJMWithFaults fails occasionally because of missed setting o HDFS-3826. QJM: Some trivial logging / exception text improvements. (todd and atm) HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli) + +HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index 645efcdfe3d..4dcc12e4097 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -23,7 +23,6 @@ import java.net.URL; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -52,7 +51,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; /** @@ -223,7 +221,7 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException { // TODO: we should collect any "ties" and pass the URL for all of them // when syncing, so we can tolerate failure during recovery better. Entry bestEntry = Collections.max( - prepareResponses.entrySet(), RECOVERY_COMPARATOR); + prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE); AsyncLogger bestLogger = bestEntry.getKey(); PrepareRecoveryResponseProto bestResponse = bestEntry.getValue(); @@ -282,35 +280,6 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException { loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs); } - private static final Comparator> RECOVERY_COMPARATOR = - new Comparator>() { - @Override - public int compare( - Entry a, - Entry b) { - - PrepareRecoveryResponseProto r1 = a.getValue(); - PrepareRecoveryResponseProto r2 = b.getValue(); - - if (r1.hasSegmentState() && r2.hasSegmentState()) { - assert r1.getSegmentState().getStartTxId() == - r2.getSegmentState().getStartTxId() : "bad args: " + r1 + ", " + r2; - } - - return ComparisonChain.start() - // If one of them has accepted something and the other hasn't, - // use the one with an accepted recovery - .compare(r1.hasAcceptedInEpoch(), r2.hasAcceptedInEpoch()) - // If they both accepted, use the one that's more recent - .compare(r1.getAcceptedInEpoch(), - r2.getAcceptedInEpoch()) - // Otherwise, choose based on which log is longer - .compare(r1.hasSegmentState(), r2.hasSegmentState()) - .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId()) - .result(); - } - }; - static List createLoggers(Configuration conf, URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java new file mode 100644 index 00000000000..b8e6ec36646 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import java.util.Comparator; +import java.util.Map.Entry; + +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; +import com.google.common.primitives.Booleans; + +/** + * Compares responses to the prepareRecovery RPC. This is responsible for + * determining the correct length to recover. + */ +class SegmentRecoveryComparator + implements Comparator> { + + static final SegmentRecoveryComparator INSTANCE = new SegmentRecoveryComparator(); + + @Override + public int compare( + Entry a, + Entry b) { + + PrepareRecoveryResponseProto r1 = a.getValue(); + PrepareRecoveryResponseProto r2 = b.getValue(); + + // A response that has data for a segment is always better than one + // that doesn't. + if (r1.hasSegmentState() != r2.hasSegmentState()) { + return Booleans.compare(r1.hasSegmentState(), r2.hasSegmentState()); + } + + if (!r1.hasSegmentState()) { + // Neither has a segment, so neither can be used for recover. + // Call them equal. + return 0; + } + + // They both have a segment. + SegmentStateProto r1Seg = r1.getSegmentState(); + SegmentStateProto r2Seg = r2.getSegmentState(); + + Preconditions.checkArgument(r1Seg.getStartTxId() == r2Seg.getStartTxId(), + "Should only be called with responses for corresponding segments: " + + "%s and %s do not have the same start txid.", r1, r2); + + // If one is in-progress but the other is finalized, + // the finalized one is greater. + if (r1Seg.getIsInProgress() != r2Seg.getIsInProgress()) { + return Booleans.compare(!r1Seg.getIsInProgress(), !r2Seg.getIsInProgress()); + } + + if (!r1Seg.getIsInProgress()) { + // If both are finalized, they should match lengths, and be considered + // equal + if (r1Seg.getEndTxId() != r2Seg.getEndTxId() || + !r1Seg.getMd5Sum().equals(r2Seg.getMd5Sum())) { + throw new AssertionError("finalized segs with different lengths: " + + r1 + ", " + r2); + } + return 0; + } + + // Both are in-progress. + long r1SeenEpoch = Math.max(r1.getAcceptedInEpoch(), r1.getLastWriterEpoch()); + long r2SeenEpoch = Math.max(r2.getAcceptedInEpoch(), r2.getLastWriterEpoch()); + + return ComparisonChain.start() + .compare(r1SeenEpoch, r2SeenEpoch) + .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId()) + .result(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index fda8d415c93..c633faf509c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -76,7 +76,17 @@ class Journal implements Closeable { * number of that writer is stored persistently on disk. */ private PersistentLongFile lastPromisedEpoch; + + /** + * The epoch number of the last writer to actually write a transaction. + * This is used to differentiate log segments after a crash at the very + * beginning of a segment. See the the 'testNewerVersionOfSegmentWins' + * test case. + */ + private PersistentLongFile lastWriterEpoch; + private static final String LAST_PROMISED_FILENAME = "last-promised-epoch"; + private static final String LAST_WRITER_EPOCH = "last-writer-epoch"; private final FileJournalManager fjm; @@ -86,6 +96,8 @@ class Journal implements Closeable { File currentDir = storage.getSingularStorageDir().getCurrentDir(); this.lastPromisedEpoch = new PersistentLongFile( new File(currentDir, LAST_PROMISED_FILENAME), 0); + this.lastWriterEpoch = new PersistentLongFile( + new File(currentDir, LAST_WRITER_EPOCH), 0); this.fjm = storage.getJournalManager(); } @@ -201,7 +213,7 @@ synchronized NewEpochResponseProto newEpoch( synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { - checkRequest(reqInfo); + checkWriteRequest(reqInfo); checkFormatted(); // TODO: if a JN goes down and comes back up, then it will throw @@ -260,6 +272,16 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException { // client } + private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException { + checkRequest(reqInfo); + + if (reqInfo.getEpoch() != lastWriterEpoch.get()) { + throw new IOException("IPC's epoch " + reqInfo.getEpoch() + + " is not the current writer epoch " + + lastWriterEpoch.get()); + } + } + private void checkFormatted() throws JournalNotFormattedException { if (!storage.isFormatted()) { throw new JournalNotFormattedException("Journal " + storage + @@ -308,6 +330,18 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid) } } + long curLastWriterEpoch = lastWriterEpoch.get(); + if (curLastWriterEpoch != reqInfo.getEpoch()) { + LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch()); + lastWriterEpoch.set(reqInfo.getEpoch()); + } + + // The fact that we are starting a segment at this txid indicates + // that any previous recovery for this same segment was aborted. + // Otherwise, no writer would have started writing. So, we can + // remove the record of the older segment here. + purgePaxosDecision(txid); + curSegment = fjm.startLogSegment(txid); curSegmentTxId = txid; nextTxId = txid; @@ -350,6 +384,12 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, "Trying to re-finalize already finalized log " + elf + " with different endTxId " + endTxId); } + + // Once logs are finalized, a different length will never be decided. + // During recovery, we treat a finalized segment the same as an accepted + // recovery. Thus, we no longer need to keep track of the previously- + // accepted decision. The existence of the finalized log segment is enough. + purgePaxosDecision(elf.getFirstTxId()); } /** @@ -364,6 +404,21 @@ public synchronized void purgeLogsOlderThan(RequestInfo reqInfo, purgePaxosDecisionsOlderThan(minTxIdToKeep); } + /** + * Remove the previously-recorded 'accepted recovery' information + * for a given log segment, once it is no longer necessary. + * @param segmentTxId the transaction ID to purge + * @throws IOException if the file could not be deleted + */ + private void purgePaxosDecision(long segmentTxId) throws IOException { + File paxosFile = storage.getPaxosFile(segmentTxId); + if (paxosFile.exists()) { + if (!paxosFile.delete()) { + throw new IOException("Unable to delete paxos file " + paxosFile); + } + } + } + private void purgePaxosDecisionsOlderThan(long minTxIdToKeep) throws IOException { File dir = storage.getPaxosDir(); @@ -442,18 +497,29 @@ public synchronized PrepareRecoveryResponseProto prepareRecovery( PrepareRecoveryResponseProto.Builder builder = PrepareRecoveryResponseProto.newBuilder(); + + SegmentStateProto segInfo = getSegmentInfo(segmentTxId); + boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress(); PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId); - if (previouslyAccepted != null) { + + if (previouslyAccepted != null && !hasFinalizedSegment) { + SegmentStateProto acceptedState = previouslyAccepted.getSegmentState(); + assert acceptedState.getEndTxId() == segInfo.getEndTxId() && + acceptedState.getMd5Sum().equals(segInfo.getMd5Sum()) : + "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" + + "on disk: " + TextFormat.shortDebugString(segInfo); + builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch()) .setSegmentState(previouslyAccepted.getSegmentState()); } else { - SegmentStateProto segInfo = getSegmentInfo(segmentTxId); if (segInfo != null) { builder.setSegmentState(segInfo); } } + builder.setLastWriterEpoch(lastWriterEpoch.get()); + PrepareRecoveryResponseProto resp = builder.build(); LOG.info("Prepared recovery for segment " + segmentTxId + ": " + TextFormat.shortDebugString(resp)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto index 14429633e8e..177758aa924 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto @@ -162,6 +162,7 @@ message PrepareRecoveryRequestProto { message PrepareRecoveryResponseProto { optional SegmentStateProto segmentState = 1; optional uint64 acceptedInEpoch = 2; + required uint64 lastWriterEpoch = 3; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index e9f81d28ec3..d7b5562f2bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -30,9 +30,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; +import java.net.URL; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; @@ -533,6 +534,212 @@ public void testRecoverAfterIncompleteRecovery() throws Exception { checkRecovery(cluster, 1, 4); } + /** + * Set up the following tricky edge case state which is used by + * multiple tests: + * + * Initial writer: + * - Writing to 3 JNs: JN0, JN1, JN2: + * - A log segment with txnid 1 through 100 succeeds. + * - The first transaction in the next segment only goes to JN0 + * before the writer crashes (eg it is partitioned) + * + * Recovery by another writer: + * - The new NN starts recovery and talks to all three. Thus, it sees + * that the newest log segment which needs recovery is 101. + * - It sends the prepareRecovery(101) call, and decides that the + * recovery length for 101 is only the 1 transaction. + * - It sends acceptRecovery(101-101) to only JN0, before crashing + * + * This yields the following state: + * - JN0: 1-100 finalized, 101_inprogress, accepted recovery: 101-101 + * - JN1: 1-100 finalized, 101_inprogress.empty + * - JN2: 1-100 finalized, 101_inprogress.empty + * (the .empty files got moved aside during recovery) + * @throws Exception + */ + private void setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery() throws Exception { + // Log segment with txns 1-100 succeeds + writeSegment(cluster, qjm, 1, 100, true); + + // startLogSegment only makes it to one of the three nodes + failLoggerAtTxn(spies.get(1), 101); + failLoggerAtTxn(spies.get(2), 101); + + try { + writeSegment(cluster, qjm, 101, 1, true); + fail("Should have failed"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains("mock failure", qe); + } finally { + qjm.close(); + } + + // Recovery 1: + // make acceptRecovery() only make it to the node which has txid 101 + // this should fail because only 1/3 accepted the recovery + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + futureThrows(new IOException("mock failure")).when(spies.get(1)) + .acceptRecovery(Mockito.any(), Mockito.any()); + futureThrows(new IOException("mock failure")).when(spies.get(2)) + .acceptRecovery(Mockito.any(), Mockito.any()); + + try { + qjm.recoverUnfinalizedSegments(); + fail("Should have failed to recover"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains("mock failure", qe); + } finally { + qjm.close(); + } + + // Check that we have entered the expected state as described in the + // method javadoc. + GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(0, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 100), + NNStorage.getInProgressEditsFileName(101)); + GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(1, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 100), + NNStorage.getInProgressEditsFileName(101) + ".empty"); + GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(2, JID), + "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 100), + NNStorage.getInProgressEditsFileName(101) + ".empty"); + + File paxos0 = new File(cluster.getCurrentDir(0, JID), "paxos"); + File paxos1 = new File(cluster.getCurrentDir(1, JID), "paxos"); + File paxos2 = new File(cluster.getCurrentDir(2, JID), "paxos"); + + GenericTestUtils.assertGlobEquals(paxos0, ".*", "101"); + GenericTestUtils.assertGlobEquals(paxos1, ".*"); + GenericTestUtils.assertGlobEquals(paxos2, ".*"); + } + + /** + * Test an edge case discovered by randomized testing. + * + * Starts with the edge case state set up by + * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()} + * + * Recovery 2: + * - New NN starts recovery and only talks to JN1 and JN2. JN0 has + * crashed. Since they have no logs open, they say they don't need + * recovery. + * - Starts writing segment 101, and writes 50 transactions before crashing. + * + * Recovery 3: + * - JN0 has come back to life. + * - New NN starts recovery and talks to all three. All three have + * segments open from txid 101, so it calls prepareRecovery(101) + * - JN0 has an already-accepted value for segment 101, so it replies + * "you should recover 101-101" + * - Former incorrect behavior: NN truncates logs to txid 101 even though + * it should have recovered through 150. + * + * In this case, even though there is an accepted recovery decision, + * the newer log segments should take precedence, since they were written + * in a newer epoch than the recorded decision. + */ + @Test + public void testNewerVersionOfSegmentWins() throws Exception { + setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery(); + + // Now start writing again without JN0 present: + cluster.getJournalNode(0).stopAndJoin(0); + + qjm = createSpyingQJM(); + try { + assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm)); + + // Write segment but do not finalize + writeSegment(cluster, qjm, 101, 50, false); + } finally { + qjm.close(); + } + + // Now try to recover a new writer, with JN0 present, + // and ensure that all of the above-written transactions are recovered. + cluster.restartJournalNode(0); + qjm = createSpyingQJM(); + try { + assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm)); + } finally { + qjm.close(); + } + } + + /** + * Test another edge case discovered by randomized testing. + * + * Starts with the edge case state set up by + * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()} + * + * Recovery 2: + * - New NN starts recovery and only talks to JN1 and JN2. JN0 has + * crashed. Since they have no logs open, they say they don't need + * recovery. + * - Before writing any transactions, JN0 comes back to life and + * JN1 crashes. + * - Starts writing segment 101, and writes 50 transactions before crashing. + * + * Recovery 3: + * - JN1 has come back to life. JN2 crashes. + * - New NN starts recovery and talks to all three. All three have + * segments open from txid 101, so it calls prepareRecovery(101) + * - JN0 has an already-accepted value for segment 101, so it replies + * "you should recover 101-101" + * - Former incorrect behavior: NN truncates logs to txid 101 even though + * it should have recovered through 150. + * + * In this case, even though there is an accepted recovery decision, + * the newer log segments should take precedence, since they were written + * in a newer epoch than the recorded decision. + */ + @Test + public void testNewerVersionOfSegmentWins2() throws Exception { + setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery(); + + // Recover without JN0 present. + cluster.getJournalNode(0).stopAndJoin(0); + + qjm = createSpyingQJM(); + try { + assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm)); + + // After recovery, JN0 comes back to life and JN1 crashes. + cluster.restartJournalNode(0); + cluster.getJournalNode(1).stopAndJoin(0); + + // Write segment but do not finalize + writeSegment(cluster, qjm, 101, 50, false); + } finally { + qjm.close(); + } + + // State: + // JN0: 1-100 finalized, 101_inprogress (txns up to 150) + // Previously, JN0 had an accepted recovery 101-101 from an earlier recovery + // attempt. + // JN1: 1-100 finalized + // JN2: 1-100 finalized, 101_inprogress (txns up to 150) + + // We need to test that the accepted recovery 101-101 on JN0 doesn't + // end up truncating the log back to 101. + + cluster.restartJournalNode(1); + cluster.getJournalNode(2).stopAndJoin(0); + + qjm = createSpyingQJM(); + try { + assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm)); + } finally { + qjm.close(); + } + } + @Test public void testPurgeLogs() throws Exception { for (int txid = 1; txid <= 5; txid++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java new file mode 100644 index 00000000000..b49b68049aa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.*; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; + +import static org.apache.hadoop.hdfs.qjournal.client.SegmentRecoveryComparator.INSTANCE; + +public class TestSegmentRecoveryComparator { + + private static Map.Entry makeEntry( + PrepareRecoveryResponseProto proto) { + return Maps.immutableEntry(Mockito.mock(AsyncLogger.class), proto); + } + + @Test + public void testComparisons() { + Entry INPROGRESS_1_3 = + makeEntry(PrepareRecoveryResponseProto.newBuilder() + .setSegmentState(SegmentStateProto.newBuilder() + .setStartTxId(1L) + .setEndTxId(3L) + .setMd5Sum(ByteString.EMPTY) + .setIsInProgress(true)) + .setLastWriterEpoch(0L) + .build()); + Entry INPROGRESS_1_4 = + makeEntry(PrepareRecoveryResponseProto.newBuilder() + .setSegmentState(SegmentStateProto.newBuilder() + .setStartTxId(1L) + .setEndTxId(4L) + .setMd5Sum(ByteString.EMPTY) + .setIsInProgress(true)) + .setLastWriterEpoch(0L) + .build()); + Entry INPROGRESS_1_4_ACCEPTED = + makeEntry(PrepareRecoveryResponseProto.newBuilder() + .setSegmentState(SegmentStateProto.newBuilder() + .setStartTxId(1L) + .setEndTxId(4L) + .setMd5Sum(ByteString.EMPTY) + .setIsInProgress(true)) + .setLastWriterEpoch(0L) + .setAcceptedInEpoch(1L) + .build()); + + Entry FINALIZED_1_3 = + makeEntry(PrepareRecoveryResponseProto.newBuilder() + .setSegmentState(SegmentStateProto.newBuilder() + .setStartTxId(1L) + .setEndTxId(3L) + .setMd5Sum(ByteString.EMPTY) + .setIsInProgress(false)) + .setLastWriterEpoch(0L) + .build()); + + // Should compare equal to itself + assertEquals(0, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_3)); + + // Longer log wins. + assertEquals(-1, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_4)); + assertEquals(1, INSTANCE.compare(INPROGRESS_1_4, INPROGRESS_1_3)); + + // Finalized log wins even over a longer in-progress + assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4, FINALIZED_1_3)); + assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4)); + + // Finalized log wins even if the in-progress one has an accepted + // recovery proposal. + assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4_ACCEPTED, FINALIZED_1_3)); + assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4_ACCEPTED)); + } +}