HDFS-12813. RequestHedgingProxyProvider can hide Exception thrown from the Namenode for proxy size of 1. Contributed by Mukul Kumar Singh

This commit is contained in:
Tsz-Wo Nicholas Sze 2017-11-20 17:09:19 -08:00 committed by Yongjun Zhang
parent 97aa781d3d
commit 2acd8e4ecc
2 changed files with 114 additions and 25 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
@ -29,6 +30,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RemoteException;
@ -87,9 +89,19 @@ public class RequestHedgingProxyProvider<T> extends
targetProxies.remove(toIgnore);
if (targetProxies.size() == 1) {
ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
Object retVal = method.invoke(proxyInfo.proxy, args);
successfulProxy = proxyInfo;
return retVal;
try {
currentUsedProxy = proxyInfo;
Object retVal = method.invoke(proxyInfo.proxy, args);
LOG.debug("Invocation successful on [{}]",
currentUsedProxy.proxyInfo);
return retVal;
} catch (InvocationTargetException ex) {
Exception unwrappedException = unwrapInvocationTargetException(ex);
logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
LOG.trace("Unsuccessful invocation on [{}]",
currentUsedProxy.proxyInfo);
throw unwrappedException;
}
}
executor = Executors.newFixedThreadPool(proxies.size());
completionService = new ExecutorCompletionService<>(executor);
@ -112,15 +124,16 @@ public class RequestHedgingProxyProvider<T> extends
Future<Object> callResultFuture = completionService.take();
Object retVal;
try {
currentUsedProxy = proxyMap.get(callResultFuture);
retVal = callResultFuture.get();
successfulProxy = proxyMap.get(callResultFuture);
LOG.debug("Invocation successful on [{}]",
successfulProxy.proxyInfo);
currentUsedProxy.proxyInfo);
return retVal;
} catch (Exception ex) {
} catch (ExecutionException ex) {
Exception unwrappedException = unwrapExecutionException(ex);
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
logProxyException(ex, tProxyInfo.proxyInfo);
badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
logProxyException(unwrappedException, tProxyInfo.proxyInfo);
badResults.put(tProxyInfo.proxyInfo, unwrappedException);
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
numAttempts--;
}
@ -143,7 +156,7 @@ public class RequestHedgingProxyProvider<T> extends
}
private volatile ProxyInfo<T> successfulProxy = null;
private volatile ProxyInfo<T> currentUsedProxy = null;
private volatile String toIgnore = null;
public RequestHedgingProxyProvider(Configuration conf, URI uri,
@ -154,8 +167,8 @@ public class RequestHedgingProxyProvider<T> extends
@SuppressWarnings("unchecked")
@Override
public synchronized ProxyInfo<T> getProxy() {
if (successfulProxy != null) {
return successfulProxy;
if (currentUsedProxy != null) {
return currentUsedProxy;
}
Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
StringBuilder combinedInfo = new StringBuilder("[");
@ -175,8 +188,8 @@ public class RequestHedgingProxyProvider<T> extends
@Override
public synchronized void performFailover(T currentProxy) {
toIgnore = successfulProxy.proxyInfo;
successfulProxy = null;
toIgnore = this.currentUsedProxy.proxyInfo;
this.currentUsedProxy = null;
}
/**
@ -187,19 +200,18 @@ public class RequestHedgingProxyProvider<T> extends
*/
private void logProxyException(Exception ex, String proxyInfo) {
if (isStandbyException(ex)) {
LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex);
} else {
LOG.warn("Invocation returned exception on [{}]", proxyInfo);
LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex);
}
}
/**
* Check if the returned exception is caused by an standby namenode.
* @param ex Exception to check.
* @param exception Exception to check.
* @return If the exception is caused by an standby namenode.
*/
private boolean isStandbyException(Exception ex) {
Exception exception = unwrapException(ex);
private boolean isStandbyException(Exception exception) {
if (exception instanceof RemoteException) {
return ((RemoteException) exception).unwrapRemoteException()
instanceof StandbyException;
@ -208,24 +220,43 @@ public class RequestHedgingProxyProvider<T> extends
}
/**
* Unwraps the exception. <p>
* Unwraps the ExecutionException. <p>
* Example:
* <blockquote><pre>
* if ex is
* ExecutionException(InvocationTargetExeption(SomeException))
* ExecutionException(InvocationTargetException(SomeException))
* returns SomeException
* </pre></blockquote>
*
* @return unwrapped exception
*/
private Exception unwrapException(Exception ex) {
private Exception unwrapExecutionException(ExecutionException ex) {
if (ex != null) {
Throwable cause = ex.getCause();
if (cause instanceof InvocationTargetException) {
return
unwrapInvocationTargetException((InvocationTargetException)cause);
}
}
return ex;
}
/**
* Unwraps the InvocationTargetException. <p>
* Example:
* <blockquote><pre>
* if ex is InvocationTargetException(SomeException)
* returns SomeException
* </pre></blockquote>
*
* @return unwrapped exception
*/
private Exception unwrapInvocationTargetException(
InvocationTargetException ex) {
if (ex != null) {
Throwable cause = ex.getCause();
if (cause instanceof Exception) {
Throwable innerCause = cause.getCause();
if (innerCause instanceof Exception) {
return (Exception) innerCause;
}
return (Exception) cause;
}
}

View File

@ -231,6 +231,64 @@ public class TestRequestHedgingProxyProvider {
Assert.assertEquals(1, stats[0]);
}
@Test
public void testFileNotFoundExceptionWithSingleProxy() throws Exception {
ClientProtocol active = Mockito.mock(ClientProtocol.class);
Mockito
.when(active.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(new RemoteException("java.io.FileNotFoundException",
"File does not exist!"));
ClientProtocol standby = Mockito.mock(ClientProtocol.class);
Mockito
.when(standby.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong()))
.thenThrow(
new RemoteException("org.apache.hadoop.ipc.StandbyException",
"Standby NameNode"));
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri,
ClientProtocol.class, createFactory(standby, active));
try {
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
Assert.fail("Should fail since the active namenode throws"
+ " FileNotFoundException!");
} catch (MultiException me) {
for (Exception ex : me.getExceptions().values()) {
Exception rEx = ((RemoteException) ex).unwrapRemoteException();
if (rEx instanceof StandbyException) {
continue;
}
Assert.assertTrue(rEx instanceof FileNotFoundException);
}
}
//Perform failover now, there will only be one active proxy now
provider.performFailover(active);
try {
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
Assert.fail("Should fail since the active namenode throws"
+ " FileNotFoundException!");
} catch (RemoteException ex) {
Exception rEx = ex.unwrapRemoteException();
if (rEx instanceof StandbyException) {
Mockito.verify(active).getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
Mockito.verify(standby, Mockito.times(2))
.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
} else {
Assert.assertTrue(rEx instanceof FileNotFoundException);
Mockito.verify(active, Mockito.times(2))
.getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
Matchers.anyLong(), Matchers.anyLong());
}
}
}
@Test
public void testPerformFailoverWith3Proxies() throws Exception {
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,