diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
index b94e94d67bf..08edfe2c57e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
@@ -29,6 +30,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RemoteException;
@@ -87,9 +89,19 @@ public class RequestHedgingProxyProvider<T> extends
       targetProxies.remove(toIgnore);
       if (targetProxies.size() == 1) {
         ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
-        Object retVal = method.invoke(proxyInfo.proxy, args);
-        successfulProxy = proxyInfo;
-        return retVal;
+        try {
+          currentUsedProxy = proxyInfo;
+          Object retVal = method.invoke(proxyInfo.proxy, args);
+          LOG.debug("Invocation successful on [{}]",
+              currentUsedProxy.proxyInfo);
+          return retVal;
+        } catch (InvocationTargetException ex) {
+          Exception unwrappedException = unwrapInvocationTargetException(ex);
+          logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
+          LOG.trace("Unsuccessful invocation on [{}]",
+              currentUsedProxy.proxyInfo);
+          throw unwrappedException;
+        }
       }
       executor = Executors.newFixedThreadPool(proxies.size());
       completionService = new ExecutorCompletionService<>(executor);
@@ -112,15 +124,16 @@ public class RequestHedgingProxyProvider<T> extends
         Future<Object> callResultFuture = completionService.take();
         Object retVal;
         try {
+          currentUsedProxy = proxyMap.get(callResultFuture);
           retVal = callResultFuture.get();
-          successfulProxy = proxyMap.get(callResultFuture);
           LOG.debug("Invocation successful on [{}]",
-              successfulProxy.proxyInfo);
+              currentUsedProxy.proxyInfo);
           return retVal;
-        } catch (Exception ex) {
+        } catch (ExecutionException ex) {
+          Exception unwrappedException = unwrapExecutionException(ex);
           ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
-          logProxyException(ex, tProxyInfo.proxyInfo);
-          badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
+          logProxyException(unwrappedException, tProxyInfo.proxyInfo);
+          badResults.put(tProxyInfo.proxyInfo, unwrappedException);
           LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
           numAttempts--;
         }
@@ -143,7 +156,7 @@ public class RequestHedgingProxyProvider<T> extends
     }
   }
 
-  private volatile ProxyInfo<T> successfulProxy = null;
+  private volatile ProxyInfo<T> currentUsedProxy = null;
   private volatile String toIgnore = null;
 
   public RequestHedgingProxyProvider(Configuration conf, URI uri,
@@ -154,8 +167,8 @@ public class RequestHedgingProxyProvider<T> extends
   @SuppressWarnings("unchecked")
   @Override
   public synchronized ProxyInfo<T> getProxy() {
-    if (successfulProxy != null) {
-      return successfulProxy;
+    if (currentUsedProxy != null) {
+      return currentUsedProxy;
     }
     Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
     StringBuilder combinedInfo = new StringBuilder("[");
@@ -175,8 +188,8 @@ public class RequestHedgingProxyProvider<T> extends
 
   @Override
   public synchronized void performFailover(T currentProxy) {
-    toIgnore = successfulProxy.proxyInfo;
-    successfulProxy = null;
+    toIgnore = this.currentUsedProxy.proxyInfo;
+    this.currentUsedProxy = null;
   }
 
   /**
@@ -187,19 +200,18 @@ public class RequestHedgingProxyProvider<T> extends
    */
   private void logProxyException(Exception ex, String proxyInfo) {
     if (isStandbyException(ex)) {
-      LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
+      LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex);
     } else {
-      LOG.warn("Invocation returned exception on [{}]", proxyInfo);
+      LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex);
     }
   }
 
   /**
    * Check if the returned exception is caused by an standby namenode.
-   * @param ex Exception to check.
+   * @param exception Exception to check.
    * @return If the exception is caused by an standby namenode.
    */
-  private boolean isStandbyException(Exception ex) {
-    Exception exception = unwrapException(ex);
+  private boolean isStandbyException(Exception exception) {
     if (exception instanceof RemoteException) {
       return ((RemoteException) exception).unwrapRemoteException()
           instanceof StandbyException;
@@ -208,24 +220,43 @@ public class RequestHedgingProxyProvider<T> extends
   }
 
   /**
-   * Unwraps the exception. <p>
+   * Unwraps the ExecutionException. <p>
    * Example:
    * <blockquote><pre>
    * if ex is
-   * ExecutionException(InvocationTargetExeption(SomeException))
+   * ExecutionException(InvocationTargetException(SomeException))
    * returns SomeException
    * </pre></blockquote>
    *
    * @return unwrapped exception
    */
-  private Exception unwrapException(Exception ex) {
+  private Exception unwrapExecutionException(ExecutionException ex) {
+    if (ex != null) {
+      Throwable cause = ex.getCause();
+      if (cause instanceof InvocationTargetException) {
+        return
+            unwrapInvocationTargetException((InvocationTargetException)cause);
+      }
+    }
+    return ex;
+
+  }
+
+  /**
+   * Unwraps the InvocationTargetException. <p>
+   * Example:
+   * <blockquote><pre>
+   * if ex is InvocationTargetException(SomeException)
+   * returns SomeException
+   * </pre></blockquote>
+   *
+   * @return unwrapped exception
+   */
+  private Exception unwrapInvocationTargetException(
+      InvocationTargetException ex) {
     if (ex != null) {
       Throwable cause = ex.getCause();
       if (cause instanceof Exception) {
-        Throwable innerCause = cause.getCause();
-        if (innerCause instanceof Exception) {
-          return (Exception) innerCause;
-        }
         return (Exception) cause;
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
index 724b5f01c4a..8feba964ec9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
@@ -231,6 +231,64 @@ public class TestRequestHedgingProxyProvider {
     Assert.assertEquals(1, stats[0]);
   }
 
+  @Test
+  public void testFileNotFoundExceptionWithSingleProxy() throws Exception {
+    ClientProtocol active = Mockito.mock(ClientProtocol.class);
+    Mockito
+        .when(active.getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong()))
+        .thenThrow(new RemoteException("java.io.FileNotFoundException",
+            "File does not exist!"));
+
+    ClientProtocol standby = Mockito.mock(ClientProtocol.class);
+    Mockito
+        .when(standby.getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong()))
+        .thenThrow(
+            new RemoteException("org.apache.hadoop.ipc.StandbyException",
+                "Standby NameNode"));
+
+    RequestHedgingProxyProvider<ClientProtocol> provider =
+        new RequestHedgingProxyProvider<>(conf, nnUri,
+            ClientProtocol.class, createFactory(standby, active));
+    try {
+      provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
+      Assert.fail("Should fail since the active namenode throws"
+          + " FileNotFoundException!");
+    } catch (MultiException me) {
+      for (Exception ex : me.getExceptions().values()) {
+        Exception rEx = ((RemoteException) ex).unwrapRemoteException();
+        if (rEx instanceof StandbyException) {
+          continue;
+        }
+        Assert.assertTrue(rEx instanceof FileNotFoundException);
+      }
+    }
+    //Perform failover now, there will only be one active proxy now
+    provider.performFailover(active);
+    try {
+      provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
+      Assert.fail("Should fail since the active namenode throws"
+          + " FileNotFoundException!");
+    } catch (RemoteException ex) {
+      Exception rEx = ex.unwrapRemoteException();
+      if (rEx instanceof StandbyException) {
+        Mockito.verify(active).getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong());
+        Mockito.verify(standby, Mockito.times(2))
+            .getBlockLocations(Matchers.anyString(),
+                Matchers.anyLong(), Matchers.anyLong());
+      } else {
+        Assert.assertTrue(rEx instanceof FileNotFoundException);
+        Mockito.verify(active, Mockito.times(2))
+            .getBlockLocations(Matchers.anyString(),
+                Matchers.anyLong(), Matchers.anyLong());
+        Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
+            Matchers.anyLong(), Matchers.anyLong());
+      }
+    }
+  }
+
   @Test
   public void testPerformFailoverWith3Proxies() throws Exception {
     conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
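
For reference, below is a minimal, self-contained sketch (not part of the patch) of the unwrapping order the change relies on: a hedged proxy call fails as ExecutionException wrapping InvocationTargetException wrapping the real exception thrown by the NameNode RPC, and the client wants that root exception back. The class and helper names here (UnwrapSketch and its two static methods) are illustrative only and merely mirror the private methods added above.

import java.io.FileNotFoundException;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {

  // Mirrors unwrapExecutionException: peel the ExecutionException and, if the
  // cause is an InvocationTargetException, delegate to the second helper.
  static Exception unwrapExecutionException(ExecutionException ex) {
    Throwable cause = ex.getCause();
    if (cause instanceof InvocationTargetException) {
      return unwrapInvocationTargetException((InvocationTargetException) cause);
    }
    return ex;
  }

  // Mirrors unwrapInvocationTargetException: return the exception the
  // reflective call actually threw, if it is an Exception.
  static Exception unwrapInvocationTargetException(InvocationTargetException ex) {
    Throwable cause = ex.getCause();
    if (cause instanceof Exception) {
      return (Exception) cause;
    }
    return ex;
  }

  public static void main(String[] args) {
    // Simulate a hedged call failure: the real error is wrapped once by
    // reflection (Method.invoke) and once by the executor (Future.get).
    Exception root = new FileNotFoundException("File does not exist!");
    ExecutionException wrapped =
        new ExecutionException(new InvocationTargetException(root));

    // Prints: java.io.FileNotFoundException: File does not exist!
    System.out.println(unwrapExecutionException(wrapped));
  }
}

Running main prints the root FileNotFoundException, which is what the new test above expects the client to see instead of a wrapped reflection exception.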