diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index eb421dda89c..408032a02f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -459,7 +459,7 @@ public class CapacityScheduler extends * Schedule on all nodes by starting at a random point. * @param cs */ - static void schedule(CapacityScheduler cs) { + static void schedule(CapacityScheduler cs) throws InterruptedException{ // First randomize the start point int current = 0; Collection nodes = cs.nodeTracker.getAllNodes(); @@ -475,9 +475,7 @@ public class CapacityScheduler extends cs.allocateContainersToNode(node.getNodeID(), false); } - try { - Thread.sleep(cs.getAsyncScheduleInterval()); - } catch (InterruptedException e) {} + Thread.sleep(cs.getAsyncScheduleInterval()); } static class AsyncScheduleThread extends Thread { @@ -492,9 +490,9 @@ public class CapacityScheduler extends @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { - if (!runSchedules.get() || Thread.currentThread().isInterrupted()) { + if (!runSchedules.get()) { Thread.sleep(100); } else { // Don't run schedule if we have some pending backlogs already @@ -505,9 +503,11 @@ public class CapacityScheduler extends } } } catch (InterruptedException ie) { - // Do nothing + // keep interrupt signal + Thread.currentThread().interrupt(); } } + LOG.info("AsyncScheduleThread[" + getName() + "] exited!"); } public void beginSchedule() { @@ -546,8 +546,10 @@ public class CapacityScheduler extends } catch (InterruptedException e) { LOG.error(e); + Thread.currentThread().interrupt(); } } + LOG.info("ResourceCommitterService exited!"); } public void addNewCommitRequest( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java new file mode 100644 index 00000000000..46d5cda2b41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestRMHAForAsyncScheduler extends RMHATestBase { + + @Before + @Override + public void setup() throws Exception { + super.setup(); + confForRM1 + .setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class, ResourceCalculator.class); + confForRM1.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + confForRM1.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + + confForRM2 + .setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class, ResourceCalculator.class); + confForRM2.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + confForRM2.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + } + + @Test(timeout = 60000) + public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception { + // start two RMs, and transit rm1 to active, rm2 to standby + startRMs(); + // register NM + rm1.registerNode("h1:1234", 8192, 8); + // submit app1 and check + RMApp app1 = submitAppAndCheckLaunched(rm1); + + // failover RM1 to RM2 + explicitFailover(); + checkAsyncSchedulerThreads(Thread.currentThread()); + + // register NM, kill app1 + rm2.registerNode("h1:1234", 8192, 8); + rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.LAUNCHED); + rm2.killApp(app1.getApplicationId()); + // submit app3 and check + RMApp app2 = submitAppAndCheckLaunched(rm2); + + // failover RM2 to RM1 + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + rm2.adminService.transitionToStandby(requestInfo); + rm1.adminService.transitionToActive(requestInfo); + Assert.assertTrue(rm2.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.STANDBY); + Assert.assertTrue(rm1.getRMContext().getHAServiceState() + == HAServiceProtocol.HAServiceState.ACTIVE); + // check async schedule threads + checkAsyncSchedulerThreads(Thread.currentThread()); + + // register NM, kill app2 + rm1.registerNode("h1:1234", 8192, 8); + rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.LAUNCHED); + rm1.killApp(app2.getApplicationId()); + // submit app3 and check + submitAppAndCheckLaunched(rm1); + + rm1.stop(); + rm2.stop(); + } + + private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception { + RMApp app = rm.submitApp(200, "", + UserGroupInformation.getCurrentUser().getShortUserName(), null, false, + "default", configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, false, + false); + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + rm.sendAMLaunched(attempt.getAppAttemptId()); + rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.LAUNCHED); + return app; + } + + /** + * Make sure the state of async-scheduler threads is correct + * @param currentThread + */ + private void checkAsyncSchedulerThreads(Thread currentThread){ + // Make sure AsyncScheduleThread is interrupted + ThreadGroup threadGroup = currentThread.getThreadGroup(); + while (threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + Thread[] threads = new Thread[threadGroup.activeCount()]; + threadGroup.enumerate(threads); + int numAsyncScheduleThread = 0; + int numResourceCommitterService = 0; + Thread asyncScheduleThread = null; + Thread resourceCommitterService = null; + for (Thread thread : threads) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + if(stackTrace.length>0){ + String stackBottom = stackTrace[stackTrace.length-1].toString(); + if(stackBottom.contains("AsyncScheduleThread.run")){ + numAsyncScheduleThread++; + asyncScheduleThread = thread; + }else if(stackBottom.contains("ResourceCommitterService.run")){ + numResourceCommitterService++; + resourceCommitterService = thread; + } + } + } + Assert.assertEquals(1, numResourceCommitterService); + Assert.assertEquals(1, numAsyncScheduleThread); + Assert.assertNotNull(asyncScheduleThread); + Assert.assertNotNull(resourceCommitterService); + } + +} \ No newline at end of file