YARN-6061. Add an UncaughtExceptionHandler for critical threads in RM. (Yufei Gu via kasha)

This commit is contained in:
Karthik Kambatla 2017-02-14 13:39:34 -08:00
parent aaf2713235
commit 652679aa8a
8 changed files with 226 additions and 29 deletions

View File

@ -22,7 +22,10 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.net.HttpURLConnection;
@ -37,14 +40,18 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.junit.After;
@ -174,7 +181,7 @@ public void testAutomaticFailover()
// so it transitions to standby.
ResourceManager rm = cluster.getResourceManager(
cluster.getActiveRMIndex());
rm.handleTransitionToStandBy();
rm.handleTransitionToStandByInNewThread();
int maxWaitingAttempts = 2000;
while (maxWaitingAttempts-- > 0 ) {
if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
@ -349,4 +356,95 @@ static String getRefreshURL(String url) {
}
return redirectUrl;
}
/**
* Throw {@link RuntimeException} inside a thread of
* {@link ResourceManager} with HA enabled and check if the
* {@link ResourceManager} is transited to standby state.
*
* @throws InterruptedException if any
*/
@Test
public void testUncaughtExceptionHandlerWithHAEnabled()
throws InterruptedException {
conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
cluster.init(conf);
cluster.start();
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
ResourceManager resourceManager = cluster.getResourceManager(
cluster.getActiveRMIndex());
final RMCriticalThreadUncaughtExceptionHandler exHandler =
new RMCriticalThreadUncaughtExceptionHandler(
resourceManager.getRMContext());
// Create a thread and throw a RTE inside it
final RuntimeException rte = new RuntimeException("TestRuntimeException");
final Thread testThread = new Thread(new Runnable() {
@Override
public void run() {
throw rte;
}
});
testThread.setName("TestThread");
testThread.setUncaughtExceptionHandler(exHandler);
testThread.start();
testThread.join();
int maxWaitingAttempts = 2000;
while (maxWaitingAttempts-- > 0) {
if (resourceManager.getRMContext().getHAServiceState()
== HAServiceState.STANDBY) {
break;
}
Thread.sleep(1);
}
assertFalse("RM didn't transition to Standby ", maxWaitingAttempts < 0);
}
/**
* Throw {@link RuntimeException} inside a thread of
* {@link ResourceManager} with HA disabled and check
* {@link RMCriticalThreadUncaughtExceptionHandler} instance.
*
* Used {@link ExitUtil} class to avoid jvm exit through
* {@code System.exit(-1)}.
*
* @throws InterruptedException if any
*/
@Test
public void testUncaughtExceptionHandlerWithoutHA()
throws InterruptedException {
ExitUtil.disableSystemExit();
// Create a MockRM and start it
ResourceManager resourceManager = new MockRM();
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
resourceManager.getRMContext().getStateStore().start();
resourceManager.getRMContext().getContainerTokenSecretManager().
rollMasterKey();
final RMCriticalThreadUncaughtExceptionHandler exHandler =
new RMCriticalThreadUncaughtExceptionHandler(
resourceManager.getRMContext());
final RMCriticalThreadUncaughtExceptionHandler spyRTEHandler =
spy(exHandler);
// Create a thread and throw a RTE inside it
final RuntimeException rte = new RuntimeException("TestRuntimeException");
final Thread testThread = new Thread(new Runnable() {
@Override public void run() {
throw rte;
}
});
testThread.setName("TestThread");
testThread.setUncaughtExceptionHandler(spyRTEHandler);
assertSame(spyRTEHandler, testThread.getUncaughtExceptionHandler());
testThread.start();
testThread.join();
verify(spyRTEHandler).uncaughtException(testThread, rte);
}
}

View File

@ -156,4 +156,6 @@ void setRMDelegatedNodeLabelsUpdater(
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
String getHAZookeeperConnectionState();
ResourceManager getResourceManager();
}

View File

