diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java index 6fd65912388..30b409ee10c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java @@ -18,16 +18,19 @@ package org.apache.hadoop.yarn.client; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; import org.junit.Assert; import org.junit.Test; - public class TestHedgingRequestRMFailoverProxyProvider { @Test @@ -63,8 +66,9 @@ public void testHedgingRequestProxyProvider() throws Exception { // Transition rm5 to active; long start = System.currentTimeMillis(); makeRMActive(cluster, 4); - // client will retry until the rm becomes active. - client.getAllQueues(); + + validateActiveRM(client); + long end = System.currentTimeMillis(); System.out.println("Client call succeeded at " + end); // should return the response fast @@ -76,10 +80,29 @@ public void testHedgingRequestProxyProvider() throws Exception { HAServiceProtocol.RequestSource.REQUEST_BY_USER)); makeRMActive(cluster, 2); - client.getAllQueues(); + + validateActiveRM(client); + cluster.stop(); } + private void validateActiveRM(YarnClient client) throws IOException { + // first check if exception is thrown correctly; + try { + // client will retry until the rm becomes active. + client.getApplicationReport(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e instanceof ApplicationNotFoundException); + } + // now make a valid call. + try { + client.getAllQueues(); + } catch (YarnException e) { + Assert.fail(e.toString()); + } + } + private void makeRMActive(final MiniYARNCluster cluster, final int index) { Thread t = new Thread() { @Override public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java index d076599ed7d..9468f4e90e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java @@ -18,16 +18,6 @@ package org.apache.hadoop.yarn.client; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.retry.MultiException; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.apache.hadoop.yarn.conf.HAUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; - import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; @@ -39,16 +29,26 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + /** * A FailoverProxyProvider implementation that technically does not "failover" * per-se. It constructs a wrapper proxy that sends the request to ALL * underlying proxies simultaneously. Each proxy inside the wrapper proxy will - * retry the corresponding target. It assumes the in an HA setup, there will - * be only one Active, and the active should respond faster than any configured + * retry the corresponding target. It assumes the in an HA setup, there will be + * only one Active, and the active should respond faster than any configured * standbys. Once it receives a response from any one of the configred proxies, * outstanding requests to other proxies are immediately cancelled. */ @@ -95,11 +95,11 @@ protected T createRetriableProxy() { // Create proxy that can retry exceptions properly. RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false); InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); - T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + T proxy = RMProxy. getProxy(conf, protocol, rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); } catch (IOException ioe) { - LOG.error("Unable to create proxy to the ResourceManager " + HAUtil - .getRMHAId(conf), ioe); + LOG.error("Unable to create proxy to the ResourceManager " + + HAUtil.getRMHAId(conf), ioe); return null; } } @@ -122,57 +122,61 @@ protected Object invokeMethod(Object proxy, Method method, Object[] args) } } + private Throwable extraRootException(Exception ex) { + Throwable rootCause = ex; + if (ex instanceof ExecutionException) { + Throwable cause = ex.getCause(); + if (cause instanceof InvocationTargetException) { + rootCause = cause.getCause(); + } + } + return rootCause; + } + /** * Creates a Executor and invokes all proxies concurrently. */ @Override - public Object invoke(Object proxy, final Method method, - final Object[] args) throws Throwable { + public Object invoke(Object proxy, final Method method, final Object[] args) + throws Throwable { if (successfulProxy != null) { - return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args); + return invokeMethod(nonRetriableProxy.get(successfulProxy), method, + args); } ExecutorService executor = null; CompletionService completionService; try { Map, ProxyInfo> proxyMap = new HashMap<>(); - int numAttempts = 0; executor = HadoopExecutors.newFixedThreadPool(allProxies.size()); completionService = new ExecutorCompletionService<>(executor); for (final ProxyInfo pInfo : allProxies.values()) { Callable c = new Callable() { - @Override public Object call() throws Exception { + @Override + public Object call() throws Exception { return method.invoke(pInfo.proxy, args); } }; proxyMap.put(completionService.submit(c), pInfo); - numAttempts++; } - Map badResults = new HashMap<>(); - while (numAttempts > 0) { - Future callResultFuture = completionService.take(); - String pInfo = proxyMap.get(callResultFuture).proxyInfo; - Object retVal; - try { - retVal = callResultFuture.get(); - successfulProxy = pInfo; - LOG.info("Invocation successful on [" + pInfo + "]"); - return retVal; - } catch (Exception ex) { - LOG.warn("Invocation returned exception on " + "[" + pInfo + "]"); - badResults.put(pInfo, ex); - numAttempts--; - } + Future callResultFuture = completionService.take(); + String pInfo = proxyMap.get(callResultFuture).proxyInfo; + successfulProxy = pInfo; + Object retVal; + try { + retVal = callResultFuture.get(); + LOG.info("Invocation successful on [" + pInfo + "]"); + return retVal; + } catch (Exception ex) { + // Throw exception from first responding RM so that clients can handle + // appropriately + Throwable rootCause = extraRootException(ex); + LOG.warn("Invocation returned exception: " + rootCause.toString() + + " on " + "[" + pInfo + "], so propagating back to caller."); + throw rootCause; } - // At this point we should have All bad results (Exceptions) - // Or should have returned with successful result. - if (badResults.size() == 1) { - throw badResults.values().iterator().next(); - } else { - throw new MultiException(badResults); - } } finally { if (executor != null) { executor.shutdownNow();