HDFS-10733. NameNode terminated after full GC thinking QJM is unresponsive. Contributed by Vinitha Gankidi.
This commit is contained in:
parent
db47bd15dd
commit
e2b3eff641
|
@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.qjournal.client;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.util.StopWatch;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
@ -58,6 +60,7 @@ class QuorumCall<KEY, RESULT> {
|
||||||
* fraction of the configured timeout for any call.
|
* fraction of the configured timeout for any call.
|
||||||
*/
|
*/
|
||||||
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
||||||
|
private final StopWatch quorumStopWatch = new StopWatch();
|
||||||
|
|
||||||
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
||||||
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
||||||
|
@ -83,6 +86,16 @@ class QuorumCall<KEY, RESULT> {
|
||||||
private QuorumCall() {
|
private QuorumCall() {
|
||||||
// Only instantiated from factory method above
|
// Only instantiated from factory method above
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void restartQuorumStopWatch() {
|
||||||
|
quorumStopWatch.reset().start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldIncreaseQuorumTimeout(long offset, int millis) {
|
||||||
|
long elapsed = quorumStopWatch.now(TimeUnit.MILLISECONDS);
|
||||||
|
return elapsed + offset > (millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for the quorum to achieve a certain number of responses.
|
* Wait for the quorum to achieve a certain number of responses.
|
||||||
|
@ -110,6 +123,7 @@ class QuorumCall<KEY, RESULT> {
|
||||||
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
||||||
long et = st + millis;
|
long et = st + millis;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
restartQuorumStopWatch();
|
||||||
checkAssertionErrors();
|
checkAssertionErrors();
|
||||||
if (minResponses > 0 && countResponses() >= minResponses) return;
|
if (minResponses > 0 && countResponses() >= minResponses) return;
|
||||||
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
||||||
|
@ -139,11 +153,21 @@ class QuorumCall<KEY, RESULT> {
|
||||||
}
|
}
|
||||||
long rem = et - now;
|
long rem = et - now;
|
||||||
if (rem <= 0) {
|
if (rem <= 0) {
|
||||||
throw new TimeoutException();
|
// Increase timeout if a full GC occurred after restarting stopWatch
|
||||||
|
if (shouldIncreaseQuorumTimeout(0, millis)) {
|
||||||
|
et = et + millis;
|
||||||
|
} else {
|
||||||
|
throw new TimeoutException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
restartQuorumStopWatch();
|
||||||
rem = Math.min(rem, nextLogTime - now);
|
rem = Math.min(rem, nextLogTime - now);
|
||||||
rem = Math.max(rem, 1);
|
rem = Math.max(rem, 1);
|
||||||
wait(rem);
|
wait(rem);
|
||||||
|
// Increase timeout if a full GC occurred after restarting stopWatch
|
||||||
|
if (shouldIncreaseQuorumTimeout(-rem, millis)) {
|
||||||
|
et = et + millis;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,4 +66,21 @@ public class TestQuorumCall {
|
||||||
// expected
|
// expected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testQuorumFailsWithoutResponse() throws Exception {
|
||||||
|
Map<String, SettableFuture<String>> futures = ImmutableMap.of(
|
||||||
|
"f1", SettableFuture.<String>create());
|
||||||
|
|
||||||
|
QuorumCall<String, String> q = QuorumCall.create(futures);
|
||||||
|
assertEquals("The number of quorum calls for which a response has been"
|
||||||
|
+ " received should be 0", 0, q.countResponses());
|
||||||
|
|
||||||
|
try {
|
||||||
|
q.waitFor(0, 1, 100, 10, "test");
|
||||||
|
fail("Didn't time out waiting for more responses than came back");
|
||||||
|
} catch (TimeoutException te) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue