From 652679aa8ad6f9e61b8ed8e2b04b3e0332025e94 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Tue, 14 Feb 2017 13:39:34 -0800 Subject: [PATCH] YARN-6061. Add an UncaughtExceptionHandler for critical threads in RM. (Yufei Gu via kasha) --- .../hadoop/yarn/client/TestRMFailover.java | 100 +++++++++++++++++- .../server/resourcemanager/RMContext.java | 2 + .../server/resourcemanager/RMContextImpl.java | 10 ++ ...riticalThreadUncaughtExceptionHandler.java | 58 ++++++++++ .../resourcemanager/RMFatalEventType.java | 5 +- .../resourcemanager/ResourceManager.java | 65 +++++++++--- .../recovery/RMStateStore.java | 13 +-- .../DominantResourceFairnessPolicy.java | 2 +- 8 files changed, 226 insertions(+), 29 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMCriticalThreadUncaughtExceptionHandler.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index b58a7751930..4bf6a781c91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -22,7 +22,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.net.HttpURLConnection; @@ -37,14 +40,18 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.junit.After; @@ -174,7 +181,7 @@ public void testAutomaticFailover() // so it transitions to standby. ResourceManager rm = cluster.getResourceManager( cluster.getActiveRMIndex()); - rm.handleTransitionToStandBy(); + rm.handleTransitionToStandByInNewThread(); int maxWaitingAttempts = 2000; while (maxWaitingAttempts-- > 0 ) { if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { @@ -349,4 +356,95 @@ static String getRefreshURL(String url) { } return redirectUrl; } + + /** + * Throw {@link RuntimeException} inside a thread of + * {@link ResourceManager} with HA enabled and check if the + * {@link ResourceManager} is transited to standby state. + * + * @throws InterruptedException if any + */ + @Test + public void testUncaughtExceptionHandlerWithHAEnabled() + throws InterruptedException { + conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster"); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + cluster.init(conf); + cluster.start(); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + + ResourceManager resourceManager = cluster.getResourceManager( + cluster.getActiveRMIndex()); + + final RMCriticalThreadUncaughtExceptionHandler exHandler = + new RMCriticalThreadUncaughtExceptionHandler( + resourceManager.getRMContext()); + + // Create a thread and throw a RTE inside it + final RuntimeException rte = new RuntimeException("TestRuntimeException"); + final Thread testThread = new Thread(new Runnable() { + @Override + public void run() { + throw rte; + } + }); + testThread.setName("TestThread"); + testThread.setUncaughtExceptionHandler(exHandler); + testThread.start(); + testThread.join(); + + int maxWaitingAttempts = 2000; + while (maxWaitingAttempts-- > 0) { + if (resourceManager.getRMContext().getHAServiceState() + == HAServiceState.STANDBY) { + break; + } + Thread.sleep(1); + } + assertFalse("RM didn't transition to Standby ", maxWaitingAttempts < 0); + } + + /** + * Throw {@link RuntimeException} inside a thread of + * {@link ResourceManager} with HA disabled and check + * {@link RMCriticalThreadUncaughtExceptionHandler} instance. + * + * Used {@link ExitUtil} class to avoid jvm exit through + * {@code System.exit(-1)}. + * + * @throws InterruptedException if any + */ + @Test + public void testUncaughtExceptionHandlerWithoutHA() + throws InterruptedException { + ExitUtil.disableSystemExit(); + + // Create a MockRM and start it + ResourceManager resourceManager = new MockRM(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + resourceManager.getRMContext().getContainerTokenSecretManager(). + rollMasterKey(); + + final RMCriticalThreadUncaughtExceptionHandler exHandler = + new RMCriticalThreadUncaughtExceptionHandler( + resourceManager.getRMContext()); + final RMCriticalThreadUncaughtExceptionHandler spyRTEHandler = + spy(exHandler); + + // Create a thread and throw a RTE inside it + final RuntimeException rte = new RuntimeException("TestRuntimeException"); + final Thread testThread = new Thread(new Runnable() { + @Override public void run() { + throw rte; + } + }); + testThread.setName("TestThread"); + testThread.setUncaughtExceptionHandler(spyRTEHandler); + assertSame(spyRTEHandler, testThread.getUncaughtExceptionHandler()); + testThread.start(); + testThread.join(); + + verify(spyRTEHandler).uncaughtException(testThread, rte); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 26ef5ac754e..ba6b9159dff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -156,4 +156,6 @@ void setRMDelegatedNodeLabelsUpdater( RMAppLifetimeMonitor getRMAppLifetimeMonitor(); String getHAZookeeperConnectionState(); + + ResourceManager getResourceManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index a452f952737..fb160c44270 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -82,6 +82,7 @@ public class RMContextImpl implements RMContext { private final Object haServiceStateLock = new Object(); + private ResourceManager resourceManager; /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -522,4 +523,13 @@ public String getHAZookeeperConnectionState() { return elector.getZookeeperConnectionState(); } } + + @Override + public ResourceManager getResourceManager() { + return resourceManager; + } + + public void setResourceManager(ResourceManager rm) { + this.resourceManager = rm; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMCriticalThreadUncaughtExceptionHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMCriticalThreadUncaughtExceptionHandler.java new file mode 100644 index 00000000000..c5c60876a30 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMCriticalThreadUncaughtExceptionHandler.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.conf.HAUtil; + +/** + * This class either shuts down {@link ResourceManager} or transitions the + * {@link ResourceManager} to standby state if a critical thread throws an + * uncaught exception. It is intended to be installed by calling + * {@code setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)} + * in the thread entry point or after creation of threads. + */ +@Private +public class RMCriticalThreadUncaughtExceptionHandler + implements UncaughtExceptionHandler { + private static final Log LOG = LogFactory.getLog( + RMCriticalThreadUncaughtExceptionHandler.class); + private RMContext rmContext; + + public RMCriticalThreadUncaughtExceptionHandler(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.fatal("Critical thread " + t.getName() + " crashed!", e); + + if (HAUtil.isHAEnabled(rmContext.getYarnConfiguration())) { + rmContext.getResourceManager().handleTransitionToStandByInNewThread(); + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMFatalEvent(RMFatalEventType.CRITICAL_THREAD_CRASH, + new Exception(e))); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java index 87cc4965e84..b6f6b3c1184 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java @@ -29,5 +29,8 @@ public enum RMFatalEventType { EMBEDDED_ELECTOR_FAILED, // Source <- Admin Service - TRANSITION_TO_ACTIVE_FAILED + TRANSITION_TO_ACTIVE_FAILED, + + // Source <- Critical Thread Crash + CRITICAL_THREAD_CRASH } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 886f4abf7c7..58e4077feb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -130,6 +130,7 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * The ResourceManager is the main class that is a set of components. @@ -205,7 +206,7 @@ public class ResourceManager extends CompositeService implements Recoverable { private Configuration conf; private UserGroupInformation rmLoginUGI; - + public ResourceManager() { super("ResourceManager"); } @@ -232,7 +233,8 @@ Dispatcher getRmDispatcher() { protected void serviceInit(Configuration conf) throws Exception { this.conf = conf; this.rmContext = new RMContextImpl(); - + rmContext.setResourceManager(this); + this.configurationProvider = ConfigurationProviderFactory.getConfigurationProvider(conf); this.configurationProvider.init(this.conf); @@ -564,6 +566,7 @@ public class RMActiveServices extends CompositeService { private ResourceManager rm; private RMActiveServiceContext activeServiceContext; private boolean fromActive = false; + private StandByTransitionRunnable standByTransitionRunnable; RMActiveServices(ResourceManager rm) { super("RMActiveServices"); @@ -572,6 +575,8 @@ public class RMActiveServices extends CompositeService { @Override protected void serviceInit(Configuration configuration) throws Exception { + standByTransitionRunnable = new StandByTransitionRunnable(); + activeServiceContext = new RMActiveServiceContext(); rmContext.setActiveServiceContext(activeServiceContext); @@ -819,19 +824,51 @@ public void handle(RMFatalEvent event) { } } - public void handleTransitionToStandBy() { - if (rmContext.isHAEnabled()) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - transitionToStandby(true); - EmbeddedElector elector = rmContext.getLeaderElectorService(); - if (elector != null) { - elector.rejoinElection(); + /** + * Transition to standby state in a new thread. The transition operation is + * asynchronous to avoid deadlock caused by cyclic dependency. + */ + public void handleTransitionToStandByInNewThread() { + Thread standByTransitionThread = + new Thread(activeServices.standByTransitionRunnable); + standByTransitionThread.setName("StandByTransitionThread"); + standByTransitionThread.start(); + } + + /** + * The class to transition RM to standby state. The same + * {@link StandByTransitionRunnable} object could be used in multiple threads, + * but runs only once. That's because RM can go back to active state after + * transition to standby state, the same runnable in the old context can't + * transition RM to standby state again. A new runnable is created every time + * RM transitions to active state. + */ + private class StandByTransitionRunnable implements Runnable { + // The atomic variable to make sure multiple threads with the same runnable + // run only once. + private AtomicBoolean hasAlreadyRun = new AtomicBoolean(false); + + @Override + public void run() { + // Run this only once, even if multiple threads end up triggering + // this simultaneously. + if (hasAlreadyRun.getAndSet(true)) { + return; + } + + if (rmContext.isHAEnabled()) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + transitionToStandby(true); + EmbeddedElector elector = rmContext.getLeaderElectorService(); + if (elector != null) { + elector.rejoinElection(); + } + } catch (Exception e) { + LOG.fatal("Failed to transition RM to Standby mode.", e); + ExitUtil.terminate(1, e); } - } catch (Exception e) { - LOG.fatal("Failed to transition RM to Standby mode.", e); - ExitUtil.terminate(1, e); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 6ede3b4a17b..5e3cf22da5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -1132,10 +1132,7 @@ private boolean notifyStoreOperationFailedInternal( if (HAUtil.isHAEnabled(getConfig())) { LOG.warn("State-store fenced ! Transitioning RM to standby"); isFenced = true; - Thread standByTransitionThread = - new Thread(new StandByTransitionThread()); - standByTransitionThread.setName("StandByTransitionThread Handler"); - standByTransitionThread.start(); + resourceManager.handleTransitionToStandByInNewThread(); } else if (YarnConfiguration.shouldRMFailFast(getConfig())) { LOG.fatal("Fail RM now due to state-store error!"); rmDispatcher.getEventHandler().handle( @@ -1200,14 +1197,6 @@ public void setResourceManager(ResourceManager rm) { this.resourceManager = rm; } - private class StandByTransitionThread implements Runnable { - @Override - public void run() { - LOG.info("RMStateStore has been fenced"); - resourceManager.handleTransitionToStandBy(); - } - } - public RMStateStoreState getRMStateStoreState() { this.readLock.lock(); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index ad41b11f803..7a29735a263 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -90,7 +90,7 @@ public void computeSteadyShares(Collection queues, @Override public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { - return !Resources.fitsIn(usage, fairShare); + return Resources.greaterThan(CALCULATOR, null, usage, fairShare); } @Override