HADOOP-12054. RPC client should not retry for InvalidToken exceptions. (Contributed by Varun Saxena)
This commit is contained in:
parent
2b2465dfac
commit
84ba1a75b6
|
@ -837,6 +837,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12052 IPC client downgrades all exception types to IOE, breaks
|
HADOOP-12052 IPC client downgrades all exception types to IOE, breaks
|
||||||
callers trying to use them. (Brahma Reddy Battula via stevel)
|
callers trying to use them. (Brahma Reddy Battula via stevel)
|
||||||
|
|
||||||
|
HADOOP-12054. RPC client should not retry for InvalidToken exceptions.
|
||||||
|
(Varun Saxena via Arpit Agarwal)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -575,6 +576,9 @@ public class RetryPolicies {
|
||||||
// RetriableException or RetriableException wrapped
|
// RetriableException or RetriableException wrapped
|
||||||
return new RetryAction(RetryAction.RetryDecision.RETRY,
|
return new RetryAction(RetryAction.RetryDecision.RETRY,
|
||||||
getFailoverOrRetrySleepTime(retries));
|
getFailoverOrRetrySleepTime(retries));
|
||||||
|
} else if (e instanceof InvalidToken) {
|
||||||
|
return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
|
||||||
|
"Invalid or Cancelled Token");
|
||||||
} else if (e instanceof SocketException
|
} else if (e instanceof SocketException
|
||||||
|| (e instanceof IOException && !(e instanceof RemoteException))) {
|
|| (e instanceof IOException && !(e instanceof RemoteException))) {
|
||||||
if (isIdempotentOrAtMostOnce) {
|
if (isIdempotentOrAtMostOnce) {
|
||||||
|
|
|
@ -62,6 +62,9 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
|
import org.apache.hadoop.io.retry.Idempotent;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
|
@ -71,6 +74,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
|
@ -205,17 +209,20 @@ public class TestIPC {
|
||||||
this.total = total;
|
this.total = total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Object returnValue(Object value) throws Exception {
|
||||||
|
if (retry++ < total) {
|
||||||
|
throw new IOException("Fake IOException");
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object invoke(Object proxy, Method method, Object[] args)
|
public Object invoke(Object proxy, Method method, Object[] args)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
LongWritable param = new LongWritable(RANDOM.nextLong());
|
LongWritable param = new LongWritable(RANDOM.nextLong());
|
||||||
LongWritable value = (LongWritable) client.call(param,
|
LongWritable value = (LongWritable) client.call(param,
|
||||||
NetUtils.getConnectAddress(server), null, null, 0, conf);
|
NetUtils.getConnectAddress(server), null, null, 0, conf);
|
||||||
if (retry++ < total) {
|
return returnValue(value);
|
||||||
throw new IOException("Fake IOException");
|
|
||||||
} else {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -227,6 +234,25 @@ public class TestIPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class TestInvalidTokenHandler extends TestInvocationHandler {
|
||||||
|
private int invocations = 0;
|
||||||
|
TestInvalidTokenHandler(Client client, Server server) {
|
||||||
|
super(client, server, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Object returnValue(Object value) throws Exception {
|
||||||
|
throw new InvalidToken("Invalid Token");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object invoke(Object proxy, Method method, Object[] args)
|
||||||
|
throws Throwable {
|
||||||
|
invocations++;
|
||||||
|
return super.invoke(proxy, method, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testSerial() throws IOException, InterruptedException {
|
public void testSerial() throws IOException, InterruptedException {
|
||||||
internalTestSerial(3, false, 2, 5, 100);
|
internalTestSerial(3, false, 2, 5, 100);
|
||||||
|
@ -1026,7 +1052,8 @@ public class TestIPC {
|
||||||
|
|
||||||
/** A dummy protocol */
|
/** A dummy protocol */
|
||||||
private interface DummyProtocol {
|
private interface DummyProtocol {
|
||||||
public void dummyRun();
|
@Idempotent
|
||||||
|
public void dummyRun() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1066,6 +1093,39 @@ public class TestIPC {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that there is no retry when invalid token exception is thrown.
|
||||||
|
* Verfies fix for HADOOP-12054
|
||||||
|
*/
|
||||||
|
@Test(expected = InvalidToken.class)
|
||||||
|
public void testNoRetryOnInvalidToken() throws IOException {
|
||||||
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
final TestServer server = new TestServer(1, false);
|
||||||
|
TestInvalidTokenHandler handler =
|
||||||
|
new TestInvalidTokenHandler(client, server);
|
||||||
|
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
|
||||||
|
DummyProtocol.class.getClassLoader(),
|
||||||
|
new Class[] { DummyProtocol.class }, handler);
|
||||||
|
FailoverProxyProvider<DummyProtocol> provider =
|
||||||
|
new DefaultFailoverProxyProvider<DummyProtocol>(
|
||||||
|
DummyProtocol.class, proxy);
|
||||||
|
DummyProtocol retryProxy =
|
||||||
|
(DummyProtocol) RetryProxy.create(DummyProtocol.class, provider,
|
||||||
|
RetryPolicies.failoverOnNetworkException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, 100, 100, 10000, 0));
|
||||||
|
|
||||||
|
try {
|
||||||
|
server.start();
|
||||||
|
retryProxy.dummyRun();
|
||||||
|
} finally {
|
||||||
|
// Check if dummyRun called only once
|
||||||
|
Assert.assertEquals(handler.invocations, 1);
|
||||||
|
Client.setCallIdAndRetryCount(0, 0);
|
||||||
|
client.stop();
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if the rpc server gets the default retry count (0) from client.
|
* Test if the rpc server gets the default retry count (0) from client.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue