HDFS-11395. RequestHedgingProxyProvider#RequestHedgingInvocationHandler hides the Exception thrown from NameNode. Contributed by Nandakumar.

(cherry picked from commit 55796a0946)
(cherry picked from commit e03d8ff488)
This commit is contained in:
Jing Zhao 2017-03-13 14:14:09 -07:00 committed by Andrew Wang
parent ab673aa6cc
commit e216c15625
3 changed files with 151 additions and 15 deletions

View File

@ -238,12 +238,15 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
private final long delay; private final long delay;
private final RetryAction action; private final RetryAction action;
private final long expectedFailoverCount; private final long expectedFailoverCount;
private final Exception failException;
RetryInfo(long delay, RetryAction action, long expectedFailoverCount) { RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
Exception failException) {
this.delay = delay; this.delay = delay;
this.retryTime = Time.monotonicNow() + delay; this.retryTime = Time.monotonicNow() + delay;
this.action = action; this.action = action;
this.expectedFailoverCount = expectedFailoverCount; this.expectedFailoverCount = expectedFailoverCount;
this.failException = failException;
} }
boolean isFailover() { boolean isFailover() {
@ -256,11 +259,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
&& action.action == RetryAction.RetryDecision.FAIL; && action.action == RetryAction.RetryDecision.FAIL;
} }
Exception getFailException() {
return failException;
}
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
Counters counters, boolean idempotentOrAtMostOnce, Counters counters, boolean idempotentOrAtMostOnce,
long expectedFailoverCount) throws Exception { long expectedFailoverCount) throws Exception {
RetryAction max = null; RetryAction max = null;
long maxRetryDelay = 0; long maxRetryDelay = 0;
Exception ex = null;
final Iterable<Exception> exceptions = e instanceof MultiException ? final Iterable<Exception> exceptions = e instanceof MultiException ?
((MultiException) e).getExceptions().values() ((MultiException) e).getExceptions().values()
@ -277,10 +285,13 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
if (max == null || max.action.compareTo(a.action) < 0) { if (max == null || max.action.compareTo(a.action) < 0) {
max = a; max = a;
if (a.action == RetryAction.RetryDecision.FAIL) {
ex = exception;
}
} }
} }
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount); return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);
} }
} }
@ -357,7 +368,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
+ ". Not retrying because " + retryInfo.action.reason, e); + ". Not retrying because " + retryInfo.action.reason, e);
} }
} }
throw e; throw retryInfo.getFailException();
} }
log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e); log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -122,7 +121,7 @@ public class RequestHedgingProxyProvider<T> extends
} catch (Exception ex) { } catch (Exception ex) {
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture); ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
logProxyException(ex, tProxyInfo.proxyInfo); logProxyException(ex, tProxyInfo.proxyInfo);
badResults.put(tProxyInfo.proxyInfo, ex); badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo); LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
numAttempts--; numAttempts--;
} }
@ -207,16 +206,36 @@ public class RequestHedgingProxyProvider<T> extends
* @return If the exception is caused by an standby namenode. * @return If the exception is caused by an standby namenode.
*/ */
private boolean isStandbyException(Exception ex) { private boolean isStandbyException(Exception ex) {
Throwable cause = ex.getCause(); Exception exception = unwrapException(ex);
if (cause != null) { if (exception instanceof RemoteException) {
Throwable cause2 = cause.getCause(); return ((RemoteException) exception).unwrapRemoteException()
if (cause2 instanceof RemoteException) { instanceof StandbyException;
RemoteException remoteException = (RemoteException)cause2;
IOException unwrapRemoteException =
remoteException.unwrapRemoteException();
return unwrapRemoteException instanceof StandbyException;
}
} }
return false; return false;
} }
/**
* Unwraps the exception. <p>
* Example:
* <blockquote><pre>
* if ex is
* ExecutionException(InvocationTargetExeption(SomeException))
* returns SomeException
* </pre></blockquote>
*
* @return unwrapped exception
*/
private Exception unwrapException(Exception ex) {
if (ex != null) {
Throwable cause = ex.getCause();
if (cause instanceof Exception) {
Throwable innerCause = cause.getCause();
if (innerCause instanceof Exception) {
return (Exception) innerCause;
}
return (Exception) cause;
}
}
return ex;
}
} }

View File

@ -17,7 +17,10 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -30,6 +33,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -38,6 +43,7 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -198,7 +204,7 @@ public class TestRequestHedgingProxyProvider {
Assert.assertTrue(stats.length == 1); Assert.assertTrue(stats.length == 1);
Assert.assertEquals(2, stats[0]); Assert.assertEquals(2, stats[0]);
// Counter shuodl update only once // Counter should update only once
Assert.assertEquals(5, counter.get()); Assert.assertEquals(5, counter.get());
stats = provider.getProxy().proxy.getStats(); stats = provider.getProxy().proxy.getStats();
@ -347,6 +353,106 @@ public class TestRequestHedgingProxyProvider {
Assert.assertEquals(12, counter.get()); Assert.assertEquals(12, counter.get());
} }
@Test
public void testHedgingWhenFileNotFoundException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
Mockito
.when(active.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(new RemoteException("java.io.FileNotFoundException",
"File does not exist!"));
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
Mockito
.when(standby.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(
new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
Assert.fail("Should fail since the active namenode throws"
+ " FileNotFoundException!");
} catch (MultiException me) {
for (Exception ex : me.getExceptions().values()) {
Exception rEx = ((RemoteException) ex).unwrapRemoteException();
if (rEx instanceof StandbyException) {
continue;
}
Assert.assertTrue(rEx instanceof FileNotFoundException);
}
}
Mockito.verify(active).getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
}
@Test
public void testHedgingWhenConnectException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
Mockito.when(active.getStats()).thenThrow(new ConnectException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
Mockito.when(standby.getStats())
.thenThrow(
new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode"));
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since the active namenode throws"
+ " ConnectException!");
} catch (MultiException me) {
for (Exception ex : me.getExceptions().values()) {
if (ex instanceof RemoteException) {
Exception rEx = ((RemoteException) ex)
.unwrapRemoteException();
Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(),
rEx instanceof StandbyException);
} else {
Assert.assertTrue(ex instanceof ConnectException);
}
}
}
Mockito.verify(active).getStats();
Mockito.verify(standby).getStats();
}
@Test
public void testHedgingWhenConnectAndEOFException() throws Exception {
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
Mockito.when(active.getStats()).thenThrow(new EOFException());
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
Mockito.when(standby.getStats()).thenThrow(new ConnectException());
RequestHedgingProxyProvider<NamenodeProtocols> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
NamenodeProtocols.class, createFactory(active, standby));
try {
provider.getProxy().proxy.getStats();
Assert.fail("Should fail since both active and standby namenodes throw"
+ " Exceptions!");
} catch (MultiException me) {
for (Exception ex : me.getExceptions().values()) {
if (!(ex instanceof ConnectException) &&
!(ex instanceof EOFException)) {
Assert.fail("Unexpected Exception " + ex.getMessage());
}
}
}
Mockito.verify(active).getStats();
Mockito.verify(standby).getStats();
}
private ProxyFactory<NamenodeProtocols> createFactory( private ProxyFactory<NamenodeProtocols> createFactory(
NamenodeProtocols... protos) { NamenodeProtocols... protos) {
final Iterator<NamenodeProtocols> iterator = final Iterator<NamenodeProtocols> iterator =