HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-06-25 15:45:16 +08:00
parent b59b8b7351
commit 20ee619e32
8 changed files with 224 additions and 117 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.io.retry; package org.apache.hadoop.io.retry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -27,17 +28,21 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.InterruptedIOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.LinkedList; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** Handle async calls. */ /** Handle async calls. */
@InterfaceAudience.Private @InterfaceAudience.Private
public class AsyncCallHandler { public class AsyncCallHandler {
static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class); public static final Logger LOG = LoggerFactory.getLogger(
AsyncCallHandler.class);
private static final ThreadLocal<AsyncGet<?, Exception>> private static final ThreadLocal<AsyncGet<?, Exception>>
LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>(); LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
@ -73,35 +78,34 @@ public class AsyncCallHandler {
/** A simple concurrent queue which keeping track the empty start time. */ /** A simple concurrent queue which keeping track the empty start time. */
static class ConcurrentQueue<T> { static class ConcurrentQueue<T> {
private final Queue<T> queue = new LinkedList<>(); private final Queue<T> queue = new ConcurrentLinkedQueue<>();
private long emptyStartTime = Time.monotonicNow(); private final AtomicLong emptyStartTime
= new AtomicLong(Time.monotonicNow());
synchronized int size() { Iterator<T> iterator() {
return queue.size(); return queue.iterator();
} }
/** Is the queue empty for more than the given time in millisecond? */ /** Is the queue empty for more than the given time in millisecond? */
synchronized boolean isEmpty(long time) { boolean isEmpty(long time) {
return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time; return Time.monotonicNow() - emptyStartTime.get() > time
&& queue.isEmpty();
} }
synchronized void offer(T c) { void offer(T c) {
final boolean added = queue.offer(c); final boolean added = queue.offer(c);
Preconditions.checkState(added); Preconditions.checkState(added);
} }
synchronized T poll() { void checkEmpty() {
Preconditions.checkState(!queue.isEmpty());
final T t = queue.poll();
if (queue.isEmpty()) { if (queue.isEmpty()) {
emptyStartTime = Time.monotonicNow(); emptyStartTime.set(Time.monotonicNow());
} }
return t;
} }
} }
/** A queue for handling async calls. */ /** A queue for handling async calls. */
static class AsyncCallQueue { class AsyncCallQueue {
private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>(); private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
private final Processor processor = new Processor(); private final Processor processor = new Processor();
@ -113,20 +117,29 @@ public class AsyncCallHandler {
processor.tryStart(); processor.tryStart();
} }
void checkCalls() { long checkCalls() {
final int size = queue.size(); final long startTime = Time.monotonicNow();
for (int i = 0; i < size; i++) { long minWaitTime = Processor.MAX_WAIT_PERIOD;
final AsyncCall c = queue.poll();
if (!c.isDone()) { for (final Iterator<AsyncCall> i = queue.iterator(); i.hasNext();) {
queue.offer(c); // the call is not done yet, add it back. final AsyncCall c = i.next();
if (c.isDone()) {
i.remove(); // the call is done, remove it from the queue.
queue.checkEmpty();
} else {
final Long waitTime = c.getWaitTime(startTime);
if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) {
minWaitTime = waitTime;
}
} }
} }
return minWaitTime;
} }
/** Process the async calls in the queue. */ /** Process the async calls in the queue. */
private class Processor { private class Processor {
static final long GRACE_PERIOD = 10*1000L; static final long GRACE_PERIOD = 3*1000L;
static final long SLEEP_PERIOD = 100L; static final long MAX_WAIT_PERIOD = 100L;
private final AtomicReference<Thread> running = new AtomicReference<>(); private final AtomicReference<Thread> running = new AtomicReference<>();
@ -141,15 +154,16 @@ public class AsyncCallHandler {
@Override @Override
public void run() { public void run() {
for (; isRunning(this);) { for (; isRunning(this);) {
final long waitTime = checkCalls();
tryStop(this);
try { try {
Thread.sleep(SLEEP_PERIOD); synchronized (AsyncCallHandler.this) {
AsyncCallHandler.this.wait(waitTime);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
kill(this); kill(this);
return;
} }
checkCalls();
tryStop(this);
} }
} }
}; };
@ -215,10 +229,9 @@ public class AsyncCallHandler {
private AsyncGet<?, Exception> lowerLayerAsyncGet; private AsyncGet<?, Exception> lowerLayerAsyncGet;
AsyncCall(Method method, Object[] args, boolean isRpc, int callId, AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
RetryInvocationHandler.Counters counters,
RetryInvocationHandler<?> retryInvocationHandler, RetryInvocationHandler<?> retryInvocationHandler,
AsyncCallHandler asyncCallHandler) { AsyncCallHandler asyncCallHandler) {
super(method, args, isRpc, callId, counters, retryInvocationHandler); super(method, args, isRpc, callId, retryInvocationHandler);
this.asyncCallHandler = asyncCallHandler; this.asyncCallHandler = asyncCallHandler;
} }
@ -226,6 +239,7 @@ public class AsyncCallHandler {
/** @return true if the call is done; otherwise, return false. */ /** @return true if the call is done; otherwise, return false. */
boolean isDone() { boolean isDone() {
final CallReturn r = invokeOnce(); final CallReturn r = invokeOnce();
LOG.debug("#{}: {}", getCallId(), r.getState());
switch (r.getState()) { switch (r.getState()) {
case RETURNED: case RETURNED:
case EXCEPTION: case EXCEPTION:
@ -234,6 +248,7 @@ public class AsyncCallHandler {
case RETRY: case RETRY:
invokeOnce(); invokeOnce();
break; break;
case WAIT_RETRY:
case ASYNC_CALL_IN_PROGRESS: case ASYNC_CALL_IN_PROGRESS:
case ASYNC_INVOKED: case ASYNC_INVOKED:
// nothing to do // nothing to do
@ -244,13 +259,25 @@ public class AsyncCallHandler {
return false; return false;
} }
@Override
CallReturn processWaitTimeAndRetryInfo() {
final Long waitTime = getWaitTime(Time.monotonicNow());
LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime);
if (waitTime != null && waitTime > 0) {
return CallReturn.WAIT_RETRY;
}
processRetryInfo();
return CallReturn.RETRY;
}
@Override @Override
CallReturn invoke() throws Throwable { CallReturn invoke() throws Throwable {
LOG.debug("{}.invoke {}", getClass().getSimpleName(), this); LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
if (lowerLayerAsyncGet != null) { if (lowerLayerAsyncGet != null) {
// async call was submitted early, check the lower level async call // async call was submitted early, check the lower level async call
final boolean isDone = lowerLayerAsyncGet.isDone(); final boolean isDone = lowerLayerAsyncGet.isDone();
LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone); LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}",
getCallId(), isDone);
if (!isDone) { if (!isDone) {
return CallReturn.ASYNC_CALL_IN_PROGRESS; return CallReturn.ASYNC_CALL_IN_PROGRESS;
} }
@ -262,7 +289,7 @@ public class AsyncCallHandler {
} }
// submit a new async call // submit a new async call
LOG.trace("invoke: ASYNC_INVOKED"); LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId());
final boolean mode = Client.isAsynchronousMode(); final boolean mode = Client.isAsynchronousMode();
try { try {
Client.setAsynchronousMode(true); Client.setAsynchronousMode(true);
@ -271,9 +298,9 @@ public class AsyncCallHandler {
Preconditions.checkState(r == null); Preconditions.checkState(r == null);
lowerLayerAsyncGet = getLowerLayerAsyncReturn(); lowerLayerAsyncGet = getLowerLayerAsyncReturn();
if (counters.isZeros()) { if (getCounters().isZeros()) {
// first async attempt, initialize // first async attempt, initialize
LOG.trace("invoke: initAsyncCall"); LOG.trace("#{} invoke: initAsyncCall", getCallId());
asyncCallHandler.initAsyncCall(this, asyncCallReturn); asyncCallHandler.initAsyncCall(this, asyncCallReturn);
} }
return CallReturn.ASYNC_INVOKED; return CallReturn.ASYNC_INVOKED;
@ -287,9 +314,9 @@ public class AsyncCallHandler {
private volatile boolean hasSuccessfulCall = false; private volatile boolean hasSuccessfulCall = false;
AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc, AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
int callId, RetryInvocationHandler.Counters counters, int callId,
RetryInvocationHandler<?> retryInvocationHandler) { RetryInvocationHandler<?> retryInvocationHandler) {
return new AsyncCall(method, args, isRpc, callId, counters, return new AsyncCall(method, args, isRpc, callId,
retryInvocationHandler, this); retryInvocationHandler, this);
} }
@ -318,4 +345,9 @@ public class AsyncCallHandler {
}; };
ASYNC_RETURN.set(asyncGet); ASYNC_RETURN.set(asyncGet);
} }
@VisibleForTesting
public static long getGracePeriod() {
return AsyncCallQueue.Processor.GRACE_PERIOD;
}
} }

View File

@ -29,6 +29,8 @@ class CallReturn {
EXCEPTION, EXCEPTION,
/** Call should be retried according to the {@link RetryPolicy}. */ /** Call should be retried according to the {@link RetryPolicy}. */
RETRY, RETRY,
/** Call should wait and then retry according to the {@link RetryPolicy}. */
WAIT_RETRY,
/** Call, which is async, is still in progress. */ /** Call, which is async, is still in progress. */
ASYNC_CALL_IN_PROGRESS, ASYNC_CALL_IN_PROGRESS,
/** Call, which is async, just has been invoked. */ /** Call, which is async, just has been invoked. */
@ -39,6 +41,7 @@ class CallReturn {
State.ASYNC_CALL_IN_PROGRESS); State.ASYNC_CALL_IN_PROGRESS);
static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED); static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
static final CallReturn RETRY = new CallReturn(State.RETRY); static final CallReturn RETRY = new CallReturn(State.RETRY);
static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY);
private final Object returnValue; private final Object returnValue;
private final Throwable thrown; private final Throwable thrown;

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.io.retry; package org.apache.hadoop.io.retry;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo; import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.*; import org.apache.hadoop.ipc.*;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -41,33 +42,51 @@ import java.util.Map;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RetryInvocationHandler<T> implements RpcInvocationHandler { public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); public static final Logger LOG = LoggerFactory.getLogger(
RetryInvocationHandler.class);
static class Call { static class Call {
private final Method method; private final Method method;
private final Object[] args; private final Object[] args;
private final boolean isRpc; private final boolean isRpc;
private final int callId; private final int callId;
final Counters counters; private final Counters counters = new Counters();
private final RetryPolicy retryPolicy; private final RetryPolicy retryPolicy;
private final RetryInvocationHandler<?> retryInvocationHandler; private final RetryInvocationHandler<?> retryInvocationHandler;
private RetryInfo retryInfo;
Call(Method method, Object[] args, boolean isRpc, int callId, Call(Method method, Object[] args, boolean isRpc, int callId,
Counters counters, RetryInvocationHandler<?> retryInvocationHandler) { RetryInvocationHandler<?> retryInvocationHandler) {
this.method = method; this.method = method;
this.args = args; this.args = args;
this.isRpc = isRpc; this.isRpc = isRpc;
this.callId = callId; this.callId = callId;
this.counters = counters;
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method); this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
this.retryInvocationHandler = retryInvocationHandler; this.retryInvocationHandler = retryInvocationHandler;
} }
int getCallId() {
return callId;
}
Counters getCounters() {
return counters;
}
synchronized Long getWaitTime(final long now) {
return retryInfo == null? null: retryInfo.retryTime - now;
}
/** Invoke the call once without retrying. */ /** Invoke the call once without retrying. */
synchronized CallReturn invokeOnce() { synchronized CallReturn invokeOnce() {
try { try {
if (retryInfo != null) {
return processWaitTimeAndRetryInfo();
}
// The number of times this invocation handler has ever been failed over // The number of times this invocation handler has ever been failed over
// before this method invocation attempt. Used to prevent concurrent // before this method invocation attempt. Used to prevent concurrent
// failed method invocations from triggering multiple failover attempts. // failed method invocations from triggering multiple failover attempts.
@ -76,28 +95,70 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return invoke(); return invoke();
} catch (Exception e) { } catch (Exception e) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(this, e); LOG.trace(toString(), e);
} }
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
// If interrupted, do not retry. // If interrupted, do not retry.
throw e; throw e;
} }
retryInvocationHandler.handleException(
method, retryPolicy, failoverCount, counters, e); retryInfo = retryInvocationHandler.handleException(
return CallReturn.RETRY; method, callId, retryPolicy, counters, failoverCount, e);
return processWaitTimeAndRetryInfo();
} }
} catch(Throwable t) { } catch(Throwable t) {
return new CallReturn(t); return new CallReturn(t);
} }
} }
/**
* It first processes the wait time, if there is any,
* and then invokes {@link #processRetryInfo()}.
*
* If the wait time is positive, it either sleeps for synchronous calls
* or immediately returns for asynchronous calls.
*
* @return {@link CallReturn#RETRY} if the retryInfo is processed;
* otherwise, return {@link CallReturn#WAIT_RETRY}.
*/
CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
final Long waitTime = getWaitTime(Time.monotonicNow());
LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
callId, retryInfo, waitTime);
if (waitTime != null && waitTime > 0) {
try {
Thread.sleep(retryInfo.delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting to retry", e);
InterruptedIOException intIOE = new InterruptedIOException(
"Retry interrupted");
intIOE.initCause(e);
throw intIOE;
}
}
processRetryInfo();
return CallReturn.RETRY;
}
synchronized void processRetryInfo() {
counters.retries++;
if (retryInfo.isFailover()) {
retryInvocationHandler.proxyDescriptor.failover(
retryInfo.expectedFailoverCount, method, callId);
counters.failovers++;
}
retryInfo = null;
}
CallReturn invoke() throws Throwable { CallReturn invoke() throws Throwable {
return new CallReturn(invokeMethod()); return new CallReturn(invokeMethod());
} }
Object invokeMethod() throws Throwable { Object invokeMethod() throws Throwable {
if (isRpc) { if (isRpc) {
Client.setCallIdAndRetryCount(callId, counters.retries); Client.setCallIdAndRetryCount(callId, counters.retries,
retryInvocationHandler.asyncCallHandler);
} }
return retryInvocationHandler.invokeMethod(method, args); return retryInvocationHandler.invokeMethod(method, args);
} }
@ -146,15 +207,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return failoverCount; return failoverCount;
} }
synchronized void failover(long expectedFailoverCount, Method method) { synchronized void failover(long expectedFailoverCount, Method method,
int callId) {
// Make sure that concurrent failed invocations only cause a single // Make sure that concurrent failed invocations only cause a single
// actual failover. // actual failover.
if (failoverCount == expectedFailoverCount) { if (failoverCount == expectedFailoverCount) {
fpp.performFailover(proxyInfo.proxy); fpp.performFailover(proxyInfo.proxy);
failoverCount++; failoverCount++;
} else { } else {
LOG.warn("A failover has occurred since the start of " LOG.warn("A failover has occurred since the start of call #" + callId
+ proxyInfo.getString(method.getName())); + " " + proxyInfo.getString(method.getName()));
} }
proxyInfo = fpp.getProxy(); proxyInfo = fpp.getProxy();
} }
@ -172,22 +234,33 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
} }
private static class RetryInfo { private static class RetryInfo {
private final long retryTime;
private final long delay; private final long delay;
private final RetryAction failover; private final RetryAction action;
private final RetryAction fail; private final long expectedFailoverCount;
RetryInfo(long delay, RetryAction failover, RetryAction fail) { RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
this.delay = delay; this.delay = delay;
this.failover = failover; this.retryTime = Time.monotonicNow() + delay;
this.fail = fail; this.action = action;
this.expectedFailoverCount = expectedFailoverCount;
}
boolean isFailover() {
return action != null
&& action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
}
boolean isFail() {
return action != null
&& action.action == RetryAction.RetryDecision.FAIL;
} }
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
Counters counters, boolean idempotentOrAtMostOnce) throws Exception { Counters counters, boolean idempotentOrAtMostOnce,
long expectedFailoverCount) throws Exception {
RetryAction max = null;
long maxRetryDelay = 0; long maxRetryDelay = 0;
RetryAction failover = null;
RetryAction retry = null;
RetryAction fail = null;
final Iterable<Exception> exceptions = e instanceof MultiException ? final Iterable<Exception> exceptions = e instanceof MultiException ?
((MultiException) e).getExceptions().values() ((MultiException) e).getExceptions().values()
@ -195,23 +268,19 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
for (Exception exception : exceptions) { for (Exception exception : exceptions) {
final RetryAction a = policy.shouldRetry(exception, final RetryAction a = policy.shouldRetry(exception,
counters.retries, counters.failovers, idempotentOrAtMostOnce); counters.retries, counters.failovers, idempotentOrAtMostOnce);
if (a.action == RetryAction.RetryDecision.FAIL) { if (a.action != RetryAction.RetryDecision.FAIL) {
fail = a;
} else {
// must be a retry or failover // must be a retry or failover
if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
failover = a;
} else {
retry = a;
}
if (a.delayMillis > maxRetryDelay) { if (a.delayMillis > maxRetryDelay) {
maxRetryDelay = a.delayMillis; maxRetryDelay = a.delayMillis;
} }
} }
if (max == null || max.action.compareTo(a.action) < 0) {
max = a;
}
} }
return new RetryInfo(maxRetryDelay, failover, return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
failover == null && retry == null? fail: null);
} }
} }
@ -246,13 +315,12 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
return proxyDescriptor.getFailoverCount(); return proxyDescriptor.getFailoverCount();
} }
private Call newCall(Method method, Object[] args, boolean isRpc, int callId, private Call newCall(Method method, Object[] args, boolean isRpc,
Counters counters) { int callId) {
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {
return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
counters, this);
} else { } else {
return new Call(method, args, isRpc, callId, counters, this); return new Call(method, args, isRpc, callId, this);
} }
} }
@ -261,9 +329,8 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
throws Throwable { throws Throwable {
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy()); final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
final Counters counters = new Counters();
final Call call = newCall(method, args, isRpc, callId, counters); final Call call = newCall(method, args, isRpc, callId);
while (true) { while (true) {
final CallReturn c = call.invokeOnce(); final CallReturn c = call.invokeOnce();
final CallReturn.State state = c.getState(); final CallReturn.State state = c.getState();
@ -275,45 +342,24 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
} }
} }
private void handleException(final Method method, final RetryPolicy policy, private RetryInfo handleException(final Method method, final int callId,
final long expectedFailoverCount, final Counters counters, final RetryPolicy policy, final Counters counters,
final Exception ex) throws Exception { final long expectFailoverCount, final Exception e) throws Exception {
final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters, final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
proxyDescriptor.idempotentOrAtMostOnce(method)); counters, proxyDescriptor.idempotentOrAtMostOnce(method),
counters.retries++; expectFailoverCount);
if (retryInfo.isFail()) {
if (retryInfo.fail != null) {
// fail. // fail.
if (retryInfo.fail.reason != null) { if (retryInfo.action.reason != null) {
LOG.warn("Exception while invoking " LOG.warn("Exception while invoking call #" + callId + " "
+ proxyDescriptor.getProxyInfo().getString(method.getName()) + proxyDescriptor.getProxyInfo().getString(method.getName())
+ ". Not retrying because " + retryInfo.fail.reason, ex); + ". Not retrying because " + retryInfo.action.reason, e);
} }
throw ex; throw e;
} }
// retry log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
final boolean isFailover = retryInfo.failover != null; return retryInfo;
log(method, isFailover, counters.failovers, retryInfo.delay, ex);
if (retryInfo.delay > 0) {
try {
Thread.sleep(retryInfo.delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting to retry", e);
InterruptedIOException intIOE = new InterruptedIOException(
"Retry interrupted");
intIOE.initCause(e);
throw intIOE;
}
}
if (isFailover) {
proxyDescriptor.failover(expectedFailoverCount, method);
counters.failovers++;
}
} }
private void log(final Method method, final boolean isFailover, private void log(final Method method, final boolean isFailover,

View File

@ -67,6 +67,7 @@ public interface RetryPolicy {
} }
public enum RetryDecision { public enum RetryDecision {
// Ordering: FAIL < RETRY < FAILOVER_AND_RETRY.
FAIL, FAIL,
RETRY, RETRY,
FAILOVER_AND_RETRY FAILOVER_AND_RETRY

View File

@ -93,6 +93,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>(); private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
= new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>> private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>(); ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode = private static final ThreadLocal<Boolean> asynchronousMode =
@ -111,13 +113,15 @@ public class Client implements AutoCloseable {
} }
/** Set call id and retry count for the next call. */ /** Set call id and retry count for the next call. */
public static void setCallIdAndRetryCount(int cid, int rc) { public static void setCallIdAndRetryCount(int cid, int rc,
Object externalHandler) {
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null); Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
callId.set(cid); callId.set(cid);
retryCount.set(rc); retryCount.set(rc);
EXTERNAL_CALL_HANDLER.set(externalHandler);
} }
private ConcurrentMap<ConnectionId, Connection> connections = private ConcurrentMap<ConnectionId, Connection> connections =
@ -335,6 +339,7 @@ public class Client implements AutoCloseable {
IOException error; // exception, null if success IOException error; // exception, null if success
final RPC.RpcKind rpcKind; // Rpc EngineKind final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done boolean done; // true when call is done
private final Object externalHandler;
private Call(RPC.RpcKind rpcKind, Writable param) { private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind; this.rpcKind = rpcKind;
@ -354,6 +359,8 @@ public class Client implements AutoCloseable {
} else { } else {
this.retry = rc; this.retry = rc;
} }
this.externalHandler = EXTERNAL_CALL_HANDLER.get();
} }
@Override @Override
@ -366,6 +373,12 @@ public class Client implements AutoCloseable {
protected synchronized void callComplete() { protected synchronized void callComplete() {
this.done = true; this.done = true;
notify(); // notify caller notify(); // notify caller
if (externalHandler != null) {
synchronized (externalHandler) {
externalHandler.notify();
}
}
} }
/** Set the exception when there is an error. /** Set the exception when there is an error.

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.io.retry;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
@ -35,7 +36,18 @@ import static org.junit.Assert.assertThat;
*/ */
public class TestDefaultRetryPolicy { public class TestDefaultRetryPolicy {
@Rule @Rule
public Timeout timeout = new Timeout(300000); public Timeout timeout = new Timeout(30000);
/** Verify FAIL < RETRY < FAILOVER_AND_RETRY. */
@Test
public void testRetryDecisionOrdering() throws Exception {
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
RetryPolicy.RetryAction.RetryDecision.RETRY) < 0);
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.RETRY.compareTo(
RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
}
/** /**
* Verify that the default retry policy correctly retries * Verify that the default retry policy correctly retries

View File

@ -370,7 +370,7 @@ public class TestAsyncIPC {
Call createCall(RpcKind rpcKind, Writable rpcRequest) { Call createCall(RpcKind rpcKind, Writable rpcRequest) {
// Set different call id and retry count for the next call // Set different call id and retry count for the next call
Client.setCallIdAndRetryCount(Client.nextCallId(), Client.setCallIdAndRetryCount(Client.nextCallId(),
TestIPC.RANDOM.nextInt(255)); TestIPC.RANDOM.nextInt(255), null);
final Call call = super.createCall(rpcKind, rpcRequest); final Call call = super.createCall(rpcKind, rpcRequest);
@ -424,7 +424,7 @@ public class TestAsyncIPC {
final int retryCount = 255; final int retryCount = 255;
// Override client to store the call id // Override client to store the call id
final Client client = new Client(LongWritable.class, conf); final Client client = new Client(LongWritable.class, conf);
Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount); Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
// Attach a listener that tracks every call ID received by the server. // Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestIPC.TestServer(1, false, conf); final TestServer server = new TestIPC.TestServer(1, false, conf);

View File

@ -1172,7 +1172,7 @@ public class TestIPC {
retryProxy.dummyRun(); retryProxy.dummyRun();
Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1); Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
} finally { } finally {
Client.setCallIdAndRetryCount(0, 0); Client.setCallIdAndRetryCount(0, 0, null);
client.stop(); client.stop();
server.stop(); server.stop();
} }
@ -1205,7 +1205,7 @@ public class TestIPC {
} finally { } finally {
// Check if dummyRun called only once // Check if dummyRun called only once
Assert.assertEquals(handler.invocations, 1); Assert.assertEquals(handler.invocations, 1);
Client.setCallIdAndRetryCount(0, 0); Client.setCallIdAndRetryCount(0, 0, null);
client.stop(); client.stop();
server.stop(); server.stop();
} }
@ -1250,7 +1250,7 @@ public class TestIPC {
final int retryCount = 255; final int retryCount = 255;
// Override client to store the call id // Override client to store the call id
final Client client = new Client(LongWritable.class, conf); final Client client = new Client(LongWritable.class, conf);
Client.setCallIdAndRetryCount(Client.nextCallId(), 255); Client.setCallIdAndRetryCount(Client.nextCallId(), 255, null);
// Attach a listener that tracks every call ID received by the server. // Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false); final TestServer server = new TestServer(1, false);