diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 243735926a5..7c27b965846 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -32,6 +32,9 @@ Release 0.23.3 - UNRELEASED HADOOP-7776. Make the Ipc-Header in a RPC-Payload an explicit header. (sanjay) + HADOOP-7717. Move handling of concurrent client fail-overs to + RetryInvocationHandler (atm) + HADOOP-7862. Move the support for multiple protocols to lower layer so that Writable, PB and Avro can all use it (Sanjay) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 51ff6a7e06a..3814107496f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -23,6 +23,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.Collections; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +34,11 @@ import org.apache.hadoop.ipc.RpcInvocationHandler; class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private FailoverProxyProvider proxyProvider; + + /** + * The number of times the associated proxyProvider has ever been failed over. + */ + private long proxyProviderFailoverCount = 0; private RetryPolicy defaultPolicy; private Map methodNameToPolicyMap; @@ -61,16 +67,24 @@ class RetryInvocationHandler implements RpcInvocationHandler { policy = defaultPolicy; } - int failovers = 0; + // The number of times this method invocation has been failed over. + int invocationFailoverCount = 0; int retries = 0; while (true) { + // The number of times this invocation handler has ever been failed over, + // before this method invocation attempt. Used to prevent concurrent + // failed method invocations from triggering multiple failover attempts. + long invocationAttemptFailoverCount; + synchronized (proxyProvider) { + invocationAttemptFailoverCount = proxyProviderFailoverCount; + } try { return invokeMethod(method, args); } catch (Exception e) { boolean isMethodIdempotent = proxyProvider.getInterface() .getMethod(method.getName(), method.getParameterTypes()) .isAnnotationPresent(Idempotent.class); - RetryAction action = policy.shouldRetry(e, retries++, failovers, + RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, isMethodIdempotent); if (action == RetryAction.FAIL) { LOG.warn("Exception while invoking " + method.getName() @@ -82,10 +96,24 @@ class RetryInvocationHandler implements RpcInvocationHandler { } else if (action == RetryAction.FAILOVER_AND_RETRY) { LOG.warn("Exception while invoking " + method.getName() + " of " + currentProxy.getClass() - + ". Trying to fail over.", e); - failovers++; - proxyProvider.performFailover(currentProxy); + + " after " + invocationFailoverCount + " fail over attempts." + + " Trying to fail over.", e); + // Make sure that concurrent failed method invocations only cause a + // single actual fail over. + synchronized (proxyProvider) { + if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { + proxyProvider.performFailover(currentProxy); + proxyProviderFailoverCount++; + } else { + LOG.warn("A failover has occurred since the start of this method" + + " invocation attempt."); + } + } + // The call to getProxy() could technically only be made in the event + // performFailover() is called, but it needs to be out here for the + // purpose of testing. currentProxy = proxyProvider.getProxy(); + invocationFailoverCount++; } if(LOG.isDebugEnabled()) { LOG.debug("Exception while invoking " + method.getName() diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java index 295bf13d11a..ba6828a9b87 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java @@ -20,6 +20,7 @@ package org.apache.hadoop.io.retry; import static org.junit.Assert.*; import java.io.IOException; +import java.util.concurrent.CountDownLatch; import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith; import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException; @@ -35,22 +36,41 @@ public class TestFailoverProxy { private Object impl1; private Object impl2; + private boolean latchEnabled = false; + private CountDownLatch getProxyLatch; + private int failoversOccurred = 0; + public FlipFlopProxyProvider(Class iface, Object activeImpl, - Object standbyImpl) { + Object standbyImpl, int getProxyCountDown) { this.iface = iface; this.impl1 = activeImpl; this.impl2 = standbyImpl; currentlyActive = impl1; + getProxyLatch = new CountDownLatch(getProxyCountDown); + } + + public FlipFlopProxyProvider(Class iface, Object activeImpl, + Object standbyImpl) { + this(iface, activeImpl, standbyImpl, 0); } @Override public Object getProxy() { + if (latchEnabled) { + getProxyLatch.countDown(); + try { + getProxyLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return currentlyActive; } @Override - public void performFailover(Object currentProxy) { + public synchronized void performFailover(Object currentProxy) { currentlyActive = impl1 == currentProxy ? impl2 : impl1; + failoversOccurred++; } @Override @@ -63,6 +83,13 @@ public class TestFailoverProxy { // Nothing to do. } + public void setLatchEnabled(boolean latchEnabled) { + this.latchEnabled = latchEnabled; + } + + public int getFailoversOccurred() { + return failoversOccurred; + } } public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy { @@ -186,4 +213,55 @@ public class TestFailoverProxy { // IOException and this method is idempotent. assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent()); } -} + + private static class ConcurrentMethodThread extends Thread { + + private UnreliableInterface unreliable; + public String result; + + public ConcurrentMethodThread(UnreliableInterface unreliable) { + this.unreliable = unreliable; + } + + public void run() { + try { + result = unreliable.failsIfIdentifierDoesntMatch("impl2"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Test that concurrent failed method invocations only result in a single + * failover. + */ + @Test + public void testConcurrentMethodFailures() throws InterruptedException { + FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider( + UnreliableInterface.class, + new UnreliableImplementation("impl1", + TypeOfExceptionToFailWith.STANDBY_EXCEPTION), + new UnreliableImplementation("impl2", + TypeOfExceptionToFailWith.STANDBY_EXCEPTION), + 2); + + final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy + .create(UnreliableInterface.class, proxyProvider, + RetryPolicies.failoverOnNetworkException(10)); + + ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable); + ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable); + + // Getting a proxy will now wait on a latch. + proxyProvider.setLatchEnabled(true); + + t1.start(); + t2.start(); + t1.join(); + t2.join(); + assertEquals("impl2", t1.result); + assertEquals("impl2", t2.result); + assertEquals(1, proxyProvider.getFailoversOccurred()); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java index 10dc6b38309..7fa88b3b08e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java @@ -141,4 +141,23 @@ public class UnreliableImplementation implements UnreliableInterface { } } + @Override + public String failsIfIdentifierDoesntMatch(String identifier) + throws UnreliableException, StandbyException, IOException { + if (this.identifier.equals(identifier)) { + return identifier; + } else { + switch (exceptionToFailWith) { + case STANDBY_EXCEPTION: + throw new StandbyException(identifier); + case UNRELIABLE_EXCEPTION: + throw new UnreliableException(identifier); + case IO_EXCEPTION: + throw new IOException(identifier); + default: + throw new RuntimeException(identifier); + } + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java index 04e45050017..e794c1686c2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java @@ -63,4 +63,8 @@ public interface UnreliableInterface { throws UnreliableException, StandbyException, IOException; public String succeedsTenTimesThenFailsReturningString() throws UnreliableException, StandbyException, IOException; + + @Idempotent + public String failsIfIdentifierDoesntMatch(String identifier) + throws UnreliableException, StandbyException, IOException; }