HADOOP-13146. Refactor RetryInvocationHandler. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2016-05-16 15:23:36 -07:00 committed by Tsz-Wo Nicholas Sze
parent 63f73d78a1
commit 03c4491d89
3 changed files with 212 additions and 219 deletions

View File

@ -37,10 +37,21 @@ public interface FailoverProxyProvider<T> extends Closeable {
* provides information for debugging purposes. * provides information for debugging purposes.
*/ */
public final String proxyInfo; public final String proxyInfo;
public ProxyInfo(T proxy, String proxyInfo) { public ProxyInfo(T proxy, String proxyInfo) {
this.proxy = proxy; this.proxy = proxy;
this.proxyInfo = proxyInfo; this.proxyInfo = proxyInfo;
} }
/**
 * Builds a description of a method call made through this proxy, in the
 * form {@code ProxySimpleClassName.methodName over proxyInfo}.
 *
 * @param methodName name of the method to include in the description
 * @return the simple class name of {@code proxy}, a dot, the method name,
 *         the text " over ", and the {@code proxyInfo} string
 */
public String getString(String methodName) {
return proxy.getClass().getSimpleName() + "." + methodName
+ " over " + proxyInfo;
}
/**
 * Describes this proxy as the simple class name of {@code proxy} followed
 * by " over " and the {@code proxyInfo} debugging string.
 */
@Override
public String toString() {
return proxy.getClass().getSimpleName() + " over " + proxyInfo;
}
} }
/** /**

View File

@ -17,49 +17,137 @@
*/ */
package org.apache.hadoop.io.retry; package org.apache.hadoop.io.retry;
import java.io.IOException; import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo; import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.*;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import com.google.common.annotations.VisibleForTesting; import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
/** /**
* This class implements RpcInvocationHandler and supports retry on the client * A {@link RpcInvocationHandler} which supports client side retry.
* side.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RetryInvocationHandler<T> implements RpcInvocationHandler { public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider<T> proxyProvider;
/** private static class Counters {
* The number of times the associated proxyProvider has ever been failed over. /** Counter for retries. */
*/ private int retries;
private long proxyProviderFailoverCount = 0; /** Counter for method invocation has been failed over. */
private int failovers;
}
/**
 * Bundles a {@link FailoverProxyProvider} with the {@link ProxyInfo} most
 * recently obtained from it and a count of the failovers performed through
 * this descriptor. Access to the mutable fields ({@code proxyInfo} and
 * {@code failoverCount}) goes through methods synchronized on this object.
 */
private static class ProxyDescriptor<T> {
private final FailoverProxyProvider<T> fpp;
/** The number of times the associated proxy provider has been failed over. */
private long failoverCount = 0;
// The proxy currently in use; refreshed from fpp at the end of failover().
private ProxyInfo<T> proxyInfo;
/**
 * Wraps the given provider and fetches its current proxy.
 *
 * @param fpp the provider to wrap
 */
ProxyDescriptor(FailoverProxyProvider<T> fpp) {
this.fpp = fpp;
this.proxyInfo = fpp.getProxy();
}
/** @return the {@link ProxyInfo} currently held by this descriptor. */
synchronized ProxyInfo<T> getProxyInfo() {
return proxyInfo;
}
/** @return the {@code proxy} field of the currently held {@link ProxyInfo}. */
synchronized T getProxy() {
return proxyInfo.proxy;
}
/** @return the current value of the failover counter. */
synchronized long getFailoverCount() {
return failoverCount;
}
/**
 * Asks the provider to fail over, but only if no failover has been counted
 * since the caller sampled the counter; then re-reads the current proxy.
 *
 * @param expectedFailoverCount the counter value the caller observed
 *        (via {@link #getFailoverCount()}) before its invocation attempt
 * @param method the method being invoked; used only for the warning text
 */
synchronized void failover(long expectedFailoverCount, Method method) {
// Make sure that concurrent failed invocations only cause a single
// actual failover: a counter mismatch means another caller got here first.
if (failoverCount == expectedFailoverCount) {
fpp.performFailover(proxyInfo.proxy);
failoverCount++;
} else {
LOG.warn("A failover has occurred since the start of "
+ proxyInfo.getString(method.getName()));
}
// Refresh in both branches so this descriptor holds whatever the provider
// now returns.
proxyInfo = fpp.getProxy();
}
/**
 * Looks up the method with the same name and parameter types on the
 * provider's interface and checks its annotations.
 *
 * @param method the method being invoked
 * @return true if the interface method carries {@code Idempotent} or
 *         {@code AtMostOnce}
 * @throws NoSuchMethodException declared by this method; presumably raised
 *         by the {@code getMethod} lookup when the interface has no such
 *         method -- confirm against the type of {@code fpp.getInterface()}
 */
boolean idempotentOrAtMostOnce(Method method) throws NoSuchMethodException {
final Method m = fpp.getInterface()
.getMethod(method.getName(), method.getParameterTypes());
return m.isAnnotationPresent(Idempotent.class)
|| m.isAnnotationPresent(AtMostOnce.class);
}
/** Closes the wrapped provider. */
void close() throws IOException {
fpp.close();
}
}
/**
 * The combined outcome of consulting a {@link RetryPolicy} about one failed
 * invocation: how long to wait, and either a failover action or a fail
 * action. When both {@code failover} and {@code fail} are null, a plain
 * retry action was seen and no failover action was.
 */
