YARN-4496. Improve HA ResourceManager Failover detection on the client.

Contributed by Jian He

(cherry picked from commit 618bfd6ac2)
This commit is contained in:
Xuan 2016-01-22 18:20:38 -08:00
parent 635849fa45
commit 1d19557dda
7 changed files with 323 additions and 19 deletions

View File

@ -46,6 +46,9 @@ Release 2.9.0 - UNRELEASED
YARN-4603. FairScheduler should mention user requested queuename in error
message when failed in queue ACL check. (Tao Jie via kasha)
YARN-4496. Improve HA ResourceManager Failover detection on the client.
(Jian He via xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert;
import org.junit.Test;
/**
 * Exercises {@link RequestHedgingRMFailoverProxyProvider} against a
 * five-ResourceManager HA {@link MiniYARNCluster}: a client call must succeed
 * once one RM becomes active, and again after a different RM takes over.
 */
public class TestHedgingRequestRMFailoverProxyProvider {

  @Test
  public void testHedgingRequestProxyProvider() throws Exception {
    final MiniYARNCluster cluster =
        new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
    Configuration conf = new YarnConfiguration();

    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3,rm4,rm5");

    conf.set(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
        RequestHedgingRMFailoverProxyProvider.class.getName());
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
        2000);

    HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
    HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
    HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
    HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
    HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);

    cluster.init(conf);
    // The cluster binds fixed ports, so it must always be stopped, even when
    // an assertion or an RPC below fails; otherwise later tests cannot bind.
    try {
      cluster.start();

      final YarnClient client = YarnClient.createYarnClient();
      client.init(conf);
      try {
        client.start();

        // Transition rm5 (cluster index 4) to active.
        long start = System.currentTimeMillis();
        makeRMActive(cluster, 4);
        // client will retry until the rm becomes active.
        client.getAllQueues();
        long end = System.currentTimeMillis();
        System.out.println("Client call succeeded at " + end);
        // should return the response fast
        Assert.assertTrue(
            "Client call took " + (end - start) + " ms, expected <= 10000 ms",
            end - start <= 10000);

        // transition rm5 to standby
        cluster.getResourceManager(4).getRMContext().getRMAdminService()
            .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
                HAServiceProtocol.RequestSource.REQUEST_BY_USER));

        // Activate a different RM (cluster index 2); the client must find it.
        makeRMActive(cluster, 2);
        client.getAllQueues();
      } finally {
        client.stop();
      }
    } finally {
      cluster.stop();
    }
  }

  /**
   * Asynchronously transitions one ResourceManager of the cluster to active.
   * The transition runs on its own thread so the caller can issue client
   * requests while it is still in progress.
   *
   * @param cluster the mini cluster that owns the ResourceManager
   * @param index zero-based index of the ResourceManager within the cluster
   */
  private void makeRMActive(final MiniYARNCluster cluster, final int index) {
    Thread t = new Thread() {
      @Override public void run() {
        try {
          System.out.println("Transition rm" + index + " to active");
          cluster.getResourceManager(index).getRMContext().getRMAdminService()
              .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
                  HAServiceProtocol.RequestSource.REQUEST_BY_USER));
        } catch (Exception e) {
          // Only reported here; a failed transition shows up in the test as
          // the subsequent client call not succeeding.
          e.printStackTrace();
        }
      }
    };
    t.start();
  }
}

View File

