diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt index 3207e70c384..216b5622108 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt @@ -5,4 +5,8 @@ branch is merged. ------------------------------ HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) + HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd) + +HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing + back and forth several times with sleeps. (atm) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index f928760253f..d1655778251 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -24,11 +24,11 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.util.ThreadUtil; class RetryInvocationHandler implements InvocationHandler, Closeable { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); @@ -85,31 +85,38 @@ class RetryInvocationHandler implements InvocationHandler, Closeable { .isAnnotationPresent(Idempotent.class); RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, isMethodIdempotent); - if (action == RetryAction.FAIL) { + if (action.action == RetryAction.RetryDecision.FAIL) { LOG.warn("Exception while invoking " + method.getName() + " of " + currentProxy.getClass() + ". Not retrying.", e); if (!method.getReturnType().equals(Void.TYPE)) { throw e; // non-void methods can't fail without an exception } return null; - } else if (action == RetryAction.FAILOVER_AND_RETRY) { - LOG.warn("Exception while invoking " + method.getName() - + " of " + currentProxy.getClass() - + " after " + invocationFailoverCount + " fail over attempts." - + " Trying to fail over.", e); - // Make sure that concurrent failed method invocations only cause a - // single actual fail over. - synchronized (proxyProvider) { - if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { - proxyProvider.performFailover(currentProxy); - proxyProviderFailoverCount++; - currentProxy = proxyProvider.getProxy(); - } else { - LOG.warn("A failover has occurred since the start of this method" - + " invocation attempt."); - } + } else { // retry or failover + + if (action.delayMillis > 0) { + ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis); + } + + if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) { + LOG.warn("Exception while invoking " + method.getName() + + " of " + currentProxy.getClass() + + " after " + invocationFailoverCount + " fail over attempts." + + " Trying to fail over.", e); + // Make sure that concurrent failed method invocations only cause a + // single actual fail over. + synchronized (proxyProvider) { + if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { + proxyProvider.performFailover(currentProxy); + proxyProviderFailoverCount++; + currentProxy = proxyProvider.getProxy(); + } else { + LOG.warn("A failover has occurred since the start of this method" + + " invocation attempt."); + } + } + invocationFailoverCount++; } - invocationFailoverCount++; } if(LOG.isDebugEnabled()) { LOG.debug("Exception while invoking " + method.getName() diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java index 3634e18673a..5afda594755 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java @@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; +import com.google.common.annotations.VisibleForTesting; + /** *
* A collection of useful implementations of {@link RetryPolicy}. @@ -42,6 +44,8 @@ public class RetryPolicies { public static final Log LOG = LogFactory.getLog(RetryPolicies.class); + private static final Random RAND = new Random(); + /** *
* Try once, and fail by re-throwing the exception.
@@ -137,7 +141,14 @@ public class RetryPolicies {
public static final RetryPolicy failoverOnNetworkException(
RetryPolicy fallbackPolicy, int maxFailovers) {
- return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
+ return failoverOnNetworkException(fallbackPolicy, maxFailovers, 0, 0);
+ }
+
+ public static final RetryPolicy failoverOnNetworkException(
+ RetryPolicy fallbackPolicy, int maxFailovers, long delayMillis,
+ long maxDelayBase) {
+ return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
+ delayMillis, maxDelayBase);
}
static class TryOnceThenFail implements RetryPolicy {
@@ -176,12 +187,8 @@ public class RetryPolicies {
if (retries >= maxRetries) {
throw e;
}
- try {
- timeUnit.sleep(calculateSleepTime(retries));
- } catch (InterruptedException ie) {
- // retry
- }
- return RetryAction.RETRY;
+ return new RetryAction(RetryAction.RetryDecision.RETRY,
+ timeUnit.toMillis(calculateSleepTime(retries)));
}
protected abstract long calculateSleepTime(int retries);
@@ -268,7 +275,7 @@ public class RetryPolicies {
}
static class ExponentialBackoffRetry extends RetryLimited {
- private Random r = new Random();
+
public ExponentialBackoffRetry(
int maxRetries, long sleepTime, TimeUnit timeUnit) {
super(maxRetries, sleepTime, timeUnit);
@@ -276,16 +283,19 @@ public class RetryPolicies {
@Override
protected long calculateSleepTime(int retries) {
- return sleepTime*r.nextInt(1<<(retries+1));
+ return calculateExponentialTime(sleepTime, retries + 1);
}
}
- /*
+ /**
* Fail over and retry in the case of:
* Remote StandbyException (server is up, but is not the active server)
* Immediate socket exceptions (e.g. no route to host, econnrefused)
* Socket exceptions after initial connection when operation is idempotent
*
+ * The first failover is immediate, while all subsequent failovers wait an
+ * exponentially-increasing random amount of time.
+ *
* Fail immediately in the case of:
* Socket exceptions after initial connection when operation is not idempotent
*
@@ -295,11 +305,20 @@ public class RetryPolicies {
private RetryPolicy fallbackPolicy;
private int maxFailovers;
+ private long delayMillis;
+ private long maxDelayBase;
public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
int maxFailovers) {
+ this(fallbackPolicy, maxFailovers, 0, 0);
+ }
+
+ public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+ int maxFailovers, long delayMillis, long maxDelayBase) {
this.fallbackPolicy = fallbackPolicy;
this.maxFailovers = maxFailovers;
+ this.delayMillis = delayMillis;
+ this.maxDelayBase = maxDelayBase;
}
@Override
@@ -314,8 +333,13 @@ public class RetryPolicies {
if (e instanceof ConnectException ||
e instanceof NoRouteToHostException ||
e instanceof UnknownHostException ||
- e instanceof StandbyException) {
- return RetryAction.FAILOVER_AND_RETRY;
+ e instanceof StandbyException ||
+ isWrappedStandbyException(e)) {
+ return new RetryAction(
+ RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+ // retry immediately if this is our first failover, sleep otherwise
+ failovers == 0 ? 0 :
+ calculateExponentialTime(delayMillis, failovers, maxDelayBase));
} else if (e instanceof SocketException ||
e instanceof IOException) {
if (isMethodIdempotent) {
@@ -330,4 +354,34 @@ public class RetryPolicies {
}
}
+
+ /**
+ * Return a value which is time
increasing exponentially as a
+ * function of retries
, +/- 0%-50% of that value, chosen
+ * randomly.
+ *
+ * @param time the base amount of time to work with
+ * @param retries the number of retries that have so occurred so far
+ * @param cap value at which to cap the base sleep time
+ * @return an amount of time to sleep
+ */
+ @VisibleForTesting
+ public static long calculateExponentialTime(long time, int retries,
+ long cap) {
+ long baseTime = Math.min(time * ((long)1 << retries), cap);
+ return (long) (baseTime * (RAND.nextFloat() + 0.5));
+ }
+
+ private static long calculateExponentialTime(long time, int retries) {
+ return calculateExponentialTime(time, retries, Long.MAX_VALUE);
+ }
+
+ private static boolean isWrappedStandbyException(Exception e) {
+ if (!(e instanceof RemoteException)) {
+ return false;
+ }
+ Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+ StandbyException.class);
+ return unwrapped instanceof StandbyException;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
index 4c4534ffb7e..90e5eaea671 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.io.retry;
import org.apache.hadoop.classification.InterfaceStability;
-
/**
*
* Specifies a policy for retrying method failures. @@ -33,10 +32,33 @@ public interface RetryPolicy { * Returned by {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)}. */ @InterfaceStability.Evolving - public enum RetryAction { - FAIL, - RETRY, - FAILOVER_AND_RETRY + public static class RetryAction { + + // A few common retry policies, with no delays. + public static final RetryAction FAIL = + new RetryAction(RetryDecision.FAIL); + public static final RetryAction RETRY = + new RetryAction(RetryDecision.RETRY); + public static final RetryAction FAILOVER_AND_RETRY = + new RetryAction(RetryDecision.FAILOVER_AND_RETRY); + + public final RetryDecision action; + public final long delayMillis; + + public RetryAction(RetryDecision action) { + this(action, 0); + } + + public RetryAction(RetryDecision action, long delayTime) { + this.action = action; + this.delayMillis = delayTime; + } + + public enum RetryDecision { + FAIL, + RETRY, + FAILOVER_AND_RETRY + } } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java new file mode 100644 index 00000000000..535ac341223 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceStability.Evolving +public class ThreadUtil { + + private static final Log LOG = LogFactory.getLog(ThreadUtil.class); + + /** + * Cause the current thread to sleep as close as possible to the provided + * number of milliseconds. This method will log and ignore any + * {@link InterrupedException} encountered. + * + * @param millis the number of milliseconds for the current thread to sleep + */ + public static void sleepAtLeastIgnoreInterrupts(long millis) { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < millis) { + long timeToSleep = millis - + (System.currentTimeMillis() - start); + try { + Thread.sleep(timeToSleep); + } catch (InterruptedException ie) { + LOG.warn("interrupted while sleeping", ie); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java index eec4797ab30..b52814cfc11 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith; import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.util.ThreadUtil; import org.junit.Test; public class TestFailoverProxy { @@ -267,4 +268,40 @@ public class TestFailoverProxy { assertEquals("impl2", t2.result); assertEquals(1, proxyProvider.getFailoversOccurred()); } + + /** + * Ensure that when all configured services are throwing StandbyException + * that we fail over back and forth between them until one is no longer + * throwing StandbyException. + */ + @Test + public void testFailoverBetweenMultipleStandbys() + throws UnreliableException, StandbyException, IOException { + + final long millisToSleep = 10000; + + final UnreliableImplementation impl1 = new UnreliableImplementation("impl1", + TypeOfExceptionToFailWith.STANDBY_EXCEPTION); + FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider( + UnreliableInterface.class, + impl1, + new UnreliableImplementation("impl2", + TypeOfExceptionToFailWith.STANDBY_EXCEPTION)); + + final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy + .create(UnreliableInterface.class, proxyProvider, + RetryPolicies.failoverOnNetworkException( + RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000)); + + new Thread() { + @Override + public void run() { + ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep); + impl1.setIdentifier("renamed-impl1"); + } + }.start(); + + String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1"); + assertEquals("renamed-impl1", result); + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java index 7fa88b3b08e..74a63894d80 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java @@ -48,6 +48,10 @@ public class UnreliableImplementation implements UnreliableInterface { this(identifier, TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION); } + public void setIdentifier(String identifier) { + this.identifier = identifier; + } + public UnreliableImplementation(String identifier, TypeOfExceptionToFailWith exceptionToFailWith) { this.identifier = identifier; @@ -147,15 +151,17 @@ public class UnreliableImplementation implements UnreliableInterface { if (this.identifier.equals(identifier)) { return identifier; } else { + String message = "expected '" + this.identifier + "' but received '" + + identifier + "'"; switch (exceptionToFailWith) { case STANDBY_EXCEPTION: - throw new StandbyException(identifier); + throw new StandbyException(message); case UNRELIABLE_EXCEPTION: - throw new UnreliableException(identifier); + throw new UnreliableException(message); case IO_EXCEPTION: - throw new IOException(identifier); + throw new IOException(message); default: - throw new RuntimeException(identifier); + throw new RuntimeException(message); } } }