@ -82,6 +82,7 @@ public class RMContextImpl implements RMContext {
private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
@ -522,4 +523,13 @@ public String getHAZookeeperConnectionState() {
return elector.getZookeeperConnectionState();
}
}
@Override
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.lang.Thread.UncaughtExceptionHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.conf.HAUtil;
/**
* This class either shuts down {@link ResourceManager} or transitions the
* {@link ResourceManager} to standby state if a critical thread throws an
* uncaught exception. It is intended to be installed by calling
* {@code setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)}
* in the thread entry point or after creation of threads.
*/
@Private
public class RMCriticalThreadUncaughtExceptionHandler
implements UncaughtExceptionHandler {
private static final Log LOG = LogFactory.getLog(
RMCriticalThreadUncaughtExceptionHandler.class);
private RMContext rmContext;
public RMCriticalThreadUncaughtExceptionHandler(RMContext rmContext) {
this.rmContext = rmContext;
}
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.fatal("Critical thread " + t.getName() + " crashed!", e);
if (HAUtil.isHAEnabled(rmContext.getYarnConfiguration())) {
rmContext.getResourceManager().handleTransitionToStandByInNewThread();
} else {
rmContext.getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.CRITICAL_THREAD_CRASH,
new Exception(e)));
}
}
}

View File

@ -29,5 +29,8 @@ public enum RMFatalEventType {
EMBEDDED_ELECTOR_FAILED,
// Source <- Admin Service
TRANSITION_TO_ACTIVE_FAILED
TRANSITION_TO_ACTIVE_FAILED,
// Source <- Critical Thread Crash
CRITICAL_THREAD_CRASH
}

View File

@ -130,6 +130,7 @@
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The ResourceManager is the main class that is a set of components.
@ -232,6 +233,7 @@ Dispatcher getRmDispatcher() {
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.rmContext = new RMContextImpl();
rmContext.setResourceManager(this);
this.configurationProvider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
@ -564,6 +566,7 @@ public class RMActiveServices extends CompositeService {
private ResourceManager rm;
private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@ -572,6 +575,8 @@ public class RMActiveServices extends CompositeService {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext);
@ -819,19 +824,51 @@ public void handle(RMFatalEvent event) {
}
}
public void handleTransitionToStandBy() {
if (rmContext.isHAEnabled()) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
EmbeddedElector elector = rmContext.getLeaderElectorService();
if (elector != null) {
elector.rejoinElection();
/**
* Transition to standby state in a new thread. The transition operation is
* asynchronous to avoid deadlock caused by cyclic dependency.
*/
public void handleTransitionToStandByInNewThread() {
Thread standByTransitionThread =
new Thread(activeServices.standByTransitionRunnable);
standByTransitionThread.setName("StandByTransitionThread");
standByTransitionThread.start();
}
/**
* The class to transition RM to standby state. The same
* {@link StandByTransitionRunnable} object could be used in multiple threads,
* but runs only once. That's because RM can go back to active state after
* transition to standby state, the same runnable in the old context can't
* transition RM to standby state again. A new runnable is created every time
* RM transitions to active state.
*/
private class StandByTransitionRunnable implements Runnable {
// The atomic variable to make sure multiple threads with the same runnable
// run only once.
private AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);
@Override
public void run() {
// Run this only once, even if multiple threads end up triggering
// this simultaneously.
if (hasAlreadyRun.getAndSet(true)) {
return;
}
if (rmContext.isHAEnabled()) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
EmbeddedElector elector = rmContext.getLeaderElectorService();
if (elector != null) {
elector.rejoinElection();
}
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e);
}
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e);
}
}
}

View File

@ -1132,10 +1132,7 @@ private boolean notifyStoreOperationFailedInternal(
if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("State-store fenced ! Transitioning RM to standby");
isFenced = true;
Thread standByTransitionThread =
new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler");
standByTransitionThread.start();
resourceManager.handleTransitionToStandByInNewThread();
} else if (YarnConfiguration.shouldRMFailFast(getConfig())) {
LOG.fatal("Fail RM now due to state-store error!");
rmDispatcher.getEventHandler().handle(
@ -1200,14 +1197,6 @@ public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
private class StandByTransitionThread implements Runnable {
@Override
public void run() {
LOG.info("RMStateStore has been fenced");
resourceManager.handleTransitionToStandBy();
}
}
public RMStateStoreState getRMStateStoreState() {
this.readLock.lock();
try {

View File

@ -90,7 +90,7 @@ public void computeSteadyShares(Collection<? extends FSQueue> queues,
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare);
return Resources.greaterThan(CALCULATOR, null, usage, fairShare);
}
@Override