YARN-6061. Add an UncaughtExceptionHandler for critical threads in RM. (Yufei Gu via kasha)

(cherry picked from commit 652679aa8a)
This commit is contained in:
Karthik Kambatla 2017-02-14 13:39:34 -08:00
parent d72f1c5976
commit 37921b3fef
8 changed files with 226 additions and 29 deletions

View File

@ -22,7 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -37,14 +40,18 @@ import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.junit.After; import org.junit.After;
@ -174,7 +181,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
// so it transitions to standby. // so it transitions to standby.
ResourceManager rm = cluster.getResourceManager( ResourceManager rm = cluster.getResourceManager(
cluster.getActiveRMIndex()); cluster.getActiveRMIndex());
rm.handleTransitionToStandBy(); rm.handleTransitionToStandByInNewThread();
int maxWaitingAttempts = 2000; int maxWaitingAttempts = 2000;
while (maxWaitingAttempts-- > 0 ) { while (maxWaitingAttempts-- > 0 ) {
if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
@ -349,4 +356,95 @@ public class TestRMFailover extends ClientBaseWithFixes {
} }
return redirectUrl; return redirectUrl;
} }
/**
* Throw {@link RuntimeException} inside a thread of
* {@link ResourceManager} with HA enabled and check if the
* {@link ResourceManager} is transited to standby state.
*
* @throws InterruptedException if any
*/
@Test
public void testUncaughtExceptionHandlerWithHAEnabled()
throws InterruptedException {
conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
cluster.init(conf);
cluster.start();
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
ResourceManager resourceManager = cluster.getResourceManager(
cluster.getActiveRMIndex());
final RMCriticalThreadUncaughtExceptionHandler exHandler =
new RMCriticalThreadUncaughtExceptionHandler(
resourceManager.getRMContext());
// Create a thread and throw a RTE inside it
final RuntimeException rte = new RuntimeException("TestRuntimeException");
final Thread testThread = new Thread(new Runnable() {
@Override
public void run() {
throw rte;
}
});
testThread.setName("TestThread");
testThread.setUncaughtExceptionHandler(exHandler);
testThread.start();
testThread.join();
int maxWaitingAttempts = 2000;
while (maxWaitingAttempts-- > 0) {
if (resourceManager.getRMContext().getHAServiceState()
== HAServiceState.STANDBY) {
break;
}
Thread.sleep(1);
}
assertFalse("RM didn't transition to Standby ", maxWaitingAttempts < 0);
}
/**
* Throw {@link RuntimeException} inside a thread of
* {@link ResourceManager} with HA disabled and check
* {@link RMCriticalThreadUncaughtExceptionHandler} instance.
*
* Used {@link ExitUtil} class to avoid jvm exit through
* {@code System.exit(-1)}.
*
* @throws InterruptedException if any
*/
@Test
public void testUncaughtExceptionHandlerWithoutHA()
throws InterruptedException {
ExitUtil.disableSystemExit();
// Create a MockRM and start it
ResourceManager resourceManager = new MockRM();
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
resourceManager.getRMContext().getStateStore().start();
resourceManager.getRMContext().getContainerTokenSecretManager().
rollMasterKey();
final RMCriticalThreadUncaughtExceptionHandler exHandler =
new RMCriticalThreadUncaughtExceptionHandler(
resourceManager.getRMContext());
final RMCriticalThreadUncaughtExceptionHandler spyRTEHandler =
spy(exHandler);
// Create a thread and throw a RTE inside it
final RuntimeException rte = new RuntimeException("TestRuntimeException");
final Thread testThread = new Thread(new Runnable() {
@Override public void run() {
throw rte;
}
});
testThread.setName("TestThread");
testThread.setUncaughtExceptionHandler(spyRTEHandler);
assertSame(spyRTEHandler, testThread.getUncaughtExceptionHandler());
testThread.start();
testThread.join();
verify(spyRTEHandler).uncaughtException(testThread, rte);
}
} }

View File

@ -150,4 +150,6 @@ public interface RMContext {
RMAppLifetimeMonitor getRMAppLifetimeMonitor(); RMAppLifetimeMonitor getRMAppLifetimeMonitor();
String getHAZookeeperConnectionState(); String getHAZookeeperConnectionState();
ResourceManager getResourceManager();
} }

View File

