HADOOP-9803. Add a generic type parameter to RetryInvocationHandler.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9ad19eec6f
commit
134557da47
|
@ -320,6 +320,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
HADOOP-9787. ShutdownHelper util to shutdown threads and threadpools.
|
HADOOP-9787. ShutdownHelper util to shutdown threads and threadpools.
|
||||||
(Karthik Kambatla via Sandy Ryza)
|
(Karthik Kambatla via Sandy Ryza)
|
||||||
|
|
||||||
|
HADOOP-9803. Add a generic type parameter to RetryInvocationHandler.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -44,9 +44,9 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
* side.
|
* side.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RetryInvocationHandler implements RpcInvocationHandler {
|
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||||
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
||||||
private final FailoverProxyProvider proxyProvider;
|
private final FailoverProxyProvider<T> proxyProvider;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The number of times the associated proxyProvider has ever been failed over.
|
* The number of times the associated proxyProvider has ever been failed over.
|
||||||
|
@ -56,14 +56,14 @@ public class RetryInvocationHandler implements RpcInvocationHandler {
|
||||||
|
|
||||||
private final RetryPolicy defaultPolicy;
|
private final RetryPolicy defaultPolicy;
|
||||||
private final Map<String,RetryPolicy> methodNameToPolicyMap;
|
private final Map<String,RetryPolicy> methodNameToPolicyMap;
|
||||||
private Object currentProxy;
|
private T currentProxy;
|
||||||
|
|
||||||
protected RetryInvocationHandler(FailoverProxyProvider proxyProvider,
|
protected RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
||||||
RetryPolicy retryPolicy) {
|
RetryPolicy retryPolicy) {
|
||||||
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
|
this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
RetryInvocationHandler(FailoverProxyProvider proxyProvider,
|
RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider,
|
||||||
RetryPolicy defaultPolicy,
|
RetryPolicy defaultPolicy,
|
||||||
Map<String, RetryPolicy> methodNameToPolicyMap) {
|
Map<String, RetryPolicy> methodNameToPolicyMap) {
|
||||||
this.proxyProvider = proxyProvider;
|
this.proxyProvider = proxyProvider;
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class RetryProxy {
|
||||||
return Proxy.newProxyInstance(
|
return Proxy.newProxyInstance(
|
||||||
proxyProvider.getInterface().getClassLoader(),
|
proxyProvider.getInterface().getClassLoader(),
|
||||||
new Class<?>[] { iface },
|
new Class<?>[] { iface },
|
||||||
new RetryInvocationHandler(proxyProvider, retryPolicy)
|
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ public class RetryProxy {
|
||||||
return Proxy.newProxyInstance(
|
return Proxy.newProxyInstance(
|
||||||
proxyProvider.getInterface().getClassLoader(),
|
proxyProvider.getInterface().getClassLoader(),
|
||||||
new Class<?>[] { iface },
|
new Class<?>[] { iface },
|
||||||
new RetryInvocationHandler(proxyProvider, defaultPolicy,
|
new RetryInvocationHandler<T>(proxyProvider, defaultPolicy,
|
||||||
methodNameToPolicyMap)
|
methodNameToPolicyMap)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.util.ThreadUtil;
|
import org.apache.hadoop.util.ThreadUtil;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
public class TestFailoverProxy {
|
public class TestFailoverProxy {
|
||||||
|
|
||||||
public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
|
public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
|
||||||
|
@ -84,14 +83,28 @@ public class TestFailoverProxy {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static FlipFlopProxyProvider<UnreliableInterface>
|
||||||
|
newFlipFlopProxyProvider() {
|
||||||
|
return new FlipFlopProxyProvider<UnreliableInterface>(
|
||||||
|
UnreliableInterface.class,
|
||||||
|
new UnreliableImplementation("impl1"),
|
||||||
|
new UnreliableImplementation("impl2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FlipFlopProxyProvider<UnreliableInterface>
|
||||||
|
newFlipFlopProxyProvider(TypeOfExceptionToFailWith t1,
|
||||||
|
TypeOfExceptionToFailWith t2) {
|
||||||
|
return new FlipFlopProxyProvider<UnreliableInterface>(
|
||||||
|
UnreliableInterface.class,
|
||||||
|
new UnreliableImplementation("impl1", t1),
|
||||||
|
new UnreliableImplementation("impl2", t2));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSuccedsOnceThenFailOver() throws UnreliableException,
|
public void testSuccedsOnceThenFailOver() throws UnreliableException,
|
||||||
IOException, StandbyException {
|
IOException, StandbyException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class, newFlipFlopProxyProvider(),
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
|
||||||
new UnreliableImplementation("impl1"),
|
|
||||||
new UnreliableImplementation("impl2")),
|
|
||||||
new FailOverOnceOnAnyExceptionPolicy());
|
new FailOverOnceOnAnyExceptionPolicy());
|
||||||
|
|
||||||
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
||||||
|
@ -107,11 +120,9 @@ public class TestFailoverProxy {
|
||||||
@Test
|
@Test
|
||||||
public void testSucceedsTenTimesThenFailOver() throws UnreliableException,
|
public void testSucceedsTenTimesThenFailOver() throws UnreliableException,
|
||||||
IOException, StandbyException {
|
IOException, StandbyException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(),
|
||||||
new UnreliableImplementation("impl1"),
|
|
||||||
new UnreliableImplementation("impl2")),
|
|
||||||
new FailOverOnceOnAnyExceptionPolicy());
|
new FailOverOnceOnAnyExceptionPolicy());
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
|
@ -123,11 +134,9 @@ public class TestFailoverProxy {
|
||||||
@Test
|
@Test
|
||||||
public void testNeverFailOver() throws UnreliableException,
|
public void testNeverFailOver() throws UnreliableException,
|
||||||
IOException, StandbyException {
|
IOException, StandbyException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(),
|
||||||
new UnreliableImplementation("impl1"),
|
|
||||||
new UnreliableImplementation("impl2")),
|
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL);
|
RetryPolicies.TRY_ONCE_THEN_FAIL);
|
||||||
|
|
||||||
unreliable.succeedsOnceThenFailsReturningString();
|
unreliable.succeedsOnceThenFailsReturningString();
|
||||||
|
@ -142,11 +151,9 @@ public class TestFailoverProxy {
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverOnStandbyException()
|
public void testFailoverOnStandbyException()
|
||||||
throws UnreliableException, IOException, StandbyException {
|
throws UnreliableException, IOException, StandbyException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(),
|
||||||
new UnreliableImplementation("impl1"),
|
|
||||||
new UnreliableImplementation("impl2")),
|
|
||||||
RetryPolicies.failoverOnNetworkException(1));
|
RetryPolicies.failoverOnNetworkException(1));
|
||||||
|
|
||||||
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
||||||
|
@ -160,9 +167,9 @@ public class TestFailoverProxy {
|
||||||
|
|
||||||
unreliable = (UnreliableInterface)RetryProxy
|
unreliable = (UnreliableInterface)RetryProxy
|
||||||
.create(UnreliableInterface.class,
|
.create(UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(
|
||||||
new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.STANDBY_EXCEPTION),
|
TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
|
||||||
new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
|
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
|
||||||
RetryPolicies.failoverOnNetworkException(1));
|
RetryPolicies.failoverOnNetworkException(1));
|
||||||
|
|
||||||
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
||||||
|
@ -173,11 +180,11 @@ public class TestFailoverProxy {
|
||||||
@Test
|
@Test
|
||||||
public void testFailoverOnNetworkExceptionIdempotentOperation()
|
public void testFailoverOnNetworkExceptionIdempotentOperation()
|
||||||
throws UnreliableException, IOException, StandbyException {
|
throws UnreliableException, IOException, StandbyException {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(
|
||||||
new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.IO_EXCEPTION),
|
TypeOfExceptionToFailWith.IO_EXCEPTION,
|
||||||
new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
|
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
|
||||||
RetryPolicies.failoverOnNetworkException(1));
|
RetryPolicies.failoverOnNetworkException(1));
|
||||||
|
|
||||||
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
|
||||||
|
@ -204,9 +211,9 @@ public class TestFailoverProxy {
|
||||||
public void testExceptionPropagatedForNonIdempotentVoid() throws Exception {
|
public void testExceptionPropagatedForNonIdempotentVoid() throws Exception {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
||||||
.create(UnreliableInterface.class,
|
.create(UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(
|
||||||
new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.IO_EXCEPTION),
|
TypeOfExceptionToFailWith.IO_EXCEPTION,
|
||||||
new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
|
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
|
||||||
RetryPolicies.failoverOnNetworkException(1));
|
RetryPolicies.failoverOnNetworkException(1));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -268,7 +275,8 @@ public class TestFailoverProxy {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testConcurrentMethodFailures() throws InterruptedException {
|
public void testConcurrentMethodFailures() throws InterruptedException {
|
||||||
FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
|
FlipFlopProxyProvider<UnreliableInterface> proxyProvider
|
||||||
|
= new FlipFlopProxyProvider<UnreliableInterface>(
|
||||||
UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new SynchronizedUnreliableImplementation("impl1",
|
new SynchronizedUnreliableImplementation("impl1",
|
||||||
TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
|
TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
|
||||||
|
@ -305,7 +313,8 @@ public class TestFailoverProxy {
|
||||||
|
|
||||||
final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
|
final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
|
||||||
TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
|
TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
|
||||||
FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
|
FlipFlopProxyProvider<UnreliableInterface> proxyProvider
|
||||||
|
= new FlipFlopProxyProvider<UnreliableInterface>(
|
||||||
UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
impl1,
|
impl1,
|
||||||
new UnreliableImplementation("impl2",
|
new UnreliableImplementation("impl2",
|
||||||
|
@ -333,11 +342,11 @@ public class TestFailoverProxy {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testExpectedIOException() {
|
public void testExpectedIOException() {
|
||||||
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
|
UnreliableInterface unreliable = (UnreliableInterface)RetryProxy.create(
|
||||||
.create(UnreliableInterface.class,
|
UnreliableInterface.class,
|
||||||
new FlipFlopProxyProvider(UnreliableInterface.class,
|
newFlipFlopProxyProvider(
|
||||||
new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.REMOTE_EXCEPTION),
|
TypeOfExceptionToFailWith.REMOTE_EXCEPTION,
|
||||||
new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
|
TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION),
|
||||||
RetryPolicies.failoverOnNetworkException(
|
RetryPolicies.failoverOnNetworkException(
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
|
RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.ipc.StandbyException;
|
||||||
public interface UnreliableInterface {
|
public interface UnreliableInterface {
|
||||||
|
|
||||||
public static class UnreliableException extends Exception {
|
public static class UnreliableException extends Exception {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private String identifier;
|
private String identifier;
|
||||||
|
|
||||||
public UnreliableException() {
|
public UnreliableException() {
|
||||||
|
@ -43,6 +45,7 @@ public interface UnreliableInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FatalException extends UnreliableException {
|
public static class FatalException extends UnreliableException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
// no body
|
// no body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class TestRetryCacheWithHA {
|
||||||
* a boolean flag to control whether the method invocation succeeds or not.
|
* a boolean flag to control whether the method invocation succeeds or not.
|
||||||
*/
|
*/
|
||||||
private static class DummyRetryInvocationHandler extends
|
private static class DummyRetryInvocationHandler extends
|
||||||
RetryInvocationHandler {
|
RetryInvocationHandler<ClientProtocol> {
|
||||||
static AtomicBoolean block = new AtomicBoolean(false);
|
static AtomicBoolean block = new AtomicBoolean(false);
|
||||||
|
|
||||||
DummyRetryInvocationHandler(
|
DummyRetryInvocationHandler(
|
||||||
|
|
Loading…
Reference in New Issue