HADOOP-7717. Move handling of concurrent client fail-overs to RetryInvocationHandler (atm)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1179483 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-10-06 01:01:19 +00:00
parent 282e2e910c
commit 14569ab482
5 changed files with 144 additions and 12 deletions

View File

@ -17,15 +17,18 @@ Trunk (unreleased changes)
close (atm)
HADOOP-7668. Add a NetUtils method that can tell if an InetAddress
belongs to local host. (suresh)
belongs to local host. (suresh)
HADOOP-7687 Make getProtocolSignature public (sanjay)
HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
interface introduced in HADOOP-7524. (cutting)
interface introduced in HADOOP-7524. (cutting)
HADOOP-7716 RPC protocol registration on SS does not log the protocol name
(only the class which may be different) (sanjay)
HADOOP-7716. RPC protocol registration on SS does not log the protocol name
(only the class which may be different) (sanjay)
HADOOP-7717. Move handling of concurrent client fail-overs to
RetryInvocationHandler (atm)
BUGS

View File

@ -24,6 +24,7 @@
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,6 +33,11 @@
class RetryInvocationHandler implements InvocationHandler, Closeable {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private FailoverProxyProvider proxyProvider;
/**
* The number of times the associated proxyProvider has ever been failed over.
*/
private long proxyProviderFailoverCount = 0;
private RetryPolicy defaultPolicy;
private Map<String,RetryPolicy> methodNameToPolicyMap;
@ -60,16 +66,24 @@ public Object invoke(Object proxy, Method method, Object[] args)
policy = defaultPolicy;
}
int failovers = 0;
// The number of times this method invocation has been failed over.
int invocationFailoverCount = 0;
int retries = 0;
while (true) {
// The number of times this invocation handler has ever been failed over,
// before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts.
long invocationAttemptFailoverCount;
synchronized (proxyProvider) {
invocationAttemptFailoverCount = proxyProviderFailoverCount;
}
try {
return invokeMethod(method, args);
} catch (Exception e) {
boolean isMethodIdempotent = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
RetryAction action = policy.shouldRetry(e, retries++, failovers,
RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
isMethodIdempotent);
if (action == RetryAction.FAIL) {
LOG.warn("Exception while invoking " + method.getName()
@ -81,10 +95,24 @@ public Object invoke(Object proxy, Method method, Object[] args)
} else if (action == RetryAction.FAILOVER_AND_RETRY) {
LOG.warn("Exception while invoking " + method.getName()
+ " of " + currentProxy.getClass()
+ ". Trying to fail over.", e);
failovers++;
proxyProvider.performFailover(currentProxy);
+ " after " + invocationFailoverCount + " fail over attempts."
+ " Trying to fail over.", e);
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
synchronized (proxyProvider) {
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy);
proxyProviderFailoverCount++;
} else {
LOG.warn("A failover has occurred since the start of this method"
+ " invocation attempt.");
}
}
// The call to getProxy() could technically only be made in the event
// performFailover() is called, but it needs to be out here for the
// purpose of testing.
currentProxy = proxyProvider.getProxy();
invocationFailoverCount++;
}
if(LOG.isDebugEnabled()) {
LOG.debug("Exception while invoking " + method.getName()

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
@ -35,22 +36,41 @@ public static class FlipFlopProxyProvider implements FailoverProxyProvider {
private Object impl1;
private Object impl2;
private boolean latchEnabled = false;
private CountDownLatch getProxyLatch;
private int failoversOccurred = 0;
public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
Object standbyImpl) {
Object standbyImpl, int getProxyCountDown) {
this.iface = iface;
this.impl1 = activeImpl;
this.impl2 = standbyImpl;
currentlyActive = impl1;
getProxyLatch = new CountDownLatch(getProxyCountDown);
}
public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
Object standbyImpl) {
this(iface, activeImpl, standbyImpl, 0);
}
@Override
public Object getProxy() {
if (latchEnabled) {
getProxyLatch.countDown();
try {
getProxyLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return currentlyActive;
}
@Override
public void performFailover(Object currentProxy) {
public synchronized void performFailover(Object currentProxy) {
currentlyActive = impl1 == currentProxy ? impl2 : impl1;
failoversOccurred++;
}
@Override
@ -63,6 +83,13 @@ public void close() throws IOException {
// Nothing to do.
}
public void setLatchEnabled(boolean latchEnabled) {
this.latchEnabled = latchEnabled;
}
public int getFailoversOccurred() {
return failoversOccurred;
}
}
public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
@ -186,4 +213,55 @@ public void testFailoverOnNetworkExceptionIdempotentOperation()
// IOException and this method is idempotent.
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
}
}
private static class ConcurrentMethodThread extends Thread {
private UnreliableInterface unreliable;
public String result;
public ConcurrentMethodThread(UnreliableInterface unreliable) {
this.unreliable = unreliable;
}
public void run() {
try {
result = unreliable.failsIfIdentifierDoesntMatch("impl2");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* Test that concurrent failed method invocations only result in a single
* failover.
*/
@Test
public void testConcurrentMethodFailures() throws InterruptedException {
FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
UnreliableInterface.class,
new UnreliableImplementation("impl1",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
new UnreliableImplementation("impl2",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
2);
final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class, proxyProvider,
RetryPolicies.failoverOnNetworkException(10));
ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable);
ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable);
// Getting a proxy will now wait on a latch.
proxyProvider.setLatchEnabled(true);
t1.start();
t2.start();
t1.join();
t2.join();
assertEquals("impl2", t1.result);
assertEquals("impl2", t2.result);
assertEquals(1, proxyProvider.getFailoversOccurred());
}
}

View File

@ -141,4 +141,23 @@ public String succeedsOnceThenFailsReturningStringIdempotent()
}
}
@Override
public String failsIfIdentifierDoesntMatch(String identifier)
throws UnreliableException, StandbyException, IOException {
if (this.identifier.equals(identifier)) {
return identifier;
} else {
switch (exceptionToFailWith) {
case STANDBY_EXCEPTION:
throw new StandbyException(identifier);
case UNRELIABLE_EXCEPTION:
throw new UnreliableException(identifier);
case IO_EXCEPTION:
throw new IOException(identifier);
default:
throw new RuntimeException(identifier);
}
}
}
}

View File

@ -63,4 +63,8 @@ public String succeedsOnceThenFailsReturningStringIdempotent()
throws UnreliableException, StandbyException, IOException;
public String succeedsTenTimesThenFailsReturningString()
throws UnreliableException, StandbyException, IOException;
@Idempotent
public String failsIfIdentifierDoesntMatch(String identifier)
throws UnreliableException, StandbyException, IOException;
}