YARN-4496. Improve HA ResourceManager Failover detection on the client.

Contributed by Jian He
This commit is contained in:
Xuan 2016-01-22 18:20:38 -08:00
parent 46e5ea81e0
commit 618bfd6ac2
7 changed files with 323 additions and 19 deletions

View File

@ -104,6 +104,9 @@ Release 2.9.0 - UNRELEASED
YARN-4603. FairScheduler should mention user requested queuename in error YARN-4603. FairScheduler should mention user requested queuename in error
message when failed in queue ACL check. (Tao Jie via kasha) message when failed in queue ACL check. (Tao Jie via kasha)
YARN-4496. Improve HA ResourceManager Failover detection on the client.
(Jian He via xgong)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert;
import org.junit.Test;
public class TestHedgingRequestRMFailoverProxyProvider {
@Test
public void testHedgingRequestProxyProvider() throws Exception {
final MiniYARNCluster cluster =
new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4,rm5");
conf.set(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
RequestHedgingRMFailoverProxyProvider.class.getName());
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
2000);
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
cluster.init(conf);
cluster.start();
final YarnClient client = YarnClient.createYarnClient();
client.init(conf);
client.start();
// Transition rm5 to active;
long start = System.currentTimeMillis();
makeRMActive(cluster, 4);
// client will retry until the rm becomes active.
client.getAllQueues();
long end = System.currentTimeMillis();
System.out.println("Client call succeeded at " + end);
// should return the response fast
Assert.assertTrue(end - start <= 10000);
// transition rm5 to standby
cluster.getResourceManager(4).getRMContext().getRMAdminService()
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
makeRMActive(cluster, 2);
client.getAllQueues();
cluster.stop();
}
private void makeRMActive(final MiniYARNCluster cluster, final int index) {
Thread t = new Thread() {
@Override public void run() {
try {
System.out.println("Transition rm" + index + " to active");
cluster.getResourceManager(index).getRMContext().getRMAdminService()
.transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}
}

View File

@ -45,8 +45,8 @@ public class ConfiguredRMFailoverProxyProvider<T>
private int currentProxyIndex = 0; private int currentProxyIndex = 0;
Map<String, T> proxies = new HashMap<String, T>(); Map<String, T> proxies = new HashMap<String, T>();
private RMProxy<T> rmProxy; protected RMProxy<T> rmProxy;
private Class<T> protocol; protected Class<T> protocol;
protected YarnConfiguration conf; protected YarnConfiguration conf;
protected String[] rmServiceIds; protected String[] rmServiceIds;
@ -71,7 +71,7 @@ public void init(Configuration configuration, RMProxy<T> rmProxy,
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS)); YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
} }
private T getProxyInternal() { protected T getProxyInternal() {
try { try {
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
return RMProxy.getProxy(conf, protocol, rmAddress); return RMProxy.getProxy(conf, protocol, rmAddress);

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@ -77,6 +78,7 @@ protected InetSocketAddress getRMAddress(
} }
/** /**
* Currently, used by Client and AM only
* Create a proxy for the specified protocol. For non-HA, * Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is * this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as * enabled, the proxy handles the failover between the ResourceManagers as
@ -88,12 +90,12 @@ protected static <T> T createRMProxy(final Configuration configuration,
YarnConfiguration conf = (configuration instanceof YarnConfiguration) YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration ? (YarnConfiguration) configuration
: new YarnConfiguration(configuration); : new YarnConfiguration(configuration);
RetryPolicy retryPolicy = RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
createRetryPolicy(conf); return newProxyInstance(conf, protocol, instance, retryPolicy);
return createRMProxy(conf, protocol, instance, retryPolicy);
} }
/** /**
* Currently, used by NodeManagers only.
* Create a proxy for the specified protocol. For non-HA, * Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is * this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as * enabled, the proxy handles the failover between the ResourceManagers as
@ -106,12 +108,12 @@ protected static <T> T createRMProxy(final Configuration configuration,
YarnConfiguration conf = (configuration instanceof YarnConfiguration) YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration ? (YarnConfiguration) configuration
: new YarnConfiguration(configuration); : new YarnConfiguration(configuration);
RetryPolicy retryPolicy = RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval,
createRetryPolicy(conf, retryTime, retryInterval); HAUtil.isHAEnabled(conf));
return createRMProxy(conf, protocol, instance, retryPolicy); return newProxyInstance(conf, protocol, instance, retryPolicy);
} }
private static <T> T createRMProxy(final YarnConfiguration conf, private static <T> T newProxyInstance(final YarnConfiguration conf,
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
throws IOException{ throws IOException{
if (HAUtil.isHAEnabled(conf)) { if (HAUtil.isHAEnabled(conf)) {
@ -144,7 +146,7 @@ private static <T> T createRMProxy(final YarnConfiguration conf,
@Deprecated @Deprecated
public static <T> T createRMProxy(final Configuration conf, public static <T> T createRMProxy(final Configuration conf,
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException { final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
RetryPolicy retryPolicy = createRetryPolicy(conf); RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy); return (T) RetryProxy.create(protocol, proxy, retryPolicy);
@ -194,7 +196,8 @@ private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
*/ */
@Private @Private
@VisibleForTesting @VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) { public static RetryPolicy createRetryPolicy(Configuration conf,
boolean isHAEnabled) {
long rmConnectWaitMS = long rmConnectWaitMS =
conf.getLong( conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
@ -204,16 +207,17 @@ public static RetryPolicy createRetryPolicy(Configuration conf) {
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
YarnConfiguration YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
return createRetryPolicy(
conf, rmConnectWaitMS, rmConnectionRetryIntervalMS); return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS,
isHAEnabled);
} }
/** /**
* Fetch retry policy from Configuration and create the * Fetch retry policy from Configuration and create the
* retry policy with specified retryTime and retry interval. * retry policy with specified retryTime and retry interval.
*/ */
private static RetryPolicy createRetryPolicy(Configuration conf, protected static RetryPolicy createRetryPolicy(Configuration conf,
long retryTime, long retryInterval) { long retryTime, long retryInterval, boolean isHAEnabled) {
long rmConnectWaitMS = retryTime; long rmConnectWaitMS = retryTime;
long rmConnectionRetryIntervalMS = retryInterval; long rmConnectionRetryIntervalMS = retryInterval;
@ -236,7 +240,7 @@ private static RetryPolicy createRetryPolicy(Configuration conf,
} }
// Handle HA case first // Handle HA case first
if (HAUtil.isHAEnabled(conf)) { if (isHAEnabled) {
final long failoverSleepBaseMs = conf.getLong( final long failoverSleepBaseMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
rmConnectionRetryIntervalMS); rmConnectionRetryIntervalMS);
@ -287,6 +291,7 @@ private static RetryPolicy createRetryPolicy(Configuration conf,
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy); exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy); exceptionToPolicyMap.put(SocketException.class, retryPolicy);
exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
// YARN-4288: local IOException is also possible. // YARN-4288: local IOException is also possible.
exceptionToPolicyMap.put(IOException.class, retryPolicy); exceptionToPolicyMap.put(IOException.class, retryPolicy);
// Not retry on remote IO exception. // Not retry on remote IO exception.

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. Each proxy inside the wrapper proxy will
* retry the corresponding target. It assumes the in an HA setup, there will
* be only one Active, and the active should respond faster than any configured
* standbys. Once it receives a response from any one of the configred proxies,
* outstanding requests to other proxies are immediately cancelled.
*/
public class RequestHedgingRMFailoverProxyProvider<T>
extends ConfiguredRMFailoverProxyProvider<T> {
private static final Log LOG =
LogFactory.getLog(RequestHedgingRMFailoverProxyProvider.class);
private volatile String successfulProxy = null;
private ProxyInfo<T> wrappedProxy = null;
private Map<String, T> nonRetriableProxy = new HashMap<>();
@Override
@SuppressWarnings("unchecked")
public void init(Configuration configuration, RMProxy<T> rmProxy,
Class<T> protocol) {
super.init(configuration, rmProxy, protocol);
Map<String, ProxyInfo<T>> retriableProxies = new HashMap<>();
String originalId = HAUtil.getRMHAId(conf);
for (String rmId : rmServiceIds) {
conf.set(YarnConfiguration.RM_HA_ID, rmId);
nonRetriableProxy.put(rmId, super.getProxyInternal());
T proxy = createRetriableProxy();
ProxyInfo<T> pInfo = new ProxyInfo<T>(proxy, rmId);
retriableProxies.put(rmId, pInfo);
}
conf.set(YarnConfiguration.RM_HA_ID, originalId);
T proxyInstance = (T) Proxy.newProxyInstance(
RMRequestHedgingInvocationHandler.class.getClassLoader(),
new Class<?>[] {protocol},
new RMRequestHedgingInvocationHandler(retriableProxies));
String combinedInfo = Arrays.toString(rmServiceIds);
wrappedProxy = new ProxyInfo<T>(proxyInstance, combinedInfo);
LOG.info("Created wrapped proxy for " + combinedInfo);
}
@SuppressWarnings("unchecked")
protected T createRetriableProxy() {
try {
// Create proxy that can retry exceptions properly.
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
} catch (IOException ioe) {
LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
.getRMHAId(conf), ioe);
return null;
}
}
class RMRequestHedgingInvocationHandler implements InvocationHandler {
final private Map<String, ProxyInfo<T>> allProxies;
public RMRequestHedgingInvocationHandler(
Map<String, ProxyInfo<T>> allProxies) {
this.allProxies = new HashMap<>(allProxies);
}
protected Object invokeMethod(Object proxy, Method method, Object[] args)
throws Throwable {
try {
return method.invoke(proxy, args);
} catch (InvocationTargetException ex) {
throw ex.getCause();
}
}
/**
* Creates a Executor and invokes all proxies concurrently.
*/
@Override
public Object invoke(Object proxy, final Method method,
final Object[] args) throws Throwable {
if (successfulProxy != null) {
return invokeMethod(nonRetriableProxy.get(successfulProxy), method, args);
}
ExecutorService executor = null;
CompletionService<Object> completionService;
try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
int numAttempts = 0;
executor = Executors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() {
@Override public Object call() throws Exception {
return method.invoke(pInfo.proxy, args);
}
};
proxyMap.put(completionService.submit(c), pInfo);
numAttempts++;
}
Map<String, Exception> badResults = new HashMap<>();
while (numAttempts > 0) {
Future<Object> callResultFuture = completionService.take();
String pInfo = proxyMap.get(callResultFuture).proxyInfo;
Object retVal;
try {
retVal = callResultFuture.get();
successfulProxy = pInfo;
LOG.info("Invocation successful on [" + pInfo + "]");
return retVal;
} catch (Exception ex) {
LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
badResults.put(pInfo, ex);
numAttempts--;
}
}
// At this point we should have All bad results (Exceptions)
// Or should have returned with successful result.
if (badResults.size() == 1) {
throw badResults.values().iterator().next();
} else {
throw new MultiException(badResults);
}
} finally {
if (executor != null) {
executor.shutdownNow();
}
}
}
}
@Override
public ProxyInfo<T> getProxy() {
return wrappedProxy;
}
@Override
public void performFailover(T currentProxy) {
LOG.info("Connection lost, trying to fail over.");
successfulProxy = null;
}
}

View File

@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -443,7 +444,8 @@ protected void serviceStart() throws Exception {
@Override @Override
protected ResourceTracker getRMClient() throws IOException { protected ResourceTracker getRMClient() throws IOException {
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
HAUtil.isHAEnabled(conf));
resourceTracker = resourceTracker =
(ResourceTracker) RetryProxy.create(ResourceTracker.class, (ResourceTracker) RetryProxy.create(ResourceTracker.class,
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart), new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
@ -476,7 +478,8 @@ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
@Override @Override
protected ResourceTracker getRMClient() { protected ResourceTracker getRMClient() {
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf); RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
HAUtil.isHAEnabled(conf));
return (ResourceTracker) RetryProxy.create(ResourceTracker.class, return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
resourceTracker, retryPolicy); resourceTracker, retryPolicy);
} }

View File

@ -457,6 +457,7 @@ protected synchronized void serviceInit(Configuration conf)
protected synchronized void serviceStart() throws Exception { protected synchronized void serviceStart() throws Exception {
startResourceManager(index); startResourceManager(index);
Configuration conf = resourceManagers[index].getConfig(); Configuration conf = resourceManagers[index].getConfig();
LOG.info("Starting resourcemanager " + index);
LOG.info("MiniYARN ResourceManager address: " + LOG.info("MiniYARN ResourceManager address: " +
conf.get(YarnConfiguration.RM_ADDRESS)); conf.get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils