HDFS-3845. Fixes for edge cases in QJM recovery protocol. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1377809 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a8ff292669
commit
1e68d4726b
|
@ -32,3 +32,5 @@ HDFS-3823. QJM: TestQJMWithFaults fails occasionally because of missed setting o
|
||||||
HDFS-3826. QJM: Some trivial logging / exception text improvements. (todd and atm)
|
HDFS-3826. QJM: Some trivial logging / exception text improvements. (todd and atm)
|
||||||
|
|
||||||
HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli)
|
HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli)
|
||||||
|
|
||||||
|
HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd)
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
@ -52,7 +51,6 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ComparisonChain;
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -223,7 +221,7 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
// TODO: we should collect any "ties" and pass the URL for all of them
|
// TODO: we should collect any "ties" and pass the URL for all of them
|
||||||
// when syncing, so we can tolerate failure during recovery better.
|
// when syncing, so we can tolerate failure during recovery better.
|
||||||
Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
|
||||||
prepareResponses.entrySet(), RECOVERY_COMPARATOR);
|
prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE);
|
||||||
AsyncLogger bestLogger = bestEntry.getKey();
|
AsyncLogger bestLogger = bestEntry.getKey();
|
||||||
PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
|
PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
|
||||||
|
|
||||||
|
@ -282,35 +280,6 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
|
loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> RECOVERY_COMPARATOR =
|
|
||||||
new Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>>() {
|
|
||||||
@Override
|
|
||||||
public int compare(
|
|
||||||
Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
|
|
||||||
Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
|
|
||||||
|
|
||||||
PrepareRecoveryResponseProto r1 = a.getValue();
|
|
||||||
PrepareRecoveryResponseProto r2 = b.getValue();
|
|
||||||
|
|
||||||
if (r1.hasSegmentState() && r2.hasSegmentState()) {
|
|
||||||
assert r1.getSegmentState().getStartTxId() ==
|
|
||||||
r2.getSegmentState().getStartTxId() : "bad args: " + r1 + ", " + r2;
|
|
||||||
}
|
|
||||||
|
|
||||||
return ComparisonChain.start()
|
|
||||||
// If one of them has accepted something and the other hasn't,
|
|
||||||
// use the one with an accepted recovery
|
|
||||||
.compare(r1.hasAcceptedInEpoch(), r2.hasAcceptedInEpoch())
|
|
||||||
// If they both accepted, use the one that's more recent
|
|
||||||
.compare(r1.getAcceptedInEpoch(),
|
|
||||||
r2.getAcceptedInEpoch())
|
|
||||||
// Otherwise, choose based on which log is longer
|
|
||||||
.compare(r1.hasSegmentState(), r2.hasSegmentState())
|
|
||||||
.compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
|
|
||||||
.result();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
static List<AsyncLogger> createLoggers(Configuration conf,
|
static List<AsyncLogger> createLoggers(Configuration conf,
|
||||||
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
|
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.qjournal.client;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.ComparisonChain;
|
||||||
|
import com.google.common.primitives.Booleans;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares responses to the prepareRecovery RPC. This is responsible for
|
||||||
|
* determining the correct length to recover.
|
||||||
|
*/
|
||||||
|
class SegmentRecoveryComparator
|
||||||
|
implements Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> {
|
||||||
|
|
||||||
|
static final SegmentRecoveryComparator INSTANCE = new SegmentRecoveryComparator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
|
||||||
|
|
||||||
|
PrepareRecoveryResponseProto r1 = a.getValue();
|
||||||
|
PrepareRecoveryResponseProto r2 = b.getValue();
|
||||||
|
|
||||||
|
// A response that has data for a segment is always better than one
|
||||||
|
// that doesn't.
|
||||||
|
if (r1.hasSegmentState() != r2.hasSegmentState()) {
|
||||||
|
return Booleans.compare(r1.hasSegmentState(), r2.hasSegmentState());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!r1.hasSegmentState()) {
|
||||||
|
// Neither has a segment, so neither can be used for recover.
|
||||||
|
// Call them equal.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// They both have a segment.
|
||||||
|
SegmentStateProto r1Seg = r1.getSegmentState();
|
||||||
|
SegmentStateProto r2Seg = r2.getSegmentState();
|
||||||
|
|
||||||
|
Preconditions.checkArgument(r1Seg.getStartTxId() == r2Seg.getStartTxId(),
|
||||||
|
"Should only be called with responses for corresponding segments: " +
|
||||||
|
"%s and %s do not have the same start txid.", r1, r2);
|
||||||
|
|
||||||
|
// If one is in-progress but the other is finalized,
|
||||||
|
// the finalized one is greater.
|
||||||
|
if (r1Seg.getIsInProgress() != r2Seg.getIsInProgress()) {
|
||||||
|
return Booleans.compare(!r1Seg.getIsInProgress(), !r2Seg.getIsInProgress());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!r1Seg.getIsInProgress()) {
|
||||||
|
// If both are finalized, they should match lengths, and be considered
|
||||||
|
// equal
|
||||||
|
if (r1Seg.getEndTxId() != r2Seg.getEndTxId() ||
|
||||||
|
!r1Seg.getMd5Sum().equals(r2Seg.getMd5Sum())) {
|
||||||
|
throw new AssertionError("finalized segs with different lengths: " +
|
||||||
|
r1 + ", " + r2);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Both are in-progress.
|
||||||
|
long r1SeenEpoch = Math.max(r1.getAcceptedInEpoch(), r1.getLastWriterEpoch());
|
||||||
|
long r2SeenEpoch = Math.max(r2.getAcceptedInEpoch(), r2.getLastWriterEpoch());
|
||||||
|
|
||||||
|
return ComparisonChain.start()
|
||||||
|
.compare(r1SeenEpoch, r2SeenEpoch)
|
||||||
|
.compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
|
||||||
|
.result();
|
||||||
|
}
|
||||||
|
}
|
|
@ -76,7 +76,17 @@ class Journal implements Closeable {
|
||||||
* number of that writer is stored persistently on disk.
|
* number of that writer is stored persistently on disk.
|
||||||
*/
|
*/
|
||||||
private PersistentLongFile lastPromisedEpoch;
|
private PersistentLongFile lastPromisedEpoch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The epoch number of the last writer to actually write a transaction.
|
||||||
|
* This is used to differentiate log segments after a crash at the very
|
||||||
|
* beginning of a segment. See the the 'testNewerVersionOfSegmentWins'
|
||||||
|
* test case.
|
||||||
|
*/
|
||||||
|
private PersistentLongFile lastWriterEpoch;
|
||||||
|
|
||||||
private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
|
private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
|
||||||
|
private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
|
||||||
|
|
||||||
private final FileJournalManager fjm;
|
private final FileJournalManager fjm;
|
||||||
|
|
||||||
|
@ -86,6 +96,8 @@ class Journal implements Closeable {
|
||||||
File currentDir = storage.getSingularStorageDir().getCurrentDir();
|
File currentDir = storage.getSingularStorageDir().getCurrentDir();
|
||||||
this.lastPromisedEpoch = new PersistentLongFile(
|
this.lastPromisedEpoch = new PersistentLongFile(
|
||||||
new File(currentDir, LAST_PROMISED_FILENAME), 0);
|
new File(currentDir, LAST_PROMISED_FILENAME), 0);
|
||||||
|
this.lastWriterEpoch = new PersistentLongFile(
|
||||||
|
new File(currentDir, LAST_WRITER_EPOCH), 0);
|
||||||
|
|
||||||
this.fjm = storage.getJournalManager();
|
this.fjm = storage.getJournalManager();
|
||||||
}
|
}
|
||||||
|
@ -201,7 +213,7 @@ class Journal implements Closeable {
|
||||||
synchronized void journal(RequestInfo reqInfo,
|
synchronized void journal(RequestInfo reqInfo,
|
||||||
long segmentTxId, long firstTxnId,
|
long segmentTxId, long firstTxnId,
|
||||||
int numTxns, byte[] records) throws IOException {
|
int numTxns, byte[] records) throws IOException {
|
||||||
checkRequest(reqInfo);
|
checkWriteRequest(reqInfo);
|
||||||
checkFormatted();
|
checkFormatted();
|
||||||
|
|
||||||
// TODO: if a JN goes down and comes back up, then it will throw
|
// TODO: if a JN goes down and comes back up, then it will throw
|
||||||
|
@ -260,6 +272,16 @@ class Journal implements Closeable {
|
||||||
// client
|
// client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
|
||||||
|
checkRequest(reqInfo);
|
||||||
|
|
||||||
|
if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
|
||||||
|
throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
|
||||||
|
" is not the current writer epoch " +
|
||||||
|
lastWriterEpoch.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkFormatted() throws JournalNotFormattedException {
|
private void checkFormatted() throws JournalNotFormattedException {
|
||||||
if (!storage.isFormatted()) {
|
if (!storage.isFormatted()) {
|
||||||
throw new JournalNotFormattedException("Journal " + storage +
|
throw new JournalNotFormattedException("Journal " + storage +
|
||||||
|
@ -308,6 +330,18 @@ class Journal implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long curLastWriterEpoch = lastWriterEpoch.get();
|
||||||
|
if (curLastWriterEpoch != reqInfo.getEpoch()) {
|
||||||
|
LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch());
|
||||||
|
lastWriterEpoch.set(reqInfo.getEpoch());
|
||||||
|
}
|
||||||
|
|
||||||
|
// The fact that we are starting a segment at this txid indicates
|
||||||
|
// that any previous recovery for this same segment was aborted.
|
||||||
|
// Otherwise, no writer would have started writing. So, we can
|
||||||
|
// remove the record of the older segment here.
|
||||||
|
purgePaxosDecision(txid);
|
||||||
|
|
||||||
curSegment = fjm.startLogSegment(txid);
|
curSegment = fjm.startLogSegment(txid);
|
||||||
curSegmentTxId = txid;
|
curSegmentTxId = txid;
|
||||||
nextTxId = txid;
|
nextTxId = txid;
|
||||||
|
@ -350,6 +384,12 @@ class Journal implements Closeable {
|
||||||
"Trying to re-finalize already finalized log " +
|
"Trying to re-finalize already finalized log " +
|
||||||
elf + " with different endTxId " + endTxId);
|
elf + " with different endTxId " + endTxId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Once logs are finalized, a different length will never be decided.
|
||||||
|
// During recovery, we treat a finalized segment the same as an accepted
|
||||||
|
// recovery. Thus, we no longer need to keep track of the previously-
|
||||||
|
// accepted decision. The existence of the finalized log segment is enough.
|
||||||
|
purgePaxosDecision(elf.getFirstTxId());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -364,6 +404,21 @@ class Journal implements Closeable {
|
||||||
purgePaxosDecisionsOlderThan(minTxIdToKeep);
|
purgePaxosDecisionsOlderThan(minTxIdToKeep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the previously-recorded 'accepted recovery' information
|
||||||
|
* for a given log segment, once it is no longer necessary.
|
||||||
|
* @param segmentTxId the transaction ID to purge
|
||||||
|
* @throws IOException if the file could not be deleted
|
||||||
|
*/
|
||||||
|
private void purgePaxosDecision(long segmentTxId) throws IOException {
|
||||||
|
File paxosFile = storage.getPaxosFile(segmentTxId);
|
||||||
|
if (paxosFile.exists()) {
|
||||||
|
if (!paxosFile.delete()) {
|
||||||
|
throw new IOException("Unable to delete paxos file " + paxosFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
|
private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
File dir = storage.getPaxosDir();
|
File dir = storage.getPaxosDir();
|
||||||
|
@ -443,17 +498,28 @@ class Journal implements Closeable {
|
||||||
PrepareRecoveryResponseProto.Builder builder =
|
PrepareRecoveryResponseProto.Builder builder =
|
||||||
PrepareRecoveryResponseProto.newBuilder();
|
PrepareRecoveryResponseProto.newBuilder();
|
||||||
|
|
||||||
|
SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
|
||||||
|
boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
|
||||||
|
|
||||||
PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
|
PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
|
||||||
if (previouslyAccepted != null) {
|
|
||||||
|
if (previouslyAccepted != null && !hasFinalizedSegment) {
|
||||||
|
SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
|
||||||
|
assert acceptedState.getEndTxId() == segInfo.getEndTxId() &&
|
||||||
|
acceptedState.getMd5Sum().equals(segInfo.getMd5Sum()) :
|
||||||
|
"prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" +
|
||||||
|
"on disk: " + TextFormat.shortDebugString(segInfo);
|
||||||
|
|
||||||
builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
|
builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
|
||||||
.setSegmentState(previouslyAccepted.getSegmentState());
|
.setSegmentState(previouslyAccepted.getSegmentState());
|
||||||
} else {
|
} else {
|
||||||
SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
|
|
||||||
if (segInfo != null) {
|
if (segInfo != null) {
|
||||||
builder.setSegmentState(segInfo);
|
builder.setSegmentState(segInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
builder.setLastWriterEpoch(lastWriterEpoch.get());
|
||||||
|
|
||||||
PrepareRecoveryResponseProto resp = builder.build();
|
PrepareRecoveryResponseProto resp = builder.build();
|
||||||
LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
|
LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
|
||||||
TextFormat.shortDebugString(resp));
|
TextFormat.shortDebugString(resp));
|
||||||
|
|
|
@ -162,6 +162,7 @@ message PrepareRecoveryRequestProto {
|
||||||
message PrepareRecoveryResponseProto {
|
message PrepareRecoveryResponseProto {
|
||||||
optional SegmentStateProto segmentState = 1;
|
optional SegmentStateProto segmentState = 1;
|
||||||
optional uint64 acceptedInEpoch = 2;
|
optional uint64 acceptedInEpoch = 2;
|
||||||
|
required uint64 lastWriterEpoch = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -30,9 +30,9 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||||
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
||||||
|
@ -533,6 +534,212 @@ public class TestQuorumJournalManager {
|
||||||
checkRecovery(cluster, 1, 4);
|
checkRecovery(cluster, 1, 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up the following tricky edge case state which is used by
|
||||||
|
* multiple tests:
|
||||||
|
*
|
||||||
|
* Initial writer:
|
||||||
|
* - Writing to 3 JNs: JN0, JN1, JN2:
|
||||||
|
* - A log segment with txnid 1 through 100 succeeds.
|
||||||
|
* - The first transaction in the next segment only goes to JN0
|
||||||
|
* before the writer crashes (eg it is partitioned)
|
||||||
|
*
|
||||||
|
* Recovery by another writer:
|
||||||
|
* - The new NN starts recovery and talks to all three. Thus, it sees
|
||||||
|
* that the newest log segment which needs recovery is 101.
|
||||||
|
* - It sends the prepareRecovery(101) call, and decides that the
|
||||||
|
* recovery length for 101 is only the 1 transaction.
|
||||||
|
* - It sends acceptRecovery(101-101) to only JN0, before crashing
|
||||||
|
*
|
||||||
|
* This yields the following state:
|
||||||
|
* - JN0: 1-100 finalized, 101_inprogress, accepted recovery: 101-101
|
||||||
|
* - JN1: 1-100 finalized, 101_inprogress.empty
|
||||||
|
* - JN2: 1-100 finalized, 101_inprogress.empty
|
||||||
|
* (the .empty files got moved aside during recovery)
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
private void setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery() throws Exception {
|
||||||
|
// Log segment with txns 1-100 succeeds
|
||||||
|
writeSegment(cluster, qjm, 1, 100, true);
|
||||||
|
|
||||||
|
// startLogSegment only makes it to one of the three nodes
|
||||||
|
failLoggerAtTxn(spies.get(1), 101);
|
||||||
|
failLoggerAtTxn(spies.get(2), 101);
|
||||||
|
|
||||||
|
try {
|
||||||
|
writeSegment(cluster, qjm, 101, 1, true);
|
||||||
|
fail("Should have failed");
|
||||||
|
} catch (QuorumException qe) {
|
||||||
|
GenericTestUtils.assertExceptionContains("mock failure", qe);
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recovery 1:
|
||||||
|
// make acceptRecovery() only make it to the node which has txid 101
|
||||||
|
// this should fail because only 1/3 accepted the recovery
|
||||||
|
qjm = createSpyingQJM();
|
||||||
|
spies = qjm.getLoggerSetForTests().getLoggersForTests();
|
||||||
|
futureThrows(new IOException("mock failure")).when(spies.get(1))
|
||||||
|
.acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
|
||||||
|
futureThrows(new IOException("mock failure")).when(spies.get(2))
|
||||||
|
.acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
|
||||||
|
|
||||||
|
try {
|
||||||
|
qjm.recoverUnfinalizedSegments();
|
||||||
|
fail("Should have failed to recover");
|
||||||
|
} catch (QuorumException qe) {
|
||||||
|
GenericTestUtils.assertExceptionContains("mock failure", qe);
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that we have entered the expected state as described in the
|
||||||
|
// method javadoc.
|
||||||
|
GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(0, JID),
|
||||||
|
"edits_.*",
|
||||||
|
NNStorage.getFinalizedEditsFileName(1, 100),
|
||||||
|
NNStorage.getInProgressEditsFileName(101));
|
||||||
|
GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(1, JID),
|
||||||
|
"edits_.*",
|
||||||
|
NNStorage.getFinalizedEditsFileName(1, 100),
|
||||||
|
NNStorage.getInProgressEditsFileName(101) + ".empty");
|
||||||
|
GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(2, JID),
|
||||||
|
"edits_.*",
|
||||||
|
NNStorage.getFinalizedEditsFileName(1, 100),
|
||||||
|
NNStorage.getInProgressEditsFileName(101) + ".empty");
|
||||||
|
|
||||||
|
File paxos0 = new File(cluster.getCurrentDir(0, JID), "paxos");
|
||||||
|
File paxos1 = new File(cluster.getCurrentDir(1, JID), "paxos");
|
||||||
|
File paxos2 = new File(cluster.getCurrentDir(2, JID), "paxos");
|
||||||
|
|
||||||
|
GenericTestUtils.assertGlobEquals(paxos0, ".*", "101");
|
||||||
|
GenericTestUtils.assertGlobEquals(paxos1, ".*");
|
||||||
|
GenericTestUtils.assertGlobEquals(paxos2, ".*");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test an edge case discovered by randomized testing.
|
||||||
|
*
|
||||||
|
* Starts with the edge case state set up by
|
||||||
|
* {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
|
||||||
|
*
|
||||||
|
* Recovery 2:
|
||||||
|
* - New NN starts recovery and only talks to JN1 and JN2. JN0 has
|
||||||
|
* crashed. Since they have no logs open, they say they don't need
|
||||||
|
* recovery.
|
||||||
|
* - Starts writing segment 101, and writes 50 transactions before crashing.
|
||||||
|
*
|
||||||
|
* Recovery 3:
|
||||||
|
* - JN0 has come back to life.
|
||||||
|
* - New NN starts recovery and talks to all three. All three have
|
||||||
|
* segments open from txid 101, so it calls prepareRecovery(101)
|
||||||
|
* - JN0 has an already-accepted value for segment 101, so it replies
|
||||||
|
* "you should recover 101-101"
|
||||||
|
* - Former incorrect behavior: NN truncates logs to txid 101 even though
|
||||||
|
* it should have recovered through 150.
|
||||||
|
*
|
||||||
|
* In this case, even though there is an accepted recovery decision,
|
||||||
|
* the newer log segments should take precedence, since they were written
|
||||||
|
* in a newer epoch than the recorded decision.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNewerVersionOfSegmentWins() throws Exception {
|
||||||
|
setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
|
||||||
|
|
||||||
|
// Now start writing again without JN0 present:
|
||||||
|
cluster.getJournalNode(0).stopAndJoin(0);
|
||||||
|
|
||||||
|
qjm = createSpyingQJM();
|
||||||
|
try {
|
||||||
|
assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
|
||||||
|
|
||||||
|
// Write segment but do not finalize
|
||||||
|
writeSegment(cluster, qjm, 101, 50, false);
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now try to recover a new writer, with JN0 present,
|
||||||
|
// and ensure that all of the above-written transactions are recovered.
|
||||||
|
cluster.restartJournalNode(0);
|
||||||
|
qjm = createSpyingQJM();
|
||||||
|
try {
|
||||||
|
assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test another edge case discovered by randomized testing.
|
||||||
|
*
|
||||||
|
* Starts with the edge case state set up by
|
||||||
|
* {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
|
||||||
|
*
|
||||||
|
* Recovery 2:
|
||||||
|
* - New NN starts recovery and only talks to JN1 and JN2. JN0 has
|
||||||
|
* crashed. Since they have no logs open, they say they don't need
|
||||||
|
* recovery.
|
||||||
|
* - Before writing any transactions, JN0 comes back to life and
|
||||||
|
* JN1 crashes.
|
||||||
|
* - Starts writing segment 101, and writes 50 transactions before crashing.
|
||||||
|
*
|
||||||
|
* Recovery 3:
|
||||||
|
* - JN1 has come back to life. JN2 crashes.
|
||||||
|
* - New NN starts recovery and talks to all three. All three have
|
||||||
|
* segments open from txid 101, so it calls prepareRecovery(101)
|
||||||
|
* - JN0 has an already-accepted value for segment 101, so it replies
|
||||||
|
* "you should recover 101-101"
|
||||||
|
* - Former incorrect behavior: NN truncates logs to txid 101 even though
|
||||||
|
* it should have recovered through 150.
|
||||||
|
*
|
||||||
|
* In this case, even though there is an accepted recovery decision,
|
||||||
|
* the newer log segments should take precedence, since they were written
|
||||||
|
* in a newer epoch than the recorded decision.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNewerVersionOfSegmentWins2() throws Exception {
|
||||||
|
setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
|
||||||
|
|
||||||
|
// Recover without JN0 present.
|
||||||
|
cluster.getJournalNode(0).stopAndJoin(0);
|
||||||
|
|
||||||
|
qjm = createSpyingQJM();
|
||||||
|
try {
|
||||||
|
assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
|
||||||
|
|
||||||
|
// After recovery, JN0 comes back to life and JN1 crashes.
|
||||||
|
cluster.restartJournalNode(0);
|
||||||
|
cluster.getJournalNode(1).stopAndJoin(0);
|
||||||
|
|
||||||
|
// Write segment but do not finalize
|
||||||
|
writeSegment(cluster, qjm, 101, 50, false);
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// State:
|
||||||
|
// JN0: 1-100 finalized, 101_inprogress (txns up to 150)
|
||||||
|
// Previously, JN0 had an accepted recovery 101-101 from an earlier recovery
|
||||||
|
// attempt.
|
||||||
|
// JN1: 1-100 finalized
|
||||||
|
// JN2: 1-100 finalized, 101_inprogress (txns up to 150)
|
||||||
|
|
||||||
|
// We need to test that the accepted recovery 101-101 on JN0 doesn't
|
||||||
|
// end up truncating the log back to 101.
|
||||||
|
|
||||||
|
cluster.restartJournalNode(1);
|
||||||
|
cluster.getJournalNode(2).stopAndJoin(0);
|
||||||
|
|
||||||
|
qjm = createSpyingQJM();
|
||||||
|
try {
|
||||||
|
assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
|
||||||
|
} finally {
|
||||||
|
qjm.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPurgeLogs() throws Exception {
|
public void testPurgeLogs() throws Exception {
|
||||||
for (int txid = 1; txid <= 5; txid++) {
|
for (int txid = 1; txid <= 5; txid++) {
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.qjournal.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.qjournal.client.SegmentRecoveryComparator.INSTANCE;
|
||||||
|
|
||||||
|
public class TestSegmentRecoveryComparator {
|
||||||
|
|
||||||
|
private static Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> makeEntry(
|
||||||
|
PrepareRecoveryResponseProto proto) {
|
||||||
|
return Maps.immutableEntry(Mockito.mock(AsyncLogger.class), proto);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testComparisons() {
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_3 =
|
||||||
|
makeEntry(PrepareRecoveryResponseProto.newBuilder()
|
||||||
|
.setSegmentState(SegmentStateProto.newBuilder()
|
||||||
|
.setStartTxId(1L)
|
||||||
|
.setEndTxId(3L)
|
||||||
|
.setMd5Sum(ByteString.EMPTY)
|
||||||
|
.setIsInProgress(true))
|
||||||
|
.setLastWriterEpoch(0L)
|
||||||
|
.build());
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4 =
|
||||||
|
makeEntry(PrepareRecoveryResponseProto.newBuilder()
|
||||||
|
.setSegmentState(SegmentStateProto.newBuilder()
|
||||||
|
.setStartTxId(1L)
|
||||||
|
.setEndTxId(4L)
|
||||||
|
.setMd5Sum(ByteString.EMPTY)
|
||||||
|
.setIsInProgress(true))
|
||||||
|
.setLastWriterEpoch(0L)
|
||||||
|
.build());
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4_ACCEPTED =
|
||||||
|
makeEntry(PrepareRecoveryResponseProto.newBuilder()
|
||||||
|
.setSegmentState(SegmentStateProto.newBuilder()
|
||||||
|
.setStartTxId(1L)
|
||||||
|
.setEndTxId(4L)
|
||||||
|
.setMd5Sum(ByteString.EMPTY)
|
||||||
|
.setIsInProgress(true))
|
||||||
|
.setLastWriterEpoch(0L)
|
||||||
|
.setAcceptedInEpoch(1L)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
Entry<AsyncLogger, PrepareRecoveryResponseProto> FINALIZED_1_3 =
|
||||||
|
makeEntry(PrepareRecoveryResponseProto.newBuilder()
|
||||||
|
.setSegmentState(SegmentStateProto.newBuilder()
|
||||||
|
.setStartTxId(1L)
|
||||||
|
.setEndTxId(3L)
|
||||||
|
.setMd5Sum(ByteString.EMPTY)
|
||||||
|
.setIsInProgress(false))
|
||||||
|
.setLastWriterEpoch(0L)
|
||||||
|
.build());
|
||||||
|
|
||||||
|
// Should compare equal to itself
|
||||||
|
assertEquals(0, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_3));
|
||||||
|
|
||||||
|
// Longer log wins.
|
||||||
|
assertEquals(-1, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_4));
|
||||||
|
assertEquals(1, INSTANCE.compare(INPROGRESS_1_4, INPROGRESS_1_3));
|
||||||
|
|
||||||
|
// Finalized log wins even over a longer in-progress
|
||||||
|
assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4, FINALIZED_1_3));
|
||||||
|
assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4));
|
||||||
|
|
||||||
|
// Finalized log wins even if the in-progress one has an accepted
|
||||||
|
// recovery proposal.
|
||||||
|
assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4_ACCEPTED, FINALIZED_1_3));
|
||||||
|
assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4_ACCEPTED));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue