HDFS-11395. RequestHedgingProxyProvider#RequestHedgingInvocationHandler hides the Exception thrown from NameNode. Contributed by Nandakumar.
(cherry picked from commit 55796a0946
)
This commit is contained in:
parent
0da850a472
commit
e03d8ff488
|
@ -240,12 +240,15 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||||
private final long delay;
|
private final long delay;
|
||||||
private final RetryAction action;
|
private final RetryAction action;
|
||||||
private final long expectedFailoverCount;
|
private final long expectedFailoverCount;
|
||||||
|
private final Exception failException;
|
||||||
|
|
||||||
RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
|
RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
|
||||||
|
Exception failException) {
|
||||||
this.delay = delay;
|
this.delay = delay;
|
||||||
this.retryTime = Time.monotonicNow() + delay;
|
this.retryTime = Time.monotonicNow() + delay;
|
||||||
this.action = action;
|
this.action = action;
|
||||||
this.expectedFailoverCount = expectedFailoverCount;
|
this.expectedFailoverCount = expectedFailoverCount;
|
||||||
|
this.failException = failException;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isFailover() {
|
boolean isFailover() {
|
||||||
|
@ -258,11 +261,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||||
&& action.action == RetryAction.RetryDecision.FAIL;
|
&& action.action == RetryAction.RetryDecision.FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Exception getFailException() {
|
||||||
|
return failException;
|
||||||
|
}
|
||||||
|
|
||||||
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
||||||
Counters counters, boolean idempotentOrAtMostOnce,
|
Counters counters, boolean idempotentOrAtMostOnce,
|
||||||
long expectedFailoverCount) throws Exception {
|
long expectedFailoverCount) throws Exception {
|
||||||
RetryAction max = null;
|
RetryAction max = null;
|
||||||
long maxRetryDelay = 0;
|
long maxRetryDelay = 0;
|
||||||
|
Exception ex = null;
|
||||||
|
|
||||||
final Iterable<Exception> exceptions = e instanceof MultiException ?
|
final Iterable<Exception> exceptions = e instanceof MultiException ?
|
||||||
((MultiException) e).getExceptions().values()
|
((MultiException) e).getExceptions().values()
|
||||||
|
@ -279,10 +287,13 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||||
|
|
||||||
if (max == null || max.action.compareTo(a.action) < 0) {
|
if (max == null || max.action.compareTo(a.action) < 0) {
|
||||||
max = a;
|
max = a;
|
||||||
|
if (a.action == RetryAction.RetryDecision.FAIL) {
|
||||||
|
ex = exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
|
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,7 +370,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||||
+ ". Not retrying because " + retryInfo.action.reason, e);
|
+ ". Not retrying because " + retryInfo.action.reason, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw e;
|
throw retryInfo.getFailException();
|
||||||
}
|
}
|
||||||
|
|
||||||
log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
|
log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
|
@ -122,7 +121,7 @@ public class RequestHedgingProxyProvider<T> extends
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
||||||
logProxyException(ex, tProxyInfo.proxyInfo);
|
logProxyException(ex, tProxyInfo.proxyInfo);
|
||||||
badResults.put(tProxyInfo.proxyInfo, ex);
|
badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
|
||||||
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
||||||
numAttempts--;
|
numAttempts--;
|
||||||
}
|
}
|
||||||
|
@ -207,16 +206,36 @@ public class RequestHedgingProxyProvider<T> extends
|
||||||
* @return If the exception is caused by an standby namenode.
|
* @return If the exception is caused by an standby namenode.
|
||||||
*/
|
*/
|
||||||
private boolean isStandbyException(Exception ex) {
|
private boolean isStandbyException(Exception ex) {
|
||||||
Throwable cause = ex.getCause();
|
Exception exception = unwrapException(ex);
|
||||||
if (cause != null) {
|
if (exception instanceof RemoteException) {
|
||||||
Throwable cause2 = cause.getCause();
|
return ((RemoteException) exception).unwrapRemoteException()
|
||||||
if (cause2 instanceof RemoteException) {
|
instanceof StandbyException;
|
||||||
RemoteException remoteException = (RemoteException)cause2;
|
|
||||||
IOException unwrapRemoteException =
|
|
||||||
remoteException.unwrapRemoteException();
|
|
||||||
return unwrapRemoteException instanceof StandbyException;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unwraps the exception. <p>
|
||||||
|
* Example:
|
||||||
|
* <blockquote><pre>
|
||||||
|
* if ex is
|
||||||
|
* ExecutionException(InvocationTargetExeption(SomeException))
|
||||||
|
* returns SomeException
|
||||||
|
* </pre></blockquote>
|
||||||
|
*
|
||||||
|
* @return unwrapped exception
|
||||||
|
*/
|
||||||
|
private Exception unwrapException(Exception ex) {
|
||||||
|
if (ex != null) {
|
||||||
|
Throwable cause = ex.getCause();
|
||||||
|
if (cause instanceof Exception) {
|
||||||
|
Throwable innerCause = cause.getCause();
|
||||||
|
if (innerCause instanceof Exception) {
|
||||||
|
return (Exception) innerCause;
|
||||||
|
}
|
||||||
|
return (Exception) cause;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
|
import java.io.EOFException;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -30,6 +33,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
|
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.io.retry.MultiException;
|
import org.apache.hadoop.io.retry.MultiException;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
@ -38,6 +43,7 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Matchers;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
@ -198,7 +204,7 @@ public class TestRequestHedgingProxyProvider {
|
||||||
Assert.assertTrue(stats.length == 1);
|
Assert.assertTrue(stats.length == 1);
|
||||||
Assert.assertEquals(2, stats[0]);
|
Assert.assertEquals(2, stats[0]);
|
||||||
|
|
||||||
// Counter shuodl update only once
|
// Counter should update only once
|
||||||
Assert.assertEquals(5, counter.get());
|
Assert.assertEquals(5, counter.get());
|
||||||
|
|
||||||
stats = provider.getProxy().proxy.getStats();
|
stats = provider.getProxy().proxy.getStats();
|
||||||
|
@ -347,6 +353,106 @@ public class TestRequestHedgingProxyProvider {
|
||||||
Assert.assertEquals(12, counter.get());
|
Assert.assertEquals(12, counter.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHedgingWhenFileNotFoundException() throws Exception {
|
||||||
|
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito
|
||||||
|
.when(active.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong()))
|
||||||
|
.thenThrow(new RemoteException("java.io.FileNotFoundException",
|
||||||
|
"File does not exist!"));
|
||||||
|
|
||||||
|
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito
|
||||||
|
.when(standby.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong()))
|
||||||
|
.thenThrow(
|
||||||
|
new RemoteException("org.apache.hadoop.ipc.StandbyException",
|
||||||
|
"Standby NameNode"));
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||||
|
NamenodeProtocols.class, createFactory(active, standby));
|
||||||
|
try {
|
||||||
|
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
|
||||||
|
Assert.fail("Should fail since the active namenode throws"
|
||||||
|
+ " FileNotFoundException!");
|
||||||
|
} catch (MultiException me) {
|
||||||
|
for (Exception ex : me.getExceptions().values()) {
|
||||||
|
Exception rEx = ((RemoteException) ex).unwrapRemoteException();
|
||||||
|
if (rEx instanceof StandbyException) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(rEx instanceof FileNotFoundException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Mockito.verify(active).getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHedgingWhenConnectException() throws Exception {
|
||||||
|
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito.when(active.getStats()).thenThrow(new ConnectException());
|
||||||
|
|
||||||
|
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito.when(standby.getStats())
|
||||||
|
.thenThrow(
|
||||||
|
new RemoteException("org.apache.hadoop.ipc.StandbyException",
|
||||||
|
"Standby NameNode"));
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||||
|
NamenodeProtocols.class, createFactory(active, standby));
|
||||||
|
try {
|
||||||
|
provider.getProxy().proxy.getStats();
|
||||||
|
Assert.fail("Should fail since the active namenode throws"
|
||||||
|
+ " ConnectException!");
|
||||||
|
} catch (MultiException me) {
|
||||||
|
for (Exception ex : me.getExceptions().values()) {
|
||||||
|
if (ex instanceof RemoteException) {
|
||||||
|
Exception rEx = ((RemoteException) ex)
|
||||||
|
.unwrapRemoteException();
|
||||||
|
Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(),
|
||||||
|
rEx instanceof StandbyException);
|
||||||
|
} else {
|
||||||
|
Assert.assertTrue(ex instanceof ConnectException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Mockito.verify(active).getStats();
|
||||||
|
Mockito.verify(standby).getStats();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHedgingWhenConnectAndEOFException() throws Exception {
|
||||||
|
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito.when(active.getStats()).thenThrow(new EOFException());
|
||||||
|
|
||||||
|
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||||
|
Mockito.when(standby.getStats()).thenThrow(new ConnectException());
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||||
|
NamenodeProtocols.class, createFactory(active, standby));
|
||||||
|
try {
|
||||||
|
provider.getProxy().proxy.getStats();
|
||||||
|
Assert.fail("Should fail since both active and standby namenodes throw"
|
||||||
|
+ " Exceptions!");
|
||||||
|
} catch (MultiException me) {
|
||||||
|
for (Exception ex : me.getExceptions().values()) {
|
||||||
|
if (!(ex instanceof ConnectException) &&
|
||||||
|
!(ex instanceof EOFException)) {
|
||||||
|
Assert.fail("Unexpected Exception " + ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Mockito.verify(active).getStats();
|
||||||
|
Mockito.verify(standby).getStats();
|
||||||
|
}
|
||||||
|
|
||||||
private ProxyFactory<NamenodeProtocols> createFactory(
|
private ProxyFactory<NamenodeProtocols> createFactory(
|
||||||
NamenodeProtocols... protos) {
|
NamenodeProtocols... protos) {
|
||||||
final Iterator<NamenodeProtocols> iterator =
|
final Iterator<NamenodeProtocols> iterator =
|
||||||
|
|
Loading…
Reference in New Issue