HDFS-3955. QJM: Make acceptRecovery() atomic. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1387706 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-09-19 18:57:11 +00:00
parent 663e7484c0
commit 3ccd905d8a
8 changed files with 322 additions and 71 deletions

CHANGES.HDFS-3077.txt View File

@@ -78,3 +78,5 @@ HDFS-3926. QJM: Add user documentation for QJM. (atm)
HDFS-3943. QJM: remove currently-unused md5sum field (todd)
HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
HDFS-3955. QJM: Make acceptRecovery() atomic. (todd)

QuorumCall.java View File

@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
@@ -108,6 +109,7 @@ class QuorumCall<KEY, RESULT> {
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
long et = st + millis;
while (true) {
checkAssertionErrors();
if (minResponses > 0 && countResponses() >= minResponses) return;
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
@@ -135,6 +137,33 @@ class QuorumCall<KEY, RESULT> {
}
}
/**
* Check if any of the responses came back with an AssertionError.
* If so, it re-throws it, even if there was a quorum of responses.
* This code only runs if assertions are enabled for this class,
* otherwise it should JIT itself away.
*
* This is done since AssertionError indicates programmer confusion
* rather than some kind of expected issue, and thus in the context
* of test cases we'd like to actually fail the test case instead of
* continuing through.
*/
private synchronized void checkAssertionErrors() {
boolean assertsEnabled = false;
assert assertsEnabled = true; // sets to true if enabled
if (assertsEnabled) {
for (Throwable t : exceptions.values()) {
if (t instanceof AssertionError) {
throw (AssertionError)t;
} else if (t instanceof RemoteException &&
((RemoteException)t).getClassName().equals(
AssertionError.class.getName())) {
throw new AssertionError(t);
}
}
}
}
private synchronized void addResult(KEY k, RESULT res) {
successes.put(k, res);
notifyAll();
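The checkAssertionErrors() probe above relies on a standard Java idiom: the side effect of "assert assertsEnabled = true" runs only when the JVM is started with -ea, so in production the flag stays false and the guarded loop is dead code the JIT can eliminate. A minimal standalone sketch of the idiom (hypothetical class, not part of this patch):

public class AssertProbe {
  public static void main(String[] args) {
    boolean assertsEnabled = false;
    // The assignment is a side effect of the assert condition, so it
    // executes only when assertions are enabled (java -ea AssertProbe).
    assert assertsEnabled = true;
    System.out.println("Assertions enabled: " + assertsEnabled);
  }
}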

JNStorage.java View File

@@ -86,6 +86,19 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(),
NNStorage.getInProgressEditsFileName(startTxId));
}
/**
* @param segmentTxId the first txid of the segment
* @param epoch the epoch number of the writer which is coordinating
* recovery
* @return the temporary path in which an edits log should be stored
* while it is being downloaded from a remote JournalNode
*/
File getSyncLogTemporaryFile(long segmentTxId, long epoch) {
String name = NNStorage.getInProgressEditsFileName(segmentTxId) +
".epoch=" + epoch;
return new File(sd.getCurrentDir(), name);
}
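For illustration, and assuming NNStorage's usual zero-padded naming for in-progress edits files (a hypothetical example with made-up values):

// e.g. segmentTxId = 1, epoch = 42 would yield something like:
//   <storage dir>/current/edits_inprogress_0000000000000000001.epoch=42
File tmp = storage.getSyncLogTemporaryFile(1L, 42L);

Encoding the epoch in the name keeps a retry by a newer recovery coordinator from colliding with a stale partial download from an earlier one.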
/**
* @return the path for the file which contains persisted data for the

Journal.java View File

@@ -693,10 +693,11 @@ class Journal implements Closeable {
PrepareRecoveryResponseProto.Builder builder =
PrepareRecoveryResponseProto.newBuilder();
PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
completeHalfDoneAcceptRecovery(previouslyAccepted);
SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
if (previouslyAccepted != null && !hasFinalizedSegment) {
SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
@@ -722,7 +723,7 @@
TextFormat.shortDebugString(resp));
return resp;
}
/**
* @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
*/
@@ -757,7 +758,9 @@
"Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
oldData, newData);
}
File syncedFile = null;
SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
if (currentSegment == null ||
currentSegment.getEndTxId() != segment.getEndTxId()) {
@@ -799,7 +802,7 @@
highestWrittenTxId = segment.getEndTxId();
}
}
syncedFile = syncLog(reqInfo, segment, fromUrl);
} else {
LOG.info("Skipping download of log " +
@@ -807,10 +810,34 @@
": already have up-to-date logs");
}
// This is one of the few places in the protocol where we have a single
// RPC that results in two distinct actions:
//
// - 1) Downloads the new log segment data (above)
// - 2) Records the new Paxos data about the synchronized segment (below)
//
// These need to be treated as a transaction from the perspective
// of any external process. We do this by treating the persistPaxosData()
// success as the "commit" of an atomic transaction. If we fail before
// this point, the downloaded edit log will only exist at a temporary
// path, and thus not change any externally visible state. If we fail
// after this point, then any future prepareRecovery() call will see
// the Paxos data, and by calling completeHalfDoneAcceptRecovery() will
// roll forward the rename of the referenced log file.
//
// See also: HDFS-3955
//
// The fault points here are exercised by the randomized fault injection
// test case to ensure that this atomic "transaction" operates correctly.
JournalFaultInjector.get().beforePersistPaxosData();
persistPaxosData(segmentTxId, newData);
JournalFaultInjector.get().afterPersistPaxosData();
if (syncedFile != null) {
FileUtil.replaceFile(syncedFile,
storage.getInProgressEditLog(segmentTxId));
}
LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
TextFormat.shortDebugString(newData));
}
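The comment above amounts to a write-ahead commit protocol. A condensed sketch of the order of operations, using the method names from this patch:

// 1) Stage side effects somewhere externally invisible.
File tmp = syncLog(reqInfo, segment, fromUrl);      // download to tmp path
// 2) Commit point: once the Paxos data is durable, the recovery is
//    logically accepted, even if the process dies immediately afterwards.
persistPaxosData(segmentTxId, newData);
// 3) Post-commit roll-forward: move the segment into place. A crash
//    between 2) and 3) is repaired by the next prepareRecovery() call
//    via completeHalfDoneAcceptRecovery().
FileUtil.replaceFile(tmp, storage.getInProgressEditLog(segmentTxId));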
@@ -822,21 +849,17 @@
}
/**
* Synchronize a log segment from another JournalNode. The log is
* downloaded from the provided URL into a temporary location on disk,
* which is named based on the current request's epoch.
*
* @return the temporary location of the downloaded file
*/
private File syncLog(RequestInfo reqInfo,
final SegmentStateProto segment, final URL url) throws IOException {
final File tmpFile = storage.getSyncLogTemporaryFile(
segment.getStartTxId(), reqInfo.getEpoch());
final List<File> localPaths = ImmutableList.of(tmpFile);
LOG.info("Synchronizing log " +
TextFormat.shortDebugString(segment) + " from " + url);
@@ -844,12 +867,11 @@
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
boolean success = false;
try {
TransferFsImage.doGetUrl(url, localPaths, storage, true);
assert tmpFile.exists();
success = true;
} finally {
if (!success) {
if (!tmpFile.delete()) {
@@ -860,6 +882,41 @@
return null;
}
});
return tmpFile;
}
/**
* If the node crashes between downloading a log segment
* and persisting the associated paxos recovery data, the log segment
* will be left in its temporary location on disk. Given the paxos data,
* we can check if this was indeed the case, and "roll forward"
* the atomic operation.
*
* See the inline comments in
* {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
* details.
*
* @throws IOException if the temporary file cannot be renamed into place
*/
private void completeHalfDoneAcceptRecovery(
PersistedRecoveryPaxosData paxosData) throws IOException {
if (paxosData == null) {
return;
}
long segmentId = paxosData.getSegmentState().getStartTxId();
long epoch = paxosData.getAcceptedInEpoch();
File tmp = storage.getSyncLogTemporaryFile(segmentId, epoch);
if (tmp.exists()) {
File dst = storage.getInProgressEditLog(segmentId);
LOG.info("Rolling forward previously half-completed synchronization: " +
tmp + " -> " + dst);
FileUtil.replaceFile(tmp, dst);
}
}
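FileUtil.replaceFile is used here rather than a bare File.renameTo because the destination may already exist and must be overwritten. A rough java.nio equivalent of the intended semantics (a sketch only, assuming Java 7+ and a filesystem where same-directory renames are atomic; the patch itself uses the Hadoop FileUtil API):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

class AtomicReplaceSketch {
  static void replace(File src, File dst) throws IOException {
    // On POSIX, an atomic rename silently replaces an existing destination.
    Files.move(src.toPath(), dst.toPath(), StandardCopyOption.ATOMIC_MOVE);
  }
}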
/**

JournalFaultInjector.java View File

@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.server;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Used for injecting faults in QuorumJournalManager tests.
* Calls into this are a no-op in production code.
*/
@VisibleForTesting
@InterfaceAudience.Private
public class JournalFaultInjector {
public static JournalFaultInjector instance = new JournalFaultInjector();
public static JournalFaultInjector get() {
return instance;
}
public void beforePersistPaxosData() throws IOException {}
public void afterPersistPaxosData() throws IOException {}
}
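Tests exercise these hooks by swapping the static instance for a Mockito mock and programming faults onto it, as TestQJMWithFaults does below. A condensed sketch of the pattern:

// Replace the production no-op with a mock, then program a fault.
JournalFaultInjector faultInjector =
    JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
Mockito.doThrow(new IOException("Injected"))
    .when(faultInjector).beforePersistPaxosData();
// ... drive the journal; acceptRecovery() now fails at the fault point ...
Mockito.reset(faultInjector); // restore no-op behavior between cases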

QJMTestUtil.java View File

@@ -60,7 +60,7 @@ public abstract class QJMTestUtil {
return Arrays.copyOf(buf.getData(), buf.getLength());
}
public static EditLogOutputStream writeSegment(MiniJournalCluster cluster,
QuorumJournalManager qjm, long startTxId, int numTxns,
boolean finalize) throws IOException {
EditLogOutputStream stm = qjm.startLogSegment(startTxId);
@@ -72,6 +72,9 @@
if (finalize) {
stm.close();
qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
return null;
} else {
return stm;
}
}

TestQJMWithFaults.java View File

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -84,6 +85,10 @@ public class TestQJMWithFaults {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
}
// Set up fault injection mock.
private static JournalFaultInjector faultInjector =
JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
/**
* Run through the creation of a log without any faults injected,
* and count how many RPCs are made to each node. This sets the
@@ -238,7 +243,7 @@
recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
} catch (Throwable t) {
LOG.info("Failed recovery", t);
GenericTestUtils.assertExceptionContains("faking being down", t);
checkException(t);
continue;
}
assertTrue("Recovered only up to txnid " + recovered +
@@ -252,8 +257,7 @@
lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
if (thrown.held != null) {
LOG.info("Failed write", thrown.held);
GenericTestUtils.assertExceptionContains("faking being down",
thrown.held);
checkException(thrown.held);
break;
}
txid += 4;
@@ -267,6 +271,14 @@
}
}
private void checkException(Throwable t) {
GenericTestUtils.assertExceptionContains("Injected", t);
if (t.toString().contains("AssertionError")) {
throw new RuntimeException("Should never see AssertionError in fault test!",
t);
}
}
private long writeSegmentUntilCrash(MiniJournalCluster cluster,
QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
@@ -344,6 +356,23 @@
if (!isUp) {
throw new IOException("Injected - faking being down");
}
if (invocation.getMethod().getName().equals("acceptRecovery")) {
if (random.nextFloat() < injectionProbability) {
Mockito.doThrow(new IOException(
"Injected - faking fault before persisting paxos data"))
.when(faultInjector).beforePersistPaxosData();
} else if (random.nextFloat() < injectionProbability) {
Mockito.doThrow(new IOException(
"Injected - faking fault after persisting paxos data"))
.when(faultInjector).afterPersistPaxosData();
}
}
}
@Override
public void afterCall(InvocationOnMock invocation, boolean succeeded) {
Mockito.reset(faultInjector);
}
});
}
@@ -432,16 +461,21 @@
invocation.getMethod().getDeclaringClass())) {
beforeCall(invocation);
}
boolean success = false;
try {
T ret = (T) invocation.getMethod().invoke(realObj,
invocation.getArguments());
success = true;
return ret;
} catch (InvocationTargetException ite) {
throw ite.getCause();
} finally {
afterCall(invocation, success);
}
}
abstract void beforeCall(InvocationOnMock invocation) throws Exception;
void afterCall(InvocationOnMock invocation, boolean succeeded) {}
}
private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)

TestQuorumJournalManager.java View File

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
@@ -56,6 +57,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
@@ -399,26 +401,7 @@ public class TestQuorumJournalManager {
private void doOutOfSyncTest(int missingOnRecoveryIdx,
long expectedRecoveryTxnId) throws Exception {
setupLoggers345();
QJMTestUtil.assertExistsInQuorum(cluster,
NNStorage.getInProgressEditsFileName(1));
@@ -503,26 +486,7 @@
*/
@Test
public void testRecoverAfterIncompleteRecovery() throws Exception {
setupLoggers345();
// Shut down the logger that has length = 5
cluster.getJournalNode(2).stopAndJoin(0);
@@ -554,6 +518,37 @@
checkRecovery(cluster, 1, 4);
}
/**
* Set up the loggers into the following state:
* - JN0: edits 1-3 in progress
* - JN1: edits 1-4 in progress
* - JN2: edits 1-5 in progress
*
* None of the loggers have any associated paxos info.
*/
private void setupLoggers345() throws Exception {
EditLogOutputStream stm = qjm.startLogSegment(1);
failLoggerAtTxn(spies.get(0), 4);
failLoggerAtTxn(spies.get(1), 5);
writeTxns(stm, 1, 3);
// This should succeed to 2/3 loggers
writeTxns(stm, 4, 1);
// This should only succeed to 1 logger (index 2). Hence it should
// fail
try {
writeTxns(stm, 5, 1);
fail("Did not fail to write when only a minority succeeded");
} catch (QuorumException qe) {
GenericTestUtils.assertExceptionContains(
"too many exceptions to achieve quorum size 2/3",
qe);
}
}
/**
* Set up the following tricky edge case state which is used by
* multiple tests:
@@ -760,6 +755,83 @@
}
}
@Test(timeout=20000)
public void testCrashBetweenSyncLogAndPersistPaxosData() throws Exception {
JournalFaultInjector faultInjector =
JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
setupLoggers345();
// Run recovery where the client only talks to JN0, JN1, such that it
// decides that the correct length is through txid 4.
// Only allow it to call acceptRecovery() on JN0.
qjm = createSpyingQJM();
spies = qjm.getLoggerSetForTests().getLoggersForTests();
cluster.getJournalNode(2).stopAndJoin(0);
injectIOE().when(spies.get(1)).acceptRecovery(
Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
tryRecoveryExpectingFailure();
cluster.restartJournalNode(2);
// State at this point:
// JN0: edit log for 1-4, paxos recovery data for txid 4
// JN1: edit log for 1-4,
// JN2: edit log for 1-5
// Run recovery again, but don't allow JN0 to respond to the
// prepareRecovery() call. This will cause recovery to decide
// on txid 5.
// Additionally, crash all of the nodes before they persist
// any new paxos data.
qjm = createSpyingQJM();
spies = qjm.getLoggerSetForTests().getLoggersForTests();
injectIOE().when(spies.get(0)).prepareRecovery(Mockito.eq(1L));
Mockito.doThrow(new IOException("Injected")).when(faultInjector)
.beforePersistPaxosData();
tryRecoveryExpectingFailure();
Mockito.reset(faultInjector);
// State at this point:
// JN0: edit log for 1-5, paxos recovery data for txid 4
// !!! This is the interesting bit, above. The on-disk data and the
// paxos data don't match up!
// JN1: edit log for 1-5,
// JN2: edit log for 1-5,
// Now, stop JN2, and see if we can still start up even though
// JN0 is in a strange state where its log data is actually newer
// than its accepted Paxos state.
cluster.getJournalNode(2).stopAndJoin(0);
qjm = createSpyingQJM();
try {
long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
assertTrue(recovered >= 4); // 4 was committed to a quorum
} finally {
qjm.close();
}
}
private void tryRecoveryExpectingFailure() throws IOException {
try {
QJMTestUtil.recoverAndReturnLastTxn(qjm);
fail("Expected to fail recovery");
} catch (QuorumException qe) {
GenericTestUtils.assertExceptionContains("Injected", qe);
} finally {
qjm.close();
}
}
private Stubber injectIOE() {
return futureThrows(new IOException("Injected"));
}
@Test
public void testPurgeLogs() throws Exception {
for (int txid = 1; txid <= 5; txid++) {