From 637cdaefc294814febb27cbef2f35026053114c7 Mon Sep 17 00:00:00 2001
From: Eli Collins
Date: Thu, 7 Jul 2011 21:16:05 +0000
Subject: [PATCH] HADOOP-7380. Add client failover functionality to
o.a.h.io.(ipc|retry). Contributed by Aaron T. Myers
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1144043 13f79535-47bb-0310-9956-ffa450edef68
---
common/CHANGES.txt | 3 +
.../retry/DefaultFailoverProxyProvider.java | 52 +++++
.../io/retry/FailoverProxyProvider.java | 60 ++++++
.../apache/hadoop/io/retry/Idempotent.java | 35 ++++
.../io/retry/RetryInvocationHandler.java | 42 ++--
.../apache/hadoop/io/retry/RetryPolicies.java | 99 ++++++++--
.../apache/hadoop/io/retry/RetryPolicy.java | 31 ++-
.../apache/hadoop/io/retry/RetryProxy.java | 58 ++++--
.../apache/hadoop/ipc/StandbyException.java | 32 +++
.../hadoop/io/retry/TestFailoverProxy.java | 184 ++++++++++++++++++
.../io/retry/UnreliableImplementation.java | 88 ++++++++-
.../hadoop/io/retry/UnreliableInterface.java | 26 ++-
12 files changed, 665 insertions(+), 45 deletions(-)
create mode 100644 common/src/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
create mode 100644 common/src/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
create mode 100644 common/src/java/org/apache/hadoop/io/retry/Idempotent.java
create mode 100644 common/src/java/org/apache/hadoop/ipc/StandbyException.java
create mode 100644 common/src/test/core/org/apache/hadoop/io/retry/TestFailoverProxy.java
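
Reviewer note: a minimal sketch of how the new pieces are meant to be wired
together on a client. MyProtocol, primaryProxy and standbyProxy are
hypothetical placeholders, not part of this patch; only FailoverProxyProvider,
RetryPolicies.failoverOnNetworkException, RetryProxy.create and @Idempotent
come from the changes below.

    // A provider that flips between two IPC proxies, in the spirit of the
    // FlipFlopProxyProvider used by the new test.
    FailoverProxyProvider provider = new FailoverProxyProvider() {
      private Object current = primaryProxy;
      public Object getProxy() { return current; }
      public void performFailover(Object currentProxy) {
        current = (currentProxy == primaryProxy) ? standbyProxy : primaryProxy;
      }
      public Class<?> getInterface() { return MyProtocol.class; }
    };

    // Fail over on connection-level errors or a remote StandbyException,
    // giving up after three failovers.
    RetryPolicy policy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, 3);

    MyProtocol proxy = (MyProtocol) RetryProxy.create(
        MyProtocol.class, provider, policy);
    // Methods of MyProtocol annotated @Idempotent may additionally be retried
    // on the other server after an IOException that arrives mid-operation.
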
diff --git a/common/CHANGES.txt b/common/CHANGES.txt
index c789050c161..8dcd295c2c4 100644
--- a/common/CHANGES.txt
+++ b/common/CHANGES.txt
@@ -53,6 +53,9 @@ Trunk (unreleased changes)
HADOOP-7329. Add the capability of getting invividual attribute of a mbean
using JMXProxyServlet. (tanping)
+ HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
+ (atm via eli)
+
IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and
diff --git a/common/src/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java b/common/src/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
new file mode 100644
index 00000000000..0ea294da0f8
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementation of {@link FailoverProxyProvider} which does nothing in the
+ * event of failover, and always returns the same proxy object.
+ */
+@InterfaceStability.Evolving
+public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
+
+ private Object proxy;
+ private Class<?> iface;
+
+ public DefaultFailoverProxyProvider(Class<?> iface, Object proxy) {
+ this.proxy = proxy;
+ this.iface = iface;
+ }
+
+ @Override
+ public Class<?> getInterface() {
+ return iface;
+ }
+
+ @Override
+ public Object getProxy() {
+ return proxy;
+ }
+
+ @Override
+ public void performFailover(Object currentProxy) {
+ // Nothing to do.
+ }
+
+}
diff --git a/common/src/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/common/src/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
new file mode 100644
index 00000000000..cb211c2efae
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An implementer of this interface is capable of providing proxy objects for
+ * use in IPC communication, and potentially modifying these objects or creating
+ * entirely new ones in the event of certain types of failures. The
+ * determination of whether or not to fail over is handled by
+ * {@link RetryPolicy}.
+ */
+@InterfaceStability.Evolving
+public interface FailoverProxyProvider {
+
+ /**
+ * Get the proxy object which should be used until the next failover event
+ * occurs.
+ *
+ * @return the proxy object to invoke methods upon
+ */
+ public Object getProxy();
+
+ /**
+ * Called whenever the associated {@link RetryPolicy} determines that an error
+ * warrants failing over.
+ *
+ * @param currentProxy the proxy object which was being used before this
+ * failover event
+ */
+ public void performFailover(Object currentProxy);
+
+ /**
+ * Return a reference to the interface this provider's proxy objects actually
+ * implement. If any of the methods on this interface are annotated as being
+ * {@link Idempotent}, then this fact will be passed to the
+ * {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)} method on
+ * error, for use in determining whether or not failover should be attempted.
+ *
+ * @return the interface implemented by the proxy objects returned by
+ * {@link FailoverProxyProvider#getProxy()}
+ */
+ public Class<?> getInterface();
+}
\ No newline at end of file
diff --git a/common/src/java/org/apache/hadoop/io/retry/Idempotent.java b/common/src/java/org/apache/hadoop/io/retry/Idempotent.java
new file mode 100644
index 00000000000..073752d72dc
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/io/retry/Idempotent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Used to mark certain methods of an interface as idempotent, and therefore
+ * safe to retry on failover.
+ */
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+@InterfaceStability.Evolving
+public @interface Idempotent {}
diff --git a/common/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/common/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 5f1b0902c8c..70149e357c9 100644
--- a/common/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/common/src/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -25,25 +25,30 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
class RetryInvocationHandler implements InvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
- private Object implementation;
+ private FailoverProxyProvider proxyProvider;
private RetryPolicy defaultPolicy;
private Map<String,RetryPolicy> methodNameToPolicyMap;
+ private Object currentProxy;
- public RetryInvocationHandler(Object implementation, RetryPolicy retryPolicy) {
- this.implementation = implementation;
+ public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+ RetryPolicy retryPolicy) {
+ this.proxyProvider = proxyProvider;
this.defaultPolicy = retryPolicy;
this.methodNameToPolicyMap = Collections.emptyMap();
+ this.currentProxy = proxyProvider.getProxy();
}
- public RetryInvocationHandler(Object implementation, Map<String,RetryPolicy> methodNameToPolicyMap) {
- this.implementation = implementation;
+ public RetryInvocationHandler(FailoverProxyProvider proxyProvider,
+ Map<String,RetryPolicy> methodNameToPolicyMap) {
+ this.proxyProvider = proxyProvider;
this.defaultPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
this.methodNameToPolicyMap = methodNameToPolicyMap;
+ this.currentProxy = proxyProvider.getProxy();
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -53,24 +58,35 @@ class RetryInvocationHandler implements InvocationHandler {
policy = defaultPolicy;
}
+ int failovers = 0;
int retries = 0;
while (true) {
try {
return invokeMethod(method, args);
} catch (Exception e) {
- if (!policy.shouldRetry(e, retries++)) {
- LOG.info("Exception while invoking " + method.getName()
- + " of " + implementation.getClass() + ". Not retrying."
- , e);
+ boolean isMethodIdempotent = proxyProvider.getInterface()
+ .getMethod(method.getName(), method.getParameterTypes())
+ .isAnnotationPresent(Idempotent.class);
+ RetryAction action = policy.shouldRetry(e, retries++, failovers,
+ isMethodIdempotent);
+ if (action == RetryAction.FAIL) {
+ LOG.warn("Exception while invoking " + method.getName()
+ + " of " + currentProxy.getClass() + ". Not retrying.", e);
if (!method.getReturnType().equals(Void.TYPE)) {
throw e; // non-void methods can't fail without an exception
}
return null;
+ } else if (action == RetryAction.FAILOVER_AND_RETRY) {
+ LOG.warn("Exception while invoking " + method.getName()
+ + " of " + currentProxy.getClass()
+ + ". Trying to fail over.", e);
+ failovers++;
+ proxyProvider.performFailover(currentProxy);
+ currentProxy = proxyProvider.getProxy();
}
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()
- + " of " + implementation.getClass() + ". Retrying."
- , e);
+ + " of " + currentProxy.getClass() + ". Retrying.", e);
}
}
}
@@ -81,7 +97,7 @@ class RetryInvocationHandler implements InvocationHandler {
if (!method.isAccessible()) {
method.setAccessible(true);
}
- return method.invoke(implementation, args);
+ return method.invoke(currentProxy, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
diff --git a/common/src/java/org/apache/hadoop/io/retry/RetryPolicies.java b/common/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
index 30a78885da8..3634e18673a 100644
--- a/common/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/common/src/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -17,14 +17,21 @@
*/
package org.apache.hadoop.io.retry;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
/**
*
@@ -33,6 +40,8 @@ import org.apache.hadoop.ipc.RemoteException;
*/
public class RetryPolicies {
+ public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
+
/**
*
* Try once, and fail by re-throwing the exception.
@@ -122,20 +131,32 @@ public class RetryPolicies {
return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
}
+ public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
+ return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
+ }
+
+ public static final RetryPolicy failoverOnNetworkException(
+ RetryPolicy fallbackPolicy, int maxFailovers) {
+ return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
+ }
+
static class TryOnceThenFail implements RetryPolicy {
- public boolean shouldRetry(Exception e, int retries) throws Exception {
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
throw e;
}
}
static class TryOnceDontFail implements RetryPolicy {
- public boolean shouldRetry(Exception e, int retries) throws Exception {
- return false;
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
+ return RetryAction.FAIL;
}
}
static class RetryForever implements RetryPolicy {
- public boolean shouldRetry(Exception e, int retries) throws Exception {
- return true;
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
+ return RetryAction.RETRY;
}
}
@@ -150,7 +171,8 @@ public class RetryPolicies {
this.timeUnit = timeUnit;
}
- public boolean shouldRetry(Exception e, int retries) throws Exception {
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
if (retries >= maxRetries) {
throw e;
}
@@ -159,7 +181,7 @@ public class RetryPolicies {
} catch (InterruptedException ie) {
// retry
}
- return true;
+ return RetryAction.RETRY;
}
protected abstract long calculateSleepTime(int retries);
@@ -204,12 +226,13 @@ public class RetryPolicies {
this.exceptionToPolicyMap = exceptionToPolicyMap;
}
- public boolean shouldRetry(Exception e, int retries) throws Exception {
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
RetryPolicy policy = exceptionToPolicyMap.get(e.getClass());
if (policy == null) {
policy = defaultPolicy;
}
- return policy.shouldRetry(e, retries);
+ return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
}
}
@@ -230,7 +253,8 @@ public class RetryPolicies {
}
}
- public boolean shouldRetry(Exception e, int retries) throws Exception {
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception {
RetryPolicy policy = null;
if (e instanceof RemoteException) {
policy = exceptionNameToPolicyMap.get(
@@ -239,7 +263,7 @@ public class RetryPolicies {
if (policy == null) {
policy = defaultPolicy;
}
- return policy.shouldRetry(e, retries);
+ return policy.shouldRetry(e, retries, failovers, isMethodIdempotent);
}
}
@@ -255,4 +279,55 @@ public class RetryPolicies {
return sleepTime*r.nextInt(1<<(retries+1));
}
}
+
+ /*
+ * Fail over and retry in the case of:
+ * Remote StandbyException (server is up, but is not the active server)
+ * Immediate socket exceptions (e.g. no route to host, econnrefused)
+ * Socket exceptions after initial connection when operation is idempotent
+ *
+ * Fail immediately in the case of:
+ * Socket exceptions after initial connection when operation is not idempotent
+ *
+ * Fall back on underlying retry policy otherwise.
+ */
+ static class FailoverOnNetworkExceptionRetry implements RetryPolicy {
+
+ private RetryPolicy fallbackPolicy;
+ private int maxFailovers;
+
+ public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+ int maxFailovers) {
+ this.fallbackPolicy = fallbackPolicy;
+ this.maxFailovers = maxFailovers;
+ }
+
+ @Override
+ public RetryAction shouldRetry(Exception e, int retries,
+ int failovers, boolean isMethodIdempotent) throws Exception {
+ if (failovers >= maxFailovers) {
+ LOG.info("Failovers (" + failovers + ") exceeded maximum allowed ("
+ + maxFailovers + ")");
+ return RetryAction.FAIL;
+ }
+
+ if (e instanceof ConnectException ||
+ e instanceof NoRouteToHostException ||
+ e instanceof UnknownHostException ||
+ e instanceof StandbyException) {
+ return RetryAction.FAILOVER_AND_RETRY;
+ } else if (e instanceof SocketException ||
+ e instanceof IOException) {
+ if (isMethodIdempotent) {
+ return RetryAction.FAILOVER_AND_RETRY;
+ } else {
+ return RetryAction.FAIL;
+ }
+ } else {
+ return fallbackPolicy.shouldRetry(e, retries, failovers,
+ isMethodIdempotent);
+ }
+ }
+
+ }
}
diff --git a/common/src/java/org/apache/hadoop/io/retry/RetryPolicy.java b/common/src/java/org/apache/hadoop/io/retry/RetryPolicy.java
index 26d3267bc2a..4c4534ffb7e 100644
--- a/common/src/java/org/apache/hadoop/io/retry/RetryPolicy.java
+++ b/common/src/java/org/apache/hadoop/io/retry/RetryPolicy.java
@@ -17,13 +17,28 @@
*/
package org.apache.hadoop.io.retry;
+import org.apache.hadoop.classification.InterfaceStability;
+
+
/**
*
* Specifies a policy for retrying method failures.
* Implementations of this interface should be immutable.
*
*/
+@InterfaceStability.Evolving
public interface RetryPolicy {
+
+ /**
+ * Returned by {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)}.
+ */
+ @InterfaceStability.Evolving
+ public enum RetryAction {
+ FAIL,
+ RETRY,
+ FAILOVER_AND_RETRY
+ }
+
/**
*
* Determines whether the framework should retry a
@@ -31,13 +46,19 @@ public interface RetryPolicy {
* of retries that have been made for that operation
* so far.
*
- * @param e The exception that caused the method to fail.
- * @param retries The number of times the method has been retried.
+ * @param e The exception that caused the method to fail
+ * @param retries The number of times the method has been retried
+ * @param failovers The number of times the method has failed over to a
+ * different backend implementation
+ * @param isMethodIdempotent <code>true</code> if the method is idempotent
+ * and so can reasonably be retried on failover when we don't know if the
+ * previous attempt reached the server or not
* @return <code>true</code> if the method should be retried,
* <code>false</code> if the method should not be retried
- * but shouldn't fail with an exception (only for void methods).
+ * but shouldn't fail with an exception (only for void methods)
* @throws Exception The re-thrown exception <code>e</code> indicating
- * that the method failed and should not be retried further.
+ * that the method failed and should not be retried further
*/
- public boolean shouldRetry(Exception e, int retries) throws Exception;
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) throws Exception;
}
diff --git a/common/src/java/org/apache/hadoop/io/retry/RetryProxy.java b/common/src/java/org/apache/hadoop/io/retry/RetryProxy.java
index 937f832213c..13e8a41eba3 100644
--- a/common/src/java/org/apache/hadoop/io/retry/RetryProxy.java
+++ b/common/src/java/org/apache/hadoop/io/retry/RetryProxy.java
@@ -33,25 +33,41 @@ public class RetryProxy {
*
* @param iface the interface that the retry will implement
* @param implementation the instance whose methods should be retried
- * @param retryPolicy the policy for retirying method call failures
+ * @param retryPolicy the policy for retrying method call failures
* @return the retry proxy
*/
public static Object create(Class<?> iface, Object implementation,
RetryPolicy retryPolicy) {
+ return RetryProxy.create(iface,
+ new DefaultFailoverProxyProvider(iface, implementation),
+ retryPolicy);
+ }
+
+ /**
+ * Create a proxy for an interface whose implementations are supplied by the
+ * given {@link FailoverProxyProvider}, using the same retry policy for each
+ * method in the interface.
+ *
+ * @param iface the interface that the retry will implement
+ * @param proxyProvider provides implementation instances whose methods should be retried
+ * @param retryPolicy the policy for retrying or failing over method call failures
+ * @return the retry proxy
+ */
+ public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+ RetryPolicy retryPolicy) {
return Proxy.newProxyInstance(
- implementation.getClass().getClassLoader(),
- new Class<?>[] { iface },
- new RetryInvocationHandler(implementation, retryPolicy)
- );
- }
+ proxyProvider.getInterface().getClassLoader(),
+ new Class<?>[] { iface },
+ new RetryInvocationHandler(proxyProvider, retryPolicy)
+ );
+ }
/**
- *
* Create a proxy for an interface of an implementation class
* using the a set of retry policies specified by method name.
* If no retry policy is defined for a method then a default of
* {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
- *
+ *
* @param iface the interface that the retry will implement
* @param implementation the instance whose methods should be retried
* @param methodNameToPolicyMap a map of method names to retry policies
@@ -59,10 +75,28 @@ public class RetryProxy {
*/
public static Object create(Class<?> iface, Object implementation,
Map<String,RetryPolicy> methodNameToPolicyMap) {
+ return RetryProxy.create(iface,
+ new DefaultFailoverProxyProvider(iface, implementation),
+ methodNameToPolicyMap);
+ }
+
+ /**
+ * Create a proxy for an interface whose implementations are supplied by the
+ * given {@link FailoverProxyProvider}, using a set of retry policies
+ * specified by method name. If no retry policy is defined for a method then a
+ * default of {@link RetryPolicies#TRY_ONCE_THEN_FAIL} is used.
+ *
+ * @param iface the interface that the retry will implement
+ * @param proxyProvider provides implementation instances whose methods should be retried
+ * @param methodNameToPolicyMap a map of method names to retry policies
+ * @return the retry proxy
+ */
+ public static Object create(Class<?> iface, FailoverProxyProvider proxyProvider,
+ Map<String,RetryPolicy> methodNameToPolicyMap) {
return Proxy.newProxyInstance(
- implementation.getClass().getClassLoader(),
- new Class<?>[] { iface },
- new RetryInvocationHandler(implementation, methodNameToPolicyMap)
- );
+ proxyProvider.getInterface().getClassLoader(),
+ new Class<?>[] { iface },
+ new RetryInvocationHandler(proxyProvider, methodNameToPolicyMap)
+ );
}
}
diff --git a/common/src/java/org/apache/hadoop/ipc/StandbyException.java b/common/src/java/org/apache/hadoop/ipc/StandbyException.java
new file mode 100644
index 00000000000..49f4fadfd55
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/ipc/StandbyException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown by a remote server when it is up, but is not the active server in a
+ * set of servers in which only a subset may be active.
+ */
+@InterfaceStability.Evolving
+public class StandbyException extends Exception {
+ static final long serialVersionUID = 0x12308AD010L;
+ public StandbyException(String msg) {
+ super(msg);
+ }
+}
diff --git a/common/src/test/core/org/apache/hadoop/io/retry/TestFailoverProxy.java b/common/src/test/core/org/apache/hadoop/io/retry/TestFailoverProxy.java
new file mode 100644
index 00000000000..4664ab342d1
--- /dev/null
+++ b/common/src/test/core/org/apache/hadoop/io/retry/TestFailoverProxy.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.retry;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
+import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.Test;
+
+public class TestFailoverProxy {
+
+ public static class FlipFlopProxyProvider implements FailoverProxyProvider {
+
+ private Class<?> iface;
+ private Object currentlyActive;
+ private Object impl1;
+ private Object impl2;
+
+ public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
+ Object standbyImpl) {
+ this.iface = iface;
+ this.impl1 = activeImpl;
+ this.impl2 = standbyImpl;
+ currentlyActive = impl1;
+ }
+
+ @Override
+ public Object getProxy() {
+ return currentlyActive;
+ }
+
+ @Override
+ public void performFailover(Object currentProxy) {
+ currentlyActive = impl1 == currentProxy ? impl2 : impl1;
+ }
+
+ @Override
+ public Class<?> getInterface() {
+ return iface;
+ }
+
+ }
+
+ public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
+
+ @Override
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isMethodIdempotent) {
+ return failovers < 1 ? RetryAction.FAILOVER_AND_RETRY : RetryAction.FAIL;
+ }
+
+ }
+
+ @Test
+ public void testSucceedsOnceThenFailOver() throws UnreliableException,
+ IOException, StandbyException {
+ UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1"),
+ new UnreliableImplementation("impl2")),
+ new FailOverOnceOnAnyExceptionPolicy());
+
+ assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+ assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+ try {
+ unreliable.succeedsOnceThenFailsReturningString();
+ fail("should not have succeeded more than twice");
+ } catch (UnreliableException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testSucceedsTenTimesThenFailOver() throws UnreliableException,
+ IOException, StandbyException {
+ UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1"),
+ new UnreliableImplementation("impl2")),
+ new FailOverOnceOnAnyExceptionPolicy());
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals("impl1", unreliable.succeedsTenTimesThenFailsReturningString());
+ }
+ assertEquals("impl2", unreliable.succeedsTenTimesThenFailsReturningString());
+ }
+
+ @Test
+ public void testNeverFailOver() throws UnreliableException,
+ IOException, StandbyException {
+ UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1"),
+ new UnreliableImplementation("impl2")),
+ RetryPolicies.TRY_ONCE_DONT_FAIL);
+
+ unreliable.succeedsOnceThenFailsReturningString();
+ try {
+ unreliable.succeedsOnceThenFailsReturningString();
+ fail("should not have succeeded twice");
+ } catch (UnreliableException e) {
+ assertEquals("impl1", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFailoverOnStandbyException()
+ throws UnreliableException, IOException, StandbyException {
+ UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1"),
+ new UnreliableImplementation("impl2")),
+ RetryPolicies.failoverOnNetworkException(1));
+
+ assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+ try {
+ unreliable.succeedsOnceThenFailsReturningString();
+ fail("should not have succeeded twice");
+ } catch (UnreliableException e) {
+ // Make sure there was no failover on normal exception.
+ assertEquals("impl1", e.getMessage());
+ }
+
+ unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
+ new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+ RetryPolicies.failoverOnNetworkException(1));
+
+ assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+ // Make sure we fail over since the first implementation threw a StandbyException
+ assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+ }
+
+ @Test
+ public void testFailoverOnNetworkExceptionIdempotentOperation()
+ throws UnreliableException, IOException, StandbyException {
+ UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+ .create(UnreliableInterface.class,
+ new FlipFlopProxyProvider(UnreliableInterface.class,
+ new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.IO_EXCEPTION),
+ new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+ RetryPolicies.failoverOnNetworkException(1));
+
+ assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
+ try {
+ assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+ fail("should not have succeeded twice");
+ } catch (IOException e) {
+ // Make sure we *don't* fail over since the first implementation threw an
+ // IOException and this method is not idempotent
+ assertEquals("impl1", e.getMessage());
+ }
+
+ assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
+ // Make sure we fail over since the first implementation threw an
+ // IOException and this method is idempotent.
+ assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
+ }
+}
diff --git a/common/src/test/core/org/apache/hadoop/io/retry/UnreliableImplementation.java b/common/src/test/core/org/apache/hadoop/io/retry/UnreliableImplementation.java
index 5971ee72165..10dc6b38309 100644
--- a/common/src/test/core/org/apache/hadoop/io/retry/UnreliableImplementation.java
+++ b/common/src/test/core/org/apache/hadoop/io/retry/UnreliableImplementation.java
@@ -15,16 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.io.retry;
+import java.io.IOException;
+
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
public class UnreliableImplementation implements UnreliableInterface {
private int failsOnceInvocationCount,
failsOnceWithValueInvocationCount,
- failsTenTimesInvocationCount;
+ failsTenTimesInvocationCount,
+ succeedsOnceThenFailsCount,
+ succeedsOnceThenFailsIdempotentCount,
+ succeedsTenTimesThenFailsCount;
+
+ private String identifier;
+ private TypeOfExceptionToFailWith exceptionToFailWith;
+
+ public static enum TypeOfExceptionToFailWith {
+ UNRELIABLE_EXCEPTION,
+ STANDBY_EXCEPTION,
+ IO_EXCEPTION
+ }
+
+ public UnreliableImplementation() {
+ this(null);
+ }
+
+ public UnreliableImplementation(String identifier) {
+ this(identifier, TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION);
+ }
+
+ public UnreliableImplementation(String identifier,
+ TypeOfExceptionToFailWith exceptionToFailWith) {
+ this.identifier = identifier;
+ this.exceptionToFailWith = exceptionToFailWith;
+ }
public void alwaysSucceeds() {
// do nothing
@@ -57,4 +85,60 @@ public class UnreliableImplementation implements UnreliableInterface {
}
}
+ @Override
+ public String succeedsOnceThenFailsReturningString()
+ throws UnreliableException, IOException, StandbyException {
+ if (succeedsOnceThenFailsCount++ < 1) {
+ return identifier;
+ } else {
+ switch (exceptionToFailWith) {
+ case STANDBY_EXCEPTION:
+ throw new StandbyException(identifier);
+ case UNRELIABLE_EXCEPTION:
+ throw new UnreliableException(identifier);
+ case IO_EXCEPTION:
+ throw new IOException(identifier);
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public String succeedsTenTimesThenFailsReturningString()
+ throws UnreliableException, IOException, StandbyException {
+ if (succeedsTenTimesThenFailsCount++ < 10) {
+ return identifier;
+ } else {
+ switch (exceptionToFailWith) {
+ case STANDBY_EXCEPTION:
+ throw new StandbyException(identifier);
+ case UNRELIABLE_EXCEPTION:
+ throw new UnreliableException(identifier);
+ case IO_EXCEPTION:
+ throw new IOException(identifier);
+ default:
+ throw new RuntimeException(identifier);
+ }
+ }
+ }
+
+ @Override
+ public String succeedsOnceThenFailsReturningStringIdempotent()
+ throws UnreliableException, StandbyException, IOException {
+ if (succeedsOnceThenFailsIdempotentCount++ < 1) {
+ return identifier;
+ } else {
+ switch (exceptionToFailWith) {
+ case STANDBY_EXCEPTION:
+ throw new StandbyException(identifier);
+ case UNRELIABLE_EXCEPTION:
+ throw new UnreliableException(identifier);
+ case IO_EXCEPTION:
+ throw new IOException(identifier);
+ default:
+ throw new RuntimeException(identifier);
+ }
+ }
+ }
+
}
diff --git a/common/src/test/core/org/apache/hadoop/io/retry/UnreliableInterface.java b/common/src/test/core/org/apache/hadoop/io/retry/UnreliableInterface.java
index af4959151e7..04e45050017 100644
--- a/common/src/test/core/org/apache/hadoop/io/retry/UnreliableInterface.java
+++ b/common/src/test/core/org/apache/hadoop/io/retry/UnreliableInterface.java
@@ -18,12 +18,28 @@
package org.apache.hadoop.io.retry;
+import java.io.IOException;
+
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
public interface UnreliableInterface {
public static class UnreliableException extends Exception {
- // no body
+ private String identifier;
+
+ public UnreliableException() {
+ // no body
+ }
+
+ public UnreliableException(String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public String getMessage() {
+ return identifier;
+ }
}
public static class FatalException extends UnreliableException {
@@ -39,4 +55,12 @@ public interface UnreliableInterface {
boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
void failsTenTimesThenSucceeds() throws UnreliableException;
+
+ public String succeedsOnceThenFailsReturningString()
+ throws UnreliableException, StandbyException, IOException;
+ @Idempotent
+ public String succeedsOnceThenFailsReturningStringIdempotent()
+ throws UnreliableException, StandbyException, IOException;
+ public String succeedsTenTimesThenFailsReturningString()
+ throws UnreliableException, StandbyException, IOException;
}
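
Reviewer note: a sketch of how a protocol interface would consume the new
@Idempotent annotation and StandbyException. ExampleProtocol and its methods
are illustrative placeholders, not part of this patch.

    import java.io.IOException;
    import org.apache.hadoop.io.retry.Idempotent;
    import org.apache.hadoop.ipc.StandbyException;

    public interface ExampleProtocol {
      // Safe to re-execute, so the retry proxy may fail over and retry even
      // when an IOException arrives after the initial connection.
      @Idempotent
      long getLength(String path) throws IOException, StandbyException;

      // Not idempotent: the same IOException yields RetryAction.FAIL rather
      // than FAILOVER_AND_RETRY under failoverOnNetworkException.
      void append(String path, byte[] data) throws IOException, StandbyException;
    }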