private static class RetryInfo {
// Largest delayMillis among the non-FAIL actions returned by the policy.
private final long delay;
// Last FAILOVER_AND_RETRY action returned by the policy, or null if none.
private final RetryAction failover;
// Last FAIL action, set only when no retry or failover action was returned.
private final RetryAction fail;
RetryInfo(long delay, RetryAction failover, RetryAction fail) {
this.delay = delay;
this.failover = failover;
this.fail = fail;
}
/**
 * Consults the policy for the given exception and combines the answers.
 * If {@code e} is a {@code MultiException}, each exception in
 * {@code getExceptions().values()} is evaluated; otherwise only {@code e}.
 *
 * @param policy the retry policy to consult
 * @param e the exception raised by the invocation
 * @param counters supplies the retry and failover counts passed to the policy
 * @param idempotentOrAtMostOnce passed through to the policy
 * @return the combined result; its {@code fail} field is non-null only when
 *         every action returned by the policy was FAIL
 * @throws Exception declared by this method; presumably propagated from
 *         {@code policy.shouldRetry} -- confirm against RetryPolicy
 */
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
Counters counters, boolean idempotentOrAtMostOnce) throws Exception {
long maxRetryDelay = 0;
RetryAction failover = null;
RetryAction retry = null;
RetryAction fail = null;
final Iterable<Exception> exceptions = e instanceof MultiException ?
((MultiException) e).getExceptions().values()
: Collections.singletonList(e);
for (Exception exception : exceptions) {
final RetryAction a = policy.shouldRetry(exception,
counters.retries, counters.failovers, idempotentOrAtMostOnce);
if (a.action == RetryAction.RetryDecision.FAIL) {
fail = a;
} else {
// must be a retry or failover
if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
failover = a;
} else {
retry = a;
}
// Only non-FAIL actions contribute to the delay.
if (a.delayMillis > maxRetryDelay) {
maxRetryDelay = a.delayMillis;
}
}
}
// Any retry or failover action overrides a FAIL action; the retry local
// is used only to make that decision.
return new RetryInfo(maxRetryDelay, failover,
failover == null && retry == null? fail: null);
}
}
private final ProxyDescriptor<T> proxyDescriptor;
private volatile boolean hasMadeASuccessfulCall = false; private volatile boolean hasMadeASuccessfulCall = false;
private final RetryPolicy defaultPolicy; private final RetryPolicy defaultPolicy;
private final Map<String,RetryPolicy> methodNameToPolicyMap; private final Map<String,RetryPolicy> methodNameToPolicyMap;
private ProxyInfo<T> currentProxy;
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider, protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy retryPolicy) { RetryPolicy retryPolicy) {
@ -69,39 +157,40 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider, protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
RetryPolicy defaultPolicy, RetryPolicy defaultPolicy,
Map<String, RetryPolicy> methodNameToPolicyMap) { Map<String, RetryPolicy> methodNameToPolicyMap) {
this.proxyProvider = proxyProvider; this.proxyDescriptor = new ProxyDescriptor<>(proxyProvider);
this.defaultPolicy = defaultPolicy; this.defaultPolicy = defaultPolicy;
this.methodNameToPolicyMap = methodNameToPolicyMap; this.methodNameToPolicyMap = methodNameToPolicyMap;
this.currentProxy = proxyProvider.getProxy(); }
private RetryPolicy getRetryPolicy(Method method) {
final RetryPolicy policy = methodNameToPolicyMap.get(method.getName());
return policy != null? policy: defaultPolicy;
} }
@Override @Override
public Object invoke(Object proxy, Method method, Object[] args) public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable { throws Throwable {
RetryPolicy policy = methodNameToPolicyMap.get(method.getName()); final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
if (policy == null) {
policy = defaultPolicy;
}
// The number of times this method invocation has been failed over.
int invocationFailoverCount = 0;
final boolean isRpc = isRpcInvocation(currentProxy.proxy);
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
int retries = 0; return invoke(method, args, isRpc, callId, new Counters());
}
private Object invoke(final Method method, final Object[] args,
final boolean isRpc, final int callId, final Counters counters)
throws Throwable {
final RetryPolicy policy = getRetryPolicy(method);
while (true) { while (true) {
// The number of times this invocation handler has ever been failed over, // The number of times this invocation handler has ever been failed over,
// before this method invocation attempt. Used to prevent concurrent // before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts. // failed method invocations from triggering multiple failover attempts.
long invocationAttemptFailoverCount; final long failoverCount = proxyDescriptor.getFailoverCount();
synchronized (proxyProvider) {
invocationAttemptFailoverCount = proxyProviderFailoverCount;
}
if (isRpc) { if (isRpc) {
Client.setCallIdAndRetryCount(callId, retries); Client.setCallIdAndRetryCount(callId, counters.retries);
} }
try { try {
Object ret = invokeMethod(method, args); final Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true; hasMadeASuccessfulCall = true;
return ret; return ret;
} catch (Exception ex) { } catch (Exception ex) {
@ -109,153 +198,74 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
// If interrupted, do not retry. // If interrupted, do not retry.
throw ex; throw ex;
} }
boolean isIdempotentOrAtMostOnce = proxyProvider.getInterface() handleException(method, policy, failoverCount, counters, ex);
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
if (!isIdempotentOrAtMostOnce) {
isIdempotentOrAtMostOnce = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(AtMostOnce.class);
}
List<RetryAction> actions = extractActions(policy, ex, retries++,
invocationFailoverCount, isIdempotentOrAtMostOnce);
RetryAction failAction = getFailAction(actions);
if (failAction != null) {
// fail.
if (failAction.reason != null) {
LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
+ "." + method.getName() + " over " + currentProxy.proxyInfo
+ ". Not retrying because " + failAction.reason, ex);
}
throw ex;
} else { // retry or failover
// avoid logging the failover if this is the first call on this
// proxy object, and we successfully achieve the failover without
// any flip-flopping
boolean worthLogging =
!(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
worthLogging |= LOG.isDebugEnabled();
RetryAction failOverAction = getFailOverAction(actions);
long delay = getDelayMillis(actions);
if (worthLogging) {
String msg = "Exception while invoking " + method.getName()
+ " of class " + currentProxy.proxy.getClass().getSimpleName()
+ " over " + currentProxy.proxyInfo;
if (invocationFailoverCount > 0) {
msg += " after " + invocationFailoverCount + " fail over attempts";
}
if (failOverAction != null) {
// failover
msg += ". Trying to fail over " + formatSleepMessage(delay);
} else {
// retry
msg += ". Retrying " + formatSleepMessage(delay);
}
LOG.info(msg, ex);
}
if (delay > 0) {
Thread.sleep(delay);
}
if (failOverAction != null) {
// Make sure that concurrent failed method invocations only cause a
// single actual fail over.
synchronized (proxyProvider) {
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy.proxy);
proxyProviderFailoverCount++;
} else {
LOG.warn("A failover has occurred since the start of this method"
+ " invocation attempt.");
}
currentProxy = proxyProvider.getProxy();
}
invocationFailoverCount++;
}
}
} }
} }
} }
/** private void handleException(final Method method, final RetryPolicy policy,
* Obtain a retry delay from list of RetryActions. final long expectedFailoverCount, final Counters counters,
*/ final Exception ex) throws Exception {
private long getDelayMillis(List<RetryAction> actions) { final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters,
long retVal = 0; proxyDescriptor.idempotentOrAtMostOnce(method));
for (RetryAction action : actions) { counters.retries++;
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY ||
action.action == RetryAction.RetryDecision.RETRY) { if (retryInfo.fail != null) {
if (action.delayMillis > retVal) { // fail.
retVal = action.delayMillis; if (retryInfo.fail.reason != null) {
} LOG.warn("Exception while invoking "
+ proxyDescriptor.getProxyInfo().getString(method.getName())
+ ". Not retrying because " + retryInfo.fail.reason, ex);
} }
throw ex;
}
// retry
final boolean isFailover = retryInfo.failover != null;
log(method, isFailover, counters.failovers, retryInfo.delay, ex);
if (retryInfo.delay > 0) {
Thread.sleep(retryInfo.delay);
}
if (isFailover) {
proxyDescriptor.failover(expectedFailoverCount, method);
counters.failovers++;
} }
return retVal;
} }
/** private void log(final Method method, final boolean isFailover,
* Return the first FAILOVER_AND_RETRY action. final int failovers, final long delay, final Exception ex) {
*/ // log info if this has made some successful calls or
private RetryAction getFailOverAction(List<RetryAction> actions) { // this is not the first failover
for (RetryAction action : actions) { final boolean info = hasMadeASuccessfulCall || failovers != 0;
if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) { if (!info && !LOG.isDebugEnabled()) {
return action; return;
}
} }
return null;
}
/** final StringBuilder b = new StringBuilder()
* Return the last FAIL action.. only if there are no RETRY actions. .append("Exception while invoking ")
*/ .append(proxyDescriptor.getProxyInfo().getString(method.getName()));
private RetryAction getFailAction(List<RetryAction> actions) { if (failovers > 0) {
RetryAction fAction = null; b.append(" after ").append(failovers).append(" failover attempts");
for (RetryAction action : actions) {
if (action.action == RetryAction.RetryDecision.FAIL) {
fAction = action;
} else {
// Atleast 1 RETRY
return null;
}
} }
return fAction; b.append(isFailover? ". Trying to failover ": ". Retrying ");
} b.append(delay > 0? "after sleeping for " + delay + "ms.": "immediately.");
private List<RetryAction> extractActions(RetryPolicy policy, Exception ex, if (info) {
int i, int invocationFailoverCount, LOG.info(b.toString(), ex);
boolean isIdempotentOrAtMostOnce)
throws Exception {
List<RetryAction> actions = new LinkedList<>();
if (ex instanceof MultiException) {
for (Exception th : ((MultiException) ex).getExceptions().values()) {
actions.add(policy.shouldRetry(th, i, invocationFailoverCount,
isIdempotentOrAtMostOnce));
}
} else { } else {
actions.add(policy.shouldRetry(ex, i, LOG.debug(b.toString(), ex);
invocationFailoverCount, isIdempotentOrAtMostOnce));
} }
return actions;
} }
private static String formatSleepMessage(long millis) {
if (millis > 0) {
return "after sleeping for " + millis + "ms.";
} else {
return "immediately.";
}
}
protected Object invokeMethod(Method method, Object[] args) throws Throwable { protected Object invokeMethod(Method method, Object[] args) throws Throwable {
try { try {
if (!method.isAccessible()) { if (!method.isAccessible()) {
method.setAccessible(true); method.setAccessible(true);
} }
return method.invoke(currentProxy.proxy, args); return method.invoke(proxyDescriptor.getProxy(), args);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
throw e.getCause(); throw e.getCause();
} }
@ -275,12 +285,11 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
proxyProvider.close(); proxyDescriptor.close();
} }
@Override //RpcInvocationHandler @Override //RpcInvocationHandler
public ConnectionId getConnectionId() { public ConnectionId getConnectionId() {
return RPC.getConnectionIdForProxy(currentProxy.proxy); return RPC.getConnectionIdForProxy(proxyDescriptor.getProxy());
} }
} }

