HDFS-3906. QJM: quorum timeout on failover with large log segment. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1383251 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-09-11 06:31:42 +00:00
parent c859e87d1e
commit 8a8c9c18d3
8 changed files with 160 additions and 80 deletions

View File: CHANGES.HDFS-3077.txt

@ -66,3 +66,5 @@ HDFS-3899. QJM: Add client-side metrics (todd)
HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)

View File: DFSConfigKeys.java

@ -413,10 +413,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
public static final String DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 20000;
public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 60000;
public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 20000;
public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
}
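The timeouts above are read with conf.getInt(KEY, DEFAULT), so the raised 120-second defaults can still be overridden per deployment. A minimal, self-contained sketch of that pattern (illustrative only, not part of this commit; the class name QjmTimeoutExample and the 90000 ms override are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class QjmTimeoutExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Illustrative override; without it, the 120000 ms default above applies.
    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 90000);
    int newEpochTimeoutMs = conf.getInt(
        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
    System.out.println("new-epoch timeout: " + newEpochTimeoutMs + " ms");
  }
}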

View File: AsyncLoggerSet.java

@ -52,8 +52,6 @@ import com.google.common.util.concurrent.SettableFuture;
class AsyncLoggerSet {
static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
private static final int NEWEPOCH_TIMEOUT_MS = 10000;
private final List<AsyncLogger> loggers;
private static final long INVALID_EPOCH = -1;
@ -63,44 +61,15 @@ class AsyncLoggerSet {
this.loggers = ImmutableList.copyOf(loggers);
}
/**
* Fence any previous writers, and obtain a unique epoch number
* for write-access to the journal nodes.
*
* @param nsInfo the expected namespace information. If the remote
* node does not match with this namespace, the request will be rejected.
* @return the new, unique epoch number
* @throws IOException
*/
Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch(
NamespaceInfo nsInfo) throws IOException {
Preconditions.checkState(myEpoch == -1,
"epoch already created: epoch=" + myEpoch);
Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
waitForWriteQuorum(getJournalState(), NEWEPOCH_TIMEOUT_MS);
long maxPromised = Long.MIN_VALUE;
for (GetJournalStateResponseProto resp : lastPromises.values()) {
maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
}
assert maxPromised >= 0;
long myEpoch = maxPromised + 1;
Map<AsyncLogger, NewEpochResponseProto> resps =
waitForWriteQuorum(newEpoch(nsInfo, myEpoch), NEWEPOCH_TIMEOUT_MS);
this.myEpoch = myEpoch;
setEpoch(myEpoch);
return resps;
}
private void setEpoch(long e) {
void setEpoch(long e) {
Preconditions.checkState(!isEpochEstablished(),
"Epoch already established: epoch=%s", myEpoch);
myEpoch = e;
for (AsyncLogger l : loggers) {
l.setEpoch(e);
}
}
/**
* Set the highest successfully committed txid seen by the writer.
* This should be called after a successful write to a quorum, and is used
@ -112,6 +81,13 @@ class AsyncLoggerSet {
}
}
/**
* @return true if an epoch has been established.
*/
boolean isEpochEstablished() {
return myEpoch != INVALID_EPOCH;
}
/**
* @return the epoch number for this writer. This may only be called after
* a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
@ -143,19 +119,20 @@ class AsyncLoggerSet {
* can't be achieved, throws a QuorumException.
* @param q the quorum call
* @param timeoutMs the number of millis to wait
* @param operationName textual description of the operation, for logging
* @return a map of successful results
* @throws QuorumException if a quorum doesn't respond with success
* @throws IOException if the thread is interrupted or times out
*/
<V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
int timeoutMs) throws IOException {
int timeoutMs, String operationName) throws IOException {
int majority = getMajoritySize();
try {
q.waitFor(
loggers.size(), // either all respond
majority, // or we get a majority of successes
majority, // or we get a majority of failures,
timeoutMs);
timeoutMs, operationName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
@ -227,7 +204,7 @@ class AsyncLoggerSet {
// in a QuorumCall.
///////////////////////////////////////////////////////////////////////////
private QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
@ -266,7 +243,7 @@ class AsyncLoggerSet {
return QuorumCall.create(calls);
}
private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
NamespaceInfo nsInfo,
long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
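For reference, the write-quorum contract used throughout this patch: waitForWriteQuorum(q, timeoutMs, operationName) above passes minResponses = loggers.size(), minSuccesses = majority, and maxExceptions = majority to QuorumCall.waitFor(), where majority is a strict majority of the logger set. An illustrative, standalone sketch of those exit conditions (simplified; QuorumWaitConditions is a made-up name, not HDFS code):

public class QuorumWaitConditions {
  static boolean doneWaiting(int n, int responses, int successes, int exceptions) {
    int majority = n / 2 + 1;                 // strict majority of n loggers
    if (responses >= n) return true;          // every logger has responded
    if (successes >= majority) return true;   // a majority succeeded
    if (exceptions > majority) return true;   // more failures than maxExceptions allows
    return false;                             // otherwise keep waiting, up to the timeout
  }

  public static void main(String[] args) {
    System.out.println(doneWaiting(3, 1, 1, 0)); // false: still waiting
    System.out.println(doneWaiting(3, 2, 2, 0)); // true: success quorum reached
  }
}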

View File: QuorumCall.java

@ -40,6 +40,23 @@ class QuorumCall<KEY, RESULT> {
private final Map<KEY, RESULT> successes = Maps.newHashMap();
private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
/**
* Interval, in milliseconds, at which a log message will be made
* while waiting for a quorum call.
*/
private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
/**
* Start logging messages at INFO level periodically after waiting for
* this fraction of the configured timeout for any call.
*/
private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
/**
* Start logging messages at WARN level after waiting for this
* fraction of the configured timeout for any call.
*/
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
@ -85,17 +102,35 @@ class QuorumCall<KEY, RESULT> {
*/
public synchronized void waitFor(
int minResponses, int minSuccesses, int maxExceptions,
int millis)
int millis, String operationName)
throws InterruptedException, TimeoutException {
long et = Time.monotonicNow() + millis;
long st = Time.monotonicNow();
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
long et = st + millis;
while (true) {
if (minResponses > 0 && countResponses() >= minResponses) return;
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
long rem = et - Time.monotonicNow();
long now = Time.monotonicNow();
if (now > nextLogTime) {
long waited = now - st;
String msg = String.format(
"Waited %s ms (timeout=%s ms) for a response for %s",
waited, millis, operationName);
if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
QuorumJournalManager.LOG.warn(msg);
} else {
QuorumJournalManager.LOG.info(msg);
}
nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
}
long rem = et - now;
if (rem <= 0) {
throw new TimeoutException();
}
rem = Math.min(rem, nextLogTime - now);
rem = Math.max(rem, 1);
wait(rem);
}
}
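The loop above is the user-visible half of this fix: instead of blocking silently until the (now longer) timeout expires, waitFor() wakes up periodically and reports how long the call has been outstanding. A standalone sketch of the same schedule (illustrative only; WaitProgressDemo is not HDFS code, and Thread.sleep stands in for the monitor wait): quiet for the first 30% of the timeout, INFO until 70%, WARN after that, re-logging at most once per second.

public class WaitProgressDemo {
  private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
  private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
  private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;

  public static void main(String[] args) throws InterruptedException {
    long millis = 3000;                        // configured timeout for the call
    long st = System.currentTimeMillis();      // stand-in for Time.monotonicNow()
    long nextLogTime = st + (long) (millis * WAIT_PROGRESS_INFO_THRESHOLD);
    long et = st + millis;
    while (true) {
      long now = System.currentTimeMillis();
      if (now > nextLogTime) {
        long waited = now - st;
        String level =
            waited > millis * WAIT_PROGRESS_WARN_THRESHOLD ? "WARN" : "INFO";
        System.out.println(level + ": Waited " + waited
            + " ms (timeout=" + millis + " ms) for a response for demoOp");
        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
      }
      long rem = et - now;
      if (rem <= 0) {
        System.out.println("timed out after " + millis + " ms");
        return;
      }
      rem = Math.min(rem, nextLogTime - now);
      rem = Math.max(rem, 1);
      Thread.sleep(rem);                       // QuorumCall uses wait(rem) here
    }
  }
}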

View File: QuorumJournalManager.java

@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@ -52,6 +53,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.TextFormat;
/**
* A JournalManager that writes to a set of remote JournalNodes,
@ -67,6 +69,8 @@ public class QuorumJournalManager implements JournalManager {
private final int acceptRecoveryTimeoutMs;
private final int finalizeSegmentTimeoutMs;
private final int selectInputStreamsTimeoutMs;
private final int getJournalStateTimeoutMs;
private final int newEpochTimeoutMs;
// Since these don't occur during normal operation, we can
// use rather lengthy timeouts, and don't need to make them
@ -112,6 +116,13 @@ public class QuorumJournalManager implements JournalManager {
this.selectInputStreamsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
this.getJournalStateTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
this.newEpochTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
}
@ -138,11 +149,43 @@ public class QuorumJournalManager implements JournalManager {
"bad journal id: " + jid);
}
/**
* Fence any previous writers, and obtain a unique epoch number
* for write-access to the journal nodes.
*
* @return the new, unique epoch number
*/
Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
throws IOException {
Preconditions.checkState(!loggers.isEpochEstablished(),
"epoch already created");
Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
loggers.waitForWriteQuorum(loggers.getJournalState(),
getJournalStateTimeoutMs, "getJournalState()");
long maxPromised = Long.MIN_VALUE;
for (GetJournalStateResponseProto resp : lastPromises.values()) {
maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
}
assert maxPromised >= 0;
long myEpoch = maxPromised + 1;
Map<AsyncLogger, NewEpochResponseProto> resps =
loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
loggers.setEpoch(myEpoch);
return resps;
}
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
try {
call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
"format");
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for format() response");
} catch (TimeoutException e) {
@ -160,7 +203,7 @@ public class QuorumJournalManager implements JournalManager {
loggers.isFormatted();
try {
call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
} catch (InterruptedException e) {
throw new IOException("Interrupted while determining if JNs have data");
} catch (TimeoutException e) {
@ -206,7 +249,8 @@ public class QuorumJournalManager implements JournalManager {
QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
loggers.prepareRecovery(segmentTxId);
Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses =
loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs);
loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
"prepareRecovery(" + segmentTxId + ")");
LOG.info("Recovery prepare phase complete. Responses:\n" +
QuorumCall.mapToString(prepareResponses));
@ -283,7 +327,8 @@ public class QuorumJournalManager implements JournalManager {
URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs);
loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
"acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
// TODO:
// we should only try to finalize loggers who successfully synced above
@ -292,7 +337,10 @@ public class QuorumJournalManager implements JournalManager {
QuorumCall<AsyncLogger, Void> finalize =
loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
String.format("finalizeLogSegment(%s-%s)",
logToSync.getStartTxId(),
logToSync.getEndTxId()));
}
static List<AsyncLogger> createLoggers(Configuration conf,
@ -336,7 +384,8 @@ public class QuorumJournalManager implements JournalManager {
Preconditions.checkState(isActiveWriter,
"must recover segments before starting a new one");
QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
"startLogSegment(" + txId + ")");
return new QuorumOutputStream(loggers, txId);
}
@ -345,7 +394,8 @@ public class QuorumJournalManager implements JournalManager {
throws IOException {
QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
firstTxId, lastTxId);
loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs);
loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
}
@Override
@ -366,8 +416,7 @@ public class QuorumJournalManager implements JournalManager {
public void recoverUnfinalizedSegments() throws IOException {
Preconditions.checkState(!isActiveWriter, "already active writer");
Map<AsyncLogger, NewEpochResponseProto> resps =
loggers.createNewUniqueEpoch(nsInfo);
Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
QuorumCall.mapToString(resps));
@ -399,7 +448,8 @@ public class QuorumJournalManager implements JournalManager {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId);
Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams");
LOG.debug("selectInputStream manifests:\n" +
Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
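The createNewUniqueEpoch() method added above follows a Paxos-style rule for picking its epoch: learn the highest epoch any quorum member has promised via getJournalState(), then propose one strictly greater via newEpoch(). A minimal standalone sketch of just that selection step (illustrative only; EpochSelectionExample is not HDFS code, and the real method issues the RPCs shown above):

import java.util.Arrays;
import java.util.List;

public class EpochSelectionExample {
  static long chooseNewEpoch(List<Long> lastPromisedEpochs) {
    long maxPromised = Long.MIN_VALUE;
    for (long promised : lastPromisedEpochs) {
      maxPromised = Math.max(maxPromised, promised);
    }
    assert maxPromised >= 0;   // journals report a non-negative lastPromisedEpoch
    return maxPromised + 1;    // strictly greater than anything already promised
  }

  public static void main(String[] args) {
    // e.g. lastPromisedEpoch values from a write quorum of three JournalNodes
    System.out.println(chooseNewEpoch(Arrays.asList(4L, 5L, 5L))); // prints 6
  }
}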

View File: QuorumOutputStream.java

@ -101,7 +101,7 @@ class QuorumOutputStream extends EditLogOutputStream {
QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
segmentTxId, firstTxToFlush,
numReadyTxns, data);
loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
// Since we successfully wrote this batch, let the loggers know. Any future
// RPCs will thus let the loggers know of the most recent transaction, even

View File: TestEpochsAreUnique.java

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Random;
@ -56,35 +57,43 @@ public class TestEpochsAreUnique {
URI uri = cluster.getQuorumJournalURI(JID);
QuorumJournalManager qjm = new QuorumJournalManager(
conf, uri, FAKE_NSINFO);
qjm.format(FAKE_NSINFO);
try {
qjm.format(FAKE_NSINFO);
} finally {
qjm.close();
}
try {
// With no failures or contention, epochs should increase one-by-one
for (int i = 0; i < 5; i++) {
AsyncLoggerSet als = new AsyncLoggerSet(
QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
IPCLoggerChannel.FACTORY));
als.createNewUniqueEpoch(FAKE_NSINFO);
assertEquals(i + 1, als.getEpoch());
qjm = new QuorumJournalManager(
conf, uri, FAKE_NSINFO);
try {
qjm.createNewUniqueEpoch();
assertEquals(i + 1, qjm.getLoggerSetForTests().getEpoch());
} finally {
qjm.close();
}
}
long prevEpoch = 5;
// With some failures injected, it should still always increase, perhaps
// skipping some
for (int i = 0; i < 20; i++) {
AsyncLoggerSet als = new AsyncLoggerSet(
makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
IPCLoggerChannel.FACTORY)));
long newEpoch = -1;
while (true) {
qjm = new QuorumJournalManager(
conf, uri, FAKE_NSINFO, new FaultyLoggerFactory());
try {
als.createNewUniqueEpoch(FAKE_NSINFO);
newEpoch = als.getEpoch();
qjm.createNewUniqueEpoch();
newEpoch = qjm.getLoggerSetForTests().getEpoch();
break;
} catch (IOException ioe) {
// It's OK to fail to create an epoch, since we randomly inject
// faults. It's possible we'll inject faults in too many of the
// underlying nodes, and a failure is expected in that case
} finally {
qjm.close();
}
}
LOG.info("Created epoch " + newEpoch);
@ -97,18 +106,21 @@ public class TestEpochsAreUnique {
}
}
private List<AsyncLogger> makeFaulty(List<AsyncLogger> loggers) {
List<AsyncLogger> ret = Lists.newArrayList();
for (AsyncLogger l : loggers) {
AsyncLogger spy = Mockito.spy(l);
private class FaultyLoggerFactory implements AsyncLogger.Factory {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
conf, nsInfo, journalId, addr);
AsyncLogger spy = Mockito.spy(ch);
Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
.when(spy).getJournalState();
Mockito.doAnswer(new SometimesFaulty<Void>(0.40f))
.when(spy).newEpoch(Mockito.anyLong());
ret.add(spy);
return spy;
}
return ret;
}
private class SometimesFaulty<T> implements Answer<ListenableFuture<T>> {

View File: TestQuorumCall.java

@ -42,8 +42,8 @@ public class TestQuorumCall {
assertEquals(0, q.countResponses());
futures.get("f1").set("first future");
q.waitFor(1, 0, 0, 100000); // wait for 1 response
q.waitFor(0, 1, 0, 100000); // wait for 1 success
q.waitFor(1, 0, 0, 100000, "test"); // wait for 1 response
q.waitFor(0, 1, 0, 100000, "test"); // wait for 1 success
assertEquals(1, q.countResponses());
@ -51,8 +51,8 @@ public class TestQuorumCall {
assertEquals(2, q.countResponses());
futures.get("f3").set("second future");
q.waitFor(3, 0, 100, 100000); // wait for 3 responses
q.waitFor(0, 2, 100, 100000); // 2 successes
q.waitFor(3, 0, 100, 100000, "test"); // wait for 3 responses
q.waitFor(0, 2, 100, 100000, "test"); // 2 successes
assertEquals(3, q.countResponses());
assertEquals("f1=first future,f3=second future",
@ -60,7 +60,7 @@ public class TestQuorumCall {
new TreeMap<String, String>(q.getResults())));
try {
q.waitFor(0, 4, 100, 10);
q.waitFor(0, 4, 100, 10, "test");
fail("Didn't time out waiting for more responses than came back");
} catch (TimeoutException te) {
// expected