@ -45,8 +45,8 @@ public class ConfiguredRMFailoverProxyProvider<T>
private int currentProxyIndex = 0;
Map<String, T> proxies = new HashMap<String, T>();
private RMProxy<T> rmProxy;
private Class<T> protocol;
protected RMProxy<T> rmProxy;
protected Class<T> protocol;
protected YarnConfiguration conf;
protected String[] rmServiceIds;
@ -71,7 +71,7 @@ public class ConfiguredRMFailoverProxyProvider<T>
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
}
private T getProxyInternal() {
protected T getProxyInternal() {
try {
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
return RMProxy.getProxy(conf, protocol, rmAddress);

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
@ -77,6 +78,7 @@ public class RMProxy<T> {
}
/**
* Currently, used by Client and AM only
* Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as
@ -88,12 +90,12 @@ public class RMProxy<T> {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
RetryPolicy retryPolicy =
createRetryPolicy(conf);
return createRMProxy(conf, protocol, instance, retryPolicy);
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
return newProxyInstance(conf, protocol, instance, retryPolicy);
}
/**
* Currently, used by NodeManagers only.
* Create a proxy for the specified protocol. For non-HA,
* this is a direct connection to the ResourceManager address. When HA is
* enabled, the proxy handles the failover between the ResourceManagers as
@ -106,12 +108,12 @@ public class RMProxy<T> {
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
? (YarnConfiguration) configuration
: new YarnConfiguration(configuration);
RetryPolicy retryPolicy =
createRetryPolicy(conf, retryTime, retryInterval);
return createRMProxy(conf, protocol, instance, retryPolicy);
RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval,
HAUtil.isHAEnabled(conf));
return newProxyInstance(conf, protocol, instance, retryPolicy);
}
private static <T> T createRMProxy(final YarnConfiguration conf,
private static <T> T newProxyInstance(final YarnConfiguration conf,
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
throws IOException{
if (HAUtil.isHAEnabled(conf)) {
@ -144,7 +146,7 @@ public class RMProxy<T> {
@Deprecated
public static <T> T createRMProxy(final Configuration conf,
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
RetryPolicy retryPolicy = createRetryPolicy(conf);
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
LOG.info("Connecting to ResourceManager at " + rmAddress);
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
@ -194,7 +196,8 @@ public class RMProxy<T> {
*/
@Private
@VisibleForTesting
public static RetryPolicy createRetryPolicy(Configuration conf) {
public static RetryPolicy createRetryPolicy(Configuration conf,
boolean isHAEnabled) {
long rmConnectWaitMS =
conf.getLong(
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
@ -204,16 +207,17 @@ public class RMProxy<T> {
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
YarnConfiguration
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
return createRetryPolicy(
conf, rmConnectWaitMS, rmConnectionRetryIntervalMS);
return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS,
isHAEnabled);
}
/**
* Fetch retry policy from Configuration and create the
* retry policy with specified retryTime and retry interval.
*/
private static RetryPolicy createRetryPolicy(Configuration conf,
long retryTime, long retryInterval) {
protected static RetryPolicy createRetryPolicy(Configuration conf,
long retryTime, long retryInterval, boolean isHAEnabled) {
long rmConnectWaitMS = retryTime;
long rmConnectionRetryIntervalMS = retryInterval;
@ -236,7 +240,7 @@ public class RMProxy<T> {
}
// Handle HA case first
if (HAUtil.isHAEnabled(conf)) {
if (isHAEnabled) {
final long failoverSleepBaseMs = conf.getLong(
YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
rmConnectionRetryIntervalMS);
@ -287,6 +291,7 @@ public class RMProxy<T> {
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
// YARN-4288: local IOException is also possible.
exceptionToPolicyMap.put(IOException.class, retryPolicy);
// Not retry on remote IO exception.

View File

@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
* underlying proxies simultaneously. Each proxy inside the wrapper proxy will
* retry the corresponding target. It assumes that in an HA setup, there will
* be only one Active, and the active should respond faster than any configured
* standbys. Once it receives a response from any one of the configured proxies,
* outstanding requests to other proxies are immediately cancelled.
*/
public class RequestHedgingRMFailoverProxyProvider<T>
    extends ConfiguredRMFailoverProxyProvider<T> {

  private static final Log LOG =
      LogFactory.getLog(RequestHedgingRMFailoverProxyProvider.class);

  // Id of the RM that answered the most recent hedged request, or null when
  // no RM is currently known to be good (initially, and after a failover).
  private volatile String successfulProxy = null;
  // The single hedging proxy handed out by getProxy().
  private ProxyInfo<T> wrappedProxy = null;
  // RM id -> proxy obtained from the parent provider; used for direct calls
  // once a successful RM is known.
  private Map<String, T> nonRetriableProxy = new HashMap<>();

  /**
   * Builds, for every configured RM id, one proxy from the parent provider
   * and one retrying proxy, then wraps the retrying proxies in a single
   * dynamic proxy that hedges each request across all of them.
   *
   * @param configuration configuration passed on to the parent provider
   * @param rmProxy helper used to resolve ResourceManager addresses
   * @param protocol the RPC protocol interface being proxied
   */
  @Override
  @SuppressWarnings("unchecked")
  public void init(Configuration configuration, RMProxy<T> rmProxy,
      Class<T> protocol) {
    super.init(configuration, rmProxy, protocol);
    Map<String, ProxyInfo<T>> retriableProxies = new HashMap<>();

    // RM_HA_ID is pointed at each RM in turn while its proxies are created,
    // and restored afterwards.
    String originalId = HAUtil.getRMHAId(conf);
    for (String rmId : rmServiceIds) {
      conf.set(YarnConfiguration.RM_HA_ID, rmId);
      nonRetriableProxy.put(rmId, super.getProxyInternal());

      T proxy = createRetriableProxy();
      ProxyInfo<T> pInfo = new ProxyInfo<T>(proxy, rmId);
      retriableProxies.put(rmId, pInfo);
    }
    conf.set(YarnConfiguration.RM_HA_ID, originalId);

    T proxyInstance = (T) Proxy.newProxyInstance(
        RMRequestHedgingInvocationHandler.class.getClassLoader(),
        new Class<?>[] {protocol},
        new RMRequestHedgingInvocationHandler(retriableProxies));
    String combinedInfo = Arrays.toString(rmServiceIds);
    wrappedProxy = new ProxyInfo<T>(proxyInstance, combinedInfo);
    LOG.info("Created wrapped proxy for " + combinedInfo);
  }

  /**
   * Creates a retrying proxy for the RM currently selected by RM_HA_ID in
   * {@code conf}. The retry policy is requested with the HA flag set to
   * {@code false}.
   *
   * @return the retrying proxy, or {@code null} if it could not be created
   */
  @SuppressWarnings("unchecked")
  protected T createRetriableProxy() {
    try {
      // Create proxy that can retry exceptions properly.
      RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
      InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
    } catch (IOException ioe) {
      LOG.error("Unable to create proxy to the ResourceManager " + HAUtil
          .getRMHAId(conf), ioe);
      return null;
    }
  }

  /**
   * Strips the wrappers added by running a reflective call on an executor:
   * {@link ExecutionException} from {@code Future.get()} and
   * {@link InvocationTargetException} from {@code Method.invoke()}.
   *
   * @param failure the exception caught from {@code Future.get()}
   * @return the underlying cause when one is available, otherwise the
   *         innermost wrapper that could be reached
   */
  private static Throwable unwrapInvocationFailure(Exception failure) {
    Throwable root = failure;
    if (root instanceof ExecutionException && root.getCause() != null) {
      root = root.getCause();
    }
    if (root instanceof InvocationTargetException
        && root.getCause() != null) {
      root = root.getCause();
    }
    return root;
  }

  /**
   * Invocation handler behind the wrapped proxy. While no RM is known to be
   * good it submits the call to every RM concurrently and returns the first
   * successful result; afterwards it calls the known-good RM directly.
   */
  class RMRequestHedgingInvocationHandler implements InvocationHandler {

    final private Map<String, ProxyInfo<T>> allProxies;

    public RMRequestHedgingInvocationHandler(
        Map<String, ProxyInfo<T>> allProxies) {
      this.allProxies = new HashMap<>(allProxies);
    }

    /**
     * Invokes {@code method} on {@code proxy}, rethrowing the target's own
     * exception instead of the reflective wrapper.
     */
    protected Object invokeMethod(Object proxy, Method method, Object[] args)
        throws Throwable {
      try {
        return method.invoke(proxy, args);
      } catch (InvocationTargetException ex) {
        throw ex.getCause();
      }
    }

    /**
     * Creates an Executor and invokes all proxies concurrently, unless an RM
     * is already known to be good, in which case only that RM is called.
     *
     * @throws Throwable the single underlying failure when exactly one proxy
     *         was tried and failed, or a {@link MultiException} carrying the
     *         per-RM failures when several were tried and all failed
     */
    @Override
    public Object invoke(Object proxy, final Method method,
        final Object[] args) throws Throwable {
      // Read the volatile field exactly once: performFailover() may reset it
      // to null between a null check and a second read.
      final String knownGood = successfulProxy;
      if (knownGood != null) {
        return invokeMethod(nonRetriableProxy.get(knownGood), method, args);
      }

      ExecutorService executor = null;
      CompletionService<Object> completionService;
      try {
        Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
        int numAttempts = 0;
        executor = Executors.newFixedThreadPool(allProxies.size());
        completionService = new ExecutorCompletionService<>(executor);
        for (final ProxyInfo<T> pInfo : allProxies.values()) {
          Callable<Object> c = new Callable<Object>() {
            @Override public Object call() throws Exception {
              return method.invoke(pInfo.proxy, args);
            }
          };
          proxyMap.put(completionService.submit(c), pInfo);
          numAttempts++;
        }

        Map<String, Exception> badResults = new HashMap<>();
        Throwable lastFailure = null;
        while (numAttempts > 0) {
          Future<Object> callResultFuture = completionService.take();
          String pInfo = proxyMap.get(callResultFuture).proxyInfo;
          Object retVal;
          try {
            retVal = callResultFuture.get();
            successfulProxy = pInfo;
            LOG.info("Invocation successful on [" + pInfo + "]");
            return retVal;
          } catch (Exception ex) {
            // Report the RM's own exception rather than the
            // ExecutionException/InvocationTargetException wrappers, which
            // the protocol methods do not declare.
            Throwable rootCause = unwrapInvocationFailure(ex);
            LOG.warn("Invocation returned exception on " + "[" + pInfo + "]");
            // MultiException only accepts Exceptions; keep the wrapper when
            // the root cause is some other Throwable.
            badResults.put(pInfo,
                rootCause instanceof Exception ? (Exception) rootCause : ex);
            lastFailure = rootCause;
            numAttempts--;
          }
        }

        // At this point we should have All bad results (Exceptions)
        // Or should have returned with successful result.
        if (badResults.size() == 1) {
          throw lastFailure;
        } else {
          throw new MultiException(badResults);
        }
      } finally {
        // Cancels any requests still outstanding on the other proxies.
        if (executor != null) {
          executor.shutdownNow();
        }
      }
    }
  }

  /**
   * @return the single hedging proxy created in {@code init}
   */
  @Override
  public ProxyInfo<T> getProxy() {
    return wrappedProxy;
  }

  /**
   * Forgets the last successful RM so that the next request is hedged across
   * all RMs again.
   *
   * @param currentProxy unused
   */
  @Override
  public void performFailover(T currentProxy) {
    LOG.info("Connection lost, trying to fail over.");
    successfulProxy = null;
  }
}

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -443,7 +444,8 @@ public class TestNodeStatusUpdater {
@Override
protected ResourceTracker getRMClient() throws IOException {
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
HAUtil.isHAEnabled(conf));
resourceTracker =
(ResourceTracker) RetryProxy.create(ResourceTracker.class,
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
@ -476,7 +478,8 @@ public class TestNodeStatusUpdater {
@Override
protected ResourceTracker getRMClient() {
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf,
HAUtil.isHAEnabled(conf));
return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
resourceTracker, retryPolicy);
}

View File

@ -457,6 +457,7 @@ public class MiniYARNCluster extends CompositeService {
protected synchronized void serviceStart() throws Exception {
startResourceManager(index);
Configuration conf = resourceManagers[index].getConfig();
LOG.info("Starting resourcemanager " + index);
LOG.info("MiniYARN ResourceManager address: " +
conf.get(YarnConfiguration.RM_ADDRESS));
LOG.info("MiniYARN ResourceManager web address: " + WebAppUtils