diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java index dc3231827e2..dee74e6fcfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StopWatch; -import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.Timer; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -35,6 +35,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Message; import com.google.protobuf.TextFormat; + /** * Represents a set of calls for which a quorum of results is needed. * @param a key used to identify each of the outgoing calls @@ -60,11 +61,12 @@ class QuorumCall { * fraction of the configured timeout for any call. */ private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f; - private final StopWatch quorumStopWatch = new StopWatch(); + private final StopWatch quorumStopWatch; + private final Timer timer; static QuorumCall create( - Map> calls) { - final QuorumCall qr = new QuorumCall(); + Map> calls, Timer timer) { + final QuorumCall qr = new QuorumCall(timer); for (final Entry> e : calls.entrySet()) { Preconditions.checkArgument(e.getValue() != null, "null future for key: " + e.getKey()); @@ -82,18 +84,53 @@ class QuorumCall { } return qr; } - - private QuorumCall() { - // Only instantiated from factory method above + + static QuorumCall create( + Map> calls) { + return create(calls, new Timer()); } + /** + * Not intended for outside use. + */ + private QuorumCall() { + this(new Timer()); + } + + private QuorumCall(Timer timer) { + // Only instantiated from factory method above + this.timer = timer; + this.quorumStopWatch = new StopWatch(timer); + } + + /** + * Used in conjunction with {@link #getQuorumTimeoutIncreaseMillis(long, int)} + * to check for pauses. + */ private void restartQuorumStopWatch() { quorumStopWatch.reset().start(); } - private boolean shouldIncreaseQuorumTimeout(long offset, int millis) { + /** + * Check for a pause (e.g. GC) since the last time + * {@link #restartQuorumStopWatch()} was called. If detected, return the + * length of the pause; else, -1. + * @param offset Offset the elapsed time by this amount; use if some amount + * of pause was expected + * @param millis Total length of timeout in milliseconds + * @return Length of pause, if detected, else -1 + */ + private long getQuorumTimeoutIncreaseMillis(long offset, int millis) { long elapsed = quorumStopWatch.now(TimeUnit.MILLISECONDS); - return elapsed + offset > (millis * WAIT_PROGRESS_INFO_THRESHOLD); + long pauseTime = elapsed + offset; + if (pauseTime > (millis * WAIT_PROGRESS_INFO_THRESHOLD)) { + QuorumJournalManager.LOG.info("Pause detected while waiting for " + + "QuorumCall response; increasing timeout threshold by pause time " + + "of " + pauseTime + " ms."); + return pauseTime; + } else { + return -1; + } } @@ -119,7 +156,7 @@ class QuorumCall { int minResponses, int minSuccesses, int maxExceptions, int millis, String operationName) throws InterruptedException, TimeoutException { - long st = Time.monotonicNow(); + long st = timer.monotonicNow(); long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD); long et = st + millis; while (true) { @@ -128,7 +165,7 @@ class QuorumCall { if (minResponses > 0 && countResponses() >= minResponses) return; if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return; if (maxExceptions >= 0 && countExceptions() > maxExceptions) return; - long now = Time.monotonicNow(); + long now = timer.monotonicNow(); if (now > nextLogTime) { long waited = now - st; @@ -154,8 +191,9 @@ class QuorumCall { long rem = et - now; if (rem <= 0) { // Increase timeout if a full GC occurred after restarting stopWatch - if (shouldIncreaseQuorumTimeout(0, millis)) { - et = et + millis; + long timeoutIncrease = getQuorumTimeoutIncreaseMillis(0, millis); + if (timeoutIncrease > 0) { + et += timeoutIncrease; } else { throw new TimeoutException(); } @@ -165,8 +203,9 @@ class QuorumCall { rem = Math.max(rem, 1); wait(rem); // Increase timeout if a full GC occurred after restarting stopWatch - if (shouldIncreaseQuorumTimeout(-rem, millis)) { - et = et + millis; + long timeoutIncrease = getQuorumTimeoutIncreaseMillis(-rem, millis); + if (timeoutIncrease > 0) { + et += timeoutIncrease; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java index 506497e6ae4..97cf2f3c068 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.hdfs.qjournal.client.QuorumCall; +import org.apache.hadoop.util.FakeTimer; import org.junit.Test; import com.google.common.base.Joiner; @@ -83,4 +83,33 @@ public class TestQuorumCall { } } + @Test(timeout=10000) + public void testQuorumSucceedsWithLongPause() throws Exception { + final Map> futures = ImmutableMap.of( + "f1", SettableFuture.create()); + + FakeTimer timer = new FakeTimer() { + private int callCount = 0; + @Override + public long monotonicNowNanos() { + callCount++; + if (callCount == 1) { + long old = super.monotonicNowNanos(); + advance(1000000); + return old; + } else if (callCount == 10) { + futures.get("f1").set("first future"); + return super.monotonicNowNanos(); + } else { + return super.monotonicNowNanos(); + } + } + }; + + QuorumCall q = QuorumCall.create(futures, timer); + assertEquals(0, q.countResponses()); + + q.waitFor(1, 0, 0, 3000, "test"); // wait for 1 response + } + }