YARN-7509. AsyncScheduleThread and ResourceCommitterService are still running after RM is transitioned to standby. (Tao Yang via wangda)

Change-Id: I7477fe355419fd4a0a6e2bdda7319abad4c4c748
This commit is contained in:
Wangda Tan 2017-11-23 19:59:03 -08:00
parent d162252d7a
commit 834e91ee91
2 changed files with 164 additions and 7 deletions

View File

@ -459,7 +459,7 @@ public class CapacityScheduler extends
* Schedule on all nodes by starting at a random point.
* @param cs
*/
static void schedule(CapacityScheduler cs) {
static void schedule(CapacityScheduler cs) throws InterruptedException{
// First randomize the start point
int current = 0;
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
@ -475,9 +475,7 @@ public class CapacityScheduler extends
cs.allocateContainersToNode(node.getNodeID(), false);
}
try {
Thread.sleep(cs.getAsyncScheduleInterval());
} catch (InterruptedException e) {}
}
static class AsyncScheduleThread extends Thread {
@ -492,9 +490,9 @@ public class CapacityScheduler extends
@Override
public void run() {
while (true) {
while (!Thread.currentThread().isInterrupted()) {
try {
if (!runSchedules.get() || Thread.currentThread().isInterrupted()) {
if (!runSchedules.get()) {
Thread.sleep(100);
} else {
// Don't run schedule if we have some pending backlogs already
@ -505,9 +503,11 @@ public class CapacityScheduler extends
}
}
} catch (InterruptedException ie) {
// Do nothing
// keep interrupt signal
Thread.currentThread().interrupt();
}
}
LOG.info("AsyncScheduleThread[" + getName() + "] exited!");
}
public void beginSchedule() {
@ -546,8 +546,10 @@ public class CapacityScheduler extends
} catch (InterruptedException e) {
LOG.error(e);
Thread.currentThread().interrupt();
}
}
LOG.info("ResourceCommitterService exited!");
}
public void addNewCommitRequest(

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRMHAForAsyncScheduler extends RMHATestBase {
@Before
@Override
public void setup() throws Exception {
super.setup();
confForRM1
.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class, ResourceCalculator.class);
confForRM1.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
confForRM1.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
confForRM2
.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class, ResourceCalculator.class);
confForRM2.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
confForRM2.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
}
@Test(timeout = 60000)
public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
// start two RMs, and transit rm1 to active, rm2 to standby
startRMs();
// register NM
rm1.registerNode("h1:1234", 8192, 8);
// submit app1 and check
RMApp app1 = submitAppAndCheckLaunched(rm1);
// failover RM1 to RM2
explicitFailover();
checkAsyncSchedulerThreads(Thread.currentThread());
// register NM, kill app1
rm2.registerNode("h1:1234", 8192, 8);
rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.LAUNCHED);
rm2.killApp(app1.getApplicationId());
// submit app3 and check
RMApp app2 = submitAppAndCheckLaunched(rm2);
// failover RM2 to RM1
HAServiceProtocol.StateChangeRequestInfo requestInfo =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
rm2.adminService.transitionToStandby(requestInfo);
rm1.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceProtocol.HAServiceState.STANDBY);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceProtocol.HAServiceState.ACTIVE);
// check async schedule threads
checkAsyncSchedulerThreads(Thread.currentThread());
// register NM, kill app2
rm1.registerNode("h1:1234", 8192, 8);
rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.LAUNCHED);
rm1.killApp(app2.getApplicationId());
// submit app3 and check
submitAppAndCheckLaunched(rm1);
rm1.stop();
rm2.stop();
}
private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
RMApp app = rm.submitApp(200, "",
UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
"default", configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, false,
false);
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.LAUNCHED);
return app;
}
/**
* Make sure the state of async-scheduler threads is correct
* @param currentThread
*/
private void checkAsyncSchedulerThreads(Thread currentThread){
// Make sure AsyncScheduleThread is interrupted
ThreadGroup threadGroup = currentThread.getThreadGroup();
while (threadGroup.getParent() != null) {
threadGroup = threadGroup.getParent();
}
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
int numAsyncScheduleThread = 0;
int numResourceCommitterService = 0;
Thread asyncScheduleThread = null;
Thread resourceCommitterService = null;
for (Thread thread : threads) {
StackTraceElement[] stackTrace = thread.getStackTrace();
if(stackTrace.length>0){
String stackBottom = stackTrace[stackTrace.length-1].toString();
if(stackBottom.contains("AsyncScheduleThread.run")){
numAsyncScheduleThread++;
asyncScheduleThread = thread;
}else if(stackBottom.contains("ResourceCommitterService.run")){
numResourceCommitterService++;
resourceCommitterService = thread;
}
}
}
Assert.assertEquals(1, numResourceCommitterService);
Assert.assertEquals(1, numAsyncScheduleThread);
Assert.assertNotNull(asyncScheduleThread);
Assert.assertNotNull(resourceCommitterService);
}
}