@ -81,6 +81,7 @@ public class RMContextImpl implements RMContext {
private final Object haServiceStateLock = new Object(); private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
/** /**
* Default constructor. To be used in conjunction with setter methods for * Default constructor. To be used in conjunction with setter methods for
* individual fields. * individual fields.
@ -509,4 +510,13 @@ public class RMContextImpl implements RMContext {
return elector.getZookeeperConnectionState(); return elector.getZookeeperConnectionState();
} }
} }
@Override
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
} }

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.lang.Thread.UncaughtExceptionHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.conf.HAUtil;
/**
* This class either shuts down {@link ResourceManager} or transitions the
* {@link ResourceManager} to standby state if a critical thread throws an
* uncaught exception. It is intended to be installed by calling
* {@code setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)}
* in the thread entry point or after creation of threads.
*/
@Private
public class RMCriticalThreadUncaughtExceptionHandler
implements UncaughtExceptionHandler {
private static final Log LOG = LogFactory.getLog(
RMCriticalThreadUncaughtExceptionHandler.class);
private RMContext rmContext;
public RMCriticalThreadUncaughtExceptionHandler(RMContext rmContext) {
this.rmContext = rmContext;
}
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.fatal("Critical thread " + t.getName() + " crashed!", e);
if (HAUtil.isHAEnabled(rmContext.getYarnConfiguration())) {
rmContext.getResourceManager().handleTransitionToStandByInNewThread();
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.CRITICAL_THREAD_CRASH,
new Exception(e)));
}
}
}

View File

@ -29,5 +29,8 @@ public enum RMFatalEventType {
EMBEDDED_ELECTOR_FAILED, EMBEDDED_ELECTOR_FAILED,
// Source <- Admin Service // Source <- Admin Service
TRANSITION_TO_ACTIVE_FAILED TRANSITION_TO_ACTIVE_FAILED,
// Source <- Critical Thread Crash
CRITICAL_THREAD_CRASH
} }

View File

@ -119,6 +119,7 @@ import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* The ResourceManager is the main class that is a set of components. * The ResourceManager is the main class that is a set of components.
@ -216,6 +217,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf; this.conf = conf;
this.rmContext = new RMContextImpl(); this.rmContext = new RMContextImpl();
rmContext.setResourceManager(this);
this.configurationProvider = this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf); ConfigurationProviderFactory.getConfigurationProvider(conf);
@ -516,6 +518,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ResourceManager rm; private ResourceManager rm;
private RMActiveServiceContext activeServiceContext; private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false; private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
RMActiveServices(ResourceManager rm) { RMActiveServices(ResourceManager rm) {
super("RMActiveServices"); super("RMActiveServices");
@ -524,6 +527,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override @Override
protected void serviceInit(Configuration configuration) throws Exception { protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
activeServiceContext = new RMActiveServiceContext(); activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext); rmContext.setActiveServiceContext(activeServiceContext);
@ -771,7 +776,38 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
public void handleTransitionToStandBy() { /**
* Transition to standby state in a new thread. The transition operation is
* asynchronous to avoid deadlock caused by cyclic dependency.
*/
public void handleTransitionToStandByInNewThread() {
Thread standByTransitionThread =
new Thread(activeServices.standByTransitionRunnable);
standByTransitionThread.setName("StandByTransitionThread");
standByTransitionThread.start();
}
/**
* The class to transition RM to standby state. The same
* {@link StandByTransitionRunnable} object could be used in multiple threads,
* but runs only once. That's because RM can go back to active state after
* transition to standby state, the same runnable in the old context can't
* transition RM to standby state again. A new runnable is created every time
* RM transitions to active state.
*/
private class StandByTransitionRunnable implements Runnable {
// The atomic variable to make sure multiple threads with the same runnable
// run only once.
private AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);
@Override
public void run() {
// Run this only once, even if multiple threads end up triggering
// this simultaneously.
if (hasAlreadyRun.getAndSet(true)) {
return;
}
if (rmContext.isHAEnabled()) { if (rmContext.isHAEnabled()) {
try { try {
// Transition to standby and reinit active services // Transition to standby and reinit active services
@ -787,6 +823,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
} }
}
@Private @Private
public static final class ApplicationEventDispatcher implements public static final class ApplicationEventDispatcher implements

View File

@ -1132,10 +1132,7 @@ public abstract class RMStateStore extends AbstractService {
if (HAUtil.isHAEnabled(getConfig())) { if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("State-store fenced ! Transitioning RM to standby"); LOG.warn("State-store fenced ! Transitioning RM to standby");
isFenced = true; isFenced = true;
Thread standByTransitionThread = resourceManager.handleTransitionToStandByInNewThread();
new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler");
standByTransitionThread.start();
} else if (YarnConfiguration.shouldRMFailFast(getConfig())) { } else if (YarnConfiguration.shouldRMFailFast(getConfig())) {
LOG.fatal("Fail RM now due to state-store error!"); LOG.fatal("Fail RM now due to state-store error!");
rmDispatcher.getEventHandler().handle( rmDispatcher.getEventHandler().handle(
@ -1200,14 +1197,6 @@ public abstract class RMStateStore extends AbstractService {
this.resourceManager = rm; this.resourceManager = rm;
} }
private class StandByTransitionThread implements Runnable {
@Override
public void run() {
LOG.info("RMStateStore has been fenced");
resourceManager.handleTransitionToStandBy();
}
}
public RMStateStoreState getRMStateStoreState() { public RMStateStoreState getRMStateStoreState() {
this.readLock.lock(); this.readLock.lock();
try { try {

View File

@ -90,7 +90,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
@Override @Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare); return Resources.greaterThan(CALCULATOR, null, usage, fairShare);
} }
@Override @Override