View File

@ -18,55 +18,32 @@
package org.apache.hadoop.io.retry; package org.apache.hadoop.io.retry;
import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER; import org.apache.hadoop.io.retry.RetryPolicies.*;
import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryOtherThanRemoteException;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep;
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumCountWithFixedSleep;
import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumTimeWithFixedSleep;
import org.apache.hadoop.io.retry.RetryPolicies.TryOnceThenFail;
import org.apache.hadoop.io.retry.UnreliableInterface.FatalException; import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException; import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.junit.Before; import java.io.IOException;
import org.junit.Test;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.io.retry.RetryPolicies.*;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;
public class TestRetryProxy { public class TestRetryProxy {
@ -131,25 +108,21 @@ public class TestRetryProxy {
final UnreliableInterface unreliable = (UnreliableInterface) final UnreliableInterface unreliable = (UnreliableInterface)
RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER); RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
assertTrue(RetryInvocationHandler.isRpcInvocation(unreliable)); assertTrue(RetryInvocationHandler.isRpcInvocation(unreliable));
final AtomicInteger count = new AtomicInteger();
// Embed the proxy in ProtocolTranslator // Embed the proxy in ProtocolTranslator
ProtocolTranslator xlator = new ProtocolTranslator() { ProtocolTranslator xlator = new ProtocolTranslator() {
int count = 0;
@Override @Override
public Object getUnderlyingProxyObject() { public Object getUnderlyingProxyObject() {
count++; count.getAndIncrement();
return unreliable; return unreliable;
} }
@Override
public String toString() {
return "" + count;
}
}; };
// For a proxy wrapped in ProtocolTranslator method should return true // For a proxy wrapped in ProtocolTranslator method should return true
assertTrue(RetryInvocationHandler.isRpcInvocation(xlator)); assertTrue(RetryInvocationHandler.isRpcInvocation(xlator));
// Ensure underlying proxy was looked at // Ensure underlying proxy was looked at
assertEquals(xlator.toString(), "1"); assertEquals(1, count.get());
// For non-proxy the method must return false // For non-proxy the method must return false
assertFalse(RetryInvocationHandler.isRpcInvocation(new Object())); assertFalse(RetryInvocationHandler.isRpcInvocation(new Object()));