YARN-5711. Propagate exceptions back to client when using hedging RM failover provider.

(cherry picked from commit 0a166b1347)
Subru Krishnan 2016-10-24 18:59:51 -07:00
parent c76eb90f0e
commit 81595a127f
2 changed files with 74 additions and 47 deletions
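
The provider touched here is opted into on the client side via configuration, and the point of this change is that an exception raised by the active RM (for example ApplicationNotFoundException for an unknown application) now reaches the caller instead of being folded into the hedging machinery. Below is a minimal, hypothetical client sketch, not part of this commit (the class name HedgingClientSketch is illustrative); it assumes an RM HA cluster whose rm-ids and addresses are already configured in yarn-site.xml, uses the standard YARN HA/failover configuration keys, and mirrors in its try/catch what the updated test asserts.

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;

public class HedgingClientSketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    // Ask the client to hedge requests across all RMs instead of the
    // default one-at-a-time failover.
    conf.set(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
        "org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider");

    YarnClient client = YarnClient.createYarnClient();
    client.init(conf);
    client.start();
    try {
      // After this patch, the active RM's own exception is rethrown to the
      // caller rather than being swallowed by the hedging wrapper.
      client.getApplicationReport(
          ApplicationId.newInstance(System.currentTimeMillis(), 1));
    } catch (ApplicationNotFoundException e) {
      System.err.println("Active RM responded: " + e.getMessage());
    } finally {
      client.stop();
    }
  }
}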

File: .../org/apache/hadoop/yarn/client/TestHedgingRequestRMFailoverProxyProvider.java

@@ -18,16 +18,19 @@
 package org.apache.hadoop.yarn.client;
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 public class TestHedgingRequestRMFailoverProxyProvider {
   @Test
@@ -63,8 +66,9 @@ public class TestHedgingRequestRMFailoverProxyProvider {
     // Transition rm5 to active;
     long start = System.currentTimeMillis();
     makeRMActive(cluster, 4);
-    // client will retry until the rm becomes active.
-    client.getAllQueues();
+    validateActiveRM(client);
     long end = System.currentTimeMillis();
     System.out.println("Client call succeeded at " + end);
     // should return the response fast
@@ -76,10 +80,29 @@
         HAServiceProtocol.RequestSource.REQUEST_BY_USER));
     makeRMActive(cluster, 2);
-    client.getAllQueues();
+    validateActiveRM(client);
     cluster.stop();
   }
+  private void validateActiveRM(YarnClient client) throws IOException {
+    // first check if exception is thrown correctly;
+    try {
+      // client will retry until the rm becomes active.
+      client.getApplicationReport(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e instanceof ApplicationNotFoundException);
+    }
+    // now make a valid call.
+    try {
+      client.getAllQueues();
+    } catch (YarnException e) {
+      Assert.fail(e.toString());
+    }
+  }
   private void makeRMActive(final MiniYARNCluster cluster, final int index) {
     Thread t = new Thread() {
       @Override public void run() {

File: .../org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java

@@ -18,16 +18,6 @@
 package org.apache.hadoop.yarn.client;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.retry.MultiException;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -39,16 +29,26 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 /**
  * A FailoverProxyProvider implementation that technically does not "failover"
  * per-se. It constructs a wrapper proxy that sends the request to ALL
  * underlying proxies simultaneously. Each proxy inside the wrapper proxy will
- * retry the corresponding target. It assumes the in an HA setup, there will
- * be only one Active, and the active should respond faster than any configured
+ * retry the corresponding target. It assumes the in an HA setup, there will be
+ * only one Active, and the active should respond faster than any configured
  * standbys. Once it receives a response from any one of the configred proxies,
  * outstanding requests to other proxies are immediately cancelled.
  */
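
To make the hunks that follow easier to read, here is a stripped-down sketch of the hedging pattern described in the javadoc above, using only JDK concurrency types; the class and method names (HedgedCall, invokeAll) are hypothetical and this is a simplification of what the provider actually does. Every target is invoked through an ExecutorCompletionService, the first completed future supplies the result, its failure is unwrapped and rethrown to the caller as this patch does, and shutting the executor down cancels the calls that are still outstanding.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public final class HedgedCall {
  /** Invoke the same call against every target and return the fastest answer. */
  public static <T, R> R invokeAll(List<T> targets, Function<T, R> call)
      throws Throwable {
    ExecutorService executor = Executors.newFixedThreadPool(targets.size());
    try {
      CompletionService<R> completion = new ExecutorCompletionService<>(executor);
      for (T target : targets) {
        Callable<R> task = () -> call.apply(target);
        completion.submit(task);
      }
      try {
        // Block until the fastest target responds, whether it succeeded or failed.
        return completion.take().get();
      } catch (ExecutionException e) {
        // Surface the underlying failure to the caller instead of hiding it.
        throw e.getCause();
      }
    } finally {
      // Cancels whatever requests are still outstanding.
      executor.shutdownNow();
    }
  }
}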
@@ -95,11 +95,11 @@ public class RequestHedgingRMFailoverProxyProvider<T>
       // Create proxy that can retry exceptions properly.
       RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
       InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
-      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+      T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
       return (T) RetryProxy.create(protocol, proxy, retryPolicy);
     } catch (IOException ioe) {
-      LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
-          .getRMHAId(conf), ioe);
+      LOG.error("Unable to create proxy to the ResourceManager "
+          + HAUtil.getRMHAId(conf), ioe);
       return null;
     }
   }
@@ -122,57 +122,61 @@
       }
     }
+    private Throwable extraRootException(Exception ex) {
+      Throwable rootCause = ex;
+      if (ex instanceof ExecutionException) {
+        Throwable cause = ex.getCause();
+        if (cause instanceof InvocationTargetException) {
+          rootCause = cause.getCause();
+        }
+      }
+      return rootCause;
+    }
     /**
      * Creates a Executor and invokes all proxies concurrently.
      */
     @Override
-    public Object invoke(Object proxy, final Method method,
-        final Object[] args) throws Throwable {
+    public Object invoke(Object proxy, final Method method, final Object[] args)
+        throws Throwable {
       if (successfulProxy != null) {
-        return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args);
+        return invokeMethod(nonRetriableProxy.get(successfulProxy), method,
+            args);
       }
       ExecutorService executor = null;
       CompletionService<Object> completionService;
       try {
         Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
-        int numAttempts = 0;
         executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
         completionService = new ExecutorCompletionService<>(executor);
         for (final ProxyInfo<T> pInfo : allProxies.values()) {
           Callable<Object> c = new Callable<Object>() {
-            @Override public Object call() throws Exception {
+            @Override
+            public Object call() throws Exception {
               return method.invoke(pInfo.proxy, args);
             }
           };
           proxyMap.put(completionService.submit(c), pInfo);
-          numAttempts++;
         }
-        Map<String, Exception> badResults = new HashMap<>();
-        while (numAttempts > 0) {
         Future<Object> callResultFuture = completionService.take();
         String pInfo = proxyMap.get(callResultFuture).proxyInfo;
+        successfulProxy = pInfo;
         Object retVal;
         try {
           retVal = callResultFuture.get();
-          successfulProxy = pInfo;
           LOG.info("Invocation successful on [" + pInfo + "]");
           return retVal;
         } catch (Exception ex) {
-          LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
-          badResults.put(pInfo, ex);
-          numAttempts--;
-        }
+          // Throw exception from first responding RM so that clients can handle
+          // appropriately
+          Throwable rootCause = extraRootException(ex);
+          LOG.warn("Invocation returned exception: " + rootCause.toString()
+              + " on " + "[" + pInfo + "], so propagating back to caller.");
+          throw rootCause;
         }
-        // At this point we should have All bad results (Exceptions)
-        // Or should have returned with successful result.
-        if (badResults.size() == 1) {
-          throw badResults.values().iterator().next();
-        } else {
-          throw new MultiException(badResults);
-        }
       } finally {
         if (executor != null) {
           executor.shutdownNow();
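
One more note on the extraRootException helper introduced above: each hedged call runs method.invoke inside a Callable, so a failure from the RM comes back double-wrapped, first in an InvocationTargetException added by reflection and then in an ExecutionException added by Future.get(). The short, self-contained demo below (plain JDK, with a hypothetical WrappingDemo class) reproduces that nesting; these are exactly the two layers the helper peels off before the original exception is rethrown to the client.

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WrappingDemo {
  public static Object fail() throws Exception {
    throw new IllegalStateException("raised by the target");
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Method m = WrappingDemo.class.getMethod("fail");
    // Same shape as the provider: a reflective call submitted to an executor.
    Callable<Object> task = () -> m.invoke(null);
    Future<Object> f = executor.submit(task);
    try {
      f.get();
    } catch (ExecutionException e) {
      // e.getCause() is the InvocationTargetException from reflection;
      // its cause is the original IllegalStateException.
      InvocationTargetException ite = (InvocationTargetException) e.getCause();
      System.out.println("root cause: " + ite.getCause());
    } finally {
      executor.shutdownNow();
    }
  }
}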