HADOOP-9786. Merge change r1508304 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1508307 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
22e0a210b6
commit
b3f765ad3d
|
@ -233,6 +233,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||||
|
|
||||||
HADOOP-9770. Make RetryCache#state non volatile. (suresh)
|
HADOOP-9770. Make RetryCache#state non volatile. (suresh)
|
||||||
|
|
||||||
|
HADOOP-9786. RetryInvocationHandler#isRpcInvocation should support
|
||||||
|
ProtocolTranslator. (suresh and jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
|
|
@ -31,12 +31,13 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RpcConstants;
|
import org.apache.hadoop.ipc.RpcConstants;
|
||||||
import org.apache.hadoop.ipc.RpcInvocationHandler;
|
import org.apache.hadoop.ipc.RpcInvocationHandler;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.util.ThreadUtil;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
class RetryInvocationHandler implements RpcInvocationHandler {
|
class RetryInvocationHandler implements RpcInvocationHandler {
|
||||||
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
||||||
|
@ -76,7 +77,7 @@ class RetryInvocationHandler implements RpcInvocationHandler {
|
||||||
|
|
||||||
// The number of times this method invocation has been failed over.
|
// The number of times this method invocation has been failed over.
|
||||||
int invocationFailoverCount = 0;
|
int invocationFailoverCount = 0;
|
||||||
final boolean isRpc = isRpcInvocation();
|
final boolean isRpc = isRpcInvocation(currentProxy);
|
||||||
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
||||||
int retries = 0;
|
int retries = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -179,11 +180,15 @@ class RetryInvocationHandler implements RpcInvocationHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isRpcInvocation() {
|
@VisibleForTesting
|
||||||
if (!Proxy.isProxyClass(currentProxy.getClass())) {
|
static boolean isRpcInvocation(Object proxy) {
|
||||||
|
if (proxy instanceof ProtocolTranslator) {
|
||||||
|
proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
|
||||||
|
}
|
||||||
|
if (!Proxy.isProxyClass(proxy.getClass())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final InvocationHandler ih = Proxy.getInvocationHandler(currentProxy);
|
final InvocationHandler ih = Proxy.getInvocationHandler(proxy);
|
||||||
return ih instanceof RpcInvocationHandler;
|
return ih instanceof RpcInvocationHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import junit.framework.TestCase;
|
||||||
|
|
||||||
import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
|
import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
|
||||||
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
|
import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
|
||||||
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
|
||||||
public class TestRetryProxy extends TestCase {
|
public class TestRetryProxy extends TestCase {
|
||||||
|
@ -58,6 +59,38 @@ public class TestRetryProxy extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}
|
||||||
|
*/
|
||||||
|
public void testRpcInvocation() throws Exception {
|
||||||
|
// For a proxy method should return true
|
||||||
|
final UnreliableInterface unreliable = (UnreliableInterface)
|
||||||
|
RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
|
||||||
|
assertTrue(RetryInvocationHandler.isRpcInvocation(unreliable));
|
||||||
|
|
||||||
|
// Embed the proxy in ProtocolTranslator
|
||||||
|
ProtocolTranslator xlator = new ProtocolTranslator() {
|
||||||
|
int count = 0;
|
||||||
|
@Override
|
||||||
|
public Object getUnderlyingProxyObject() {
|
||||||
|
count++;
|
||||||
|
return unreliable;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "" + count;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// For a proxy wrapped in ProtocolTranslator method should return true
|
||||||
|
assertTrue(RetryInvocationHandler.isRpcInvocation(xlator));
|
||||||
|
// Ensure underlying proxy was looked at
|
||||||
|
assertEquals(xlator.toString(), "1");
|
||||||
|
|
||||||
|
// For non-proxy the method must return false
|
||||||
|
assertFalse(RetryInvocationHandler.isRpcInvocation(new Object()));
|
||||||
|
}
|
||||||
|
|
||||||
public void testRetryForever() throws UnreliableException {
|
public void testRetryForever() throws UnreliableException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)
|
UnreliableInterface unreliable = (UnreliableInterface)
|
||||||
RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
|
RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);
|
||||||
|
@ -138,7 +171,7 @@ public class TestRetryProxy extends TestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRetryByRemoteException() throws UnreliableException {
|
public void testRetryByRemoteException() {
|
||||||
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||||
Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
|
Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(FatalException.class, TRY_ONCE_THEN_FAIL);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue