YARN-7509. AsyncScheduleThread and ResourceCommitterService are still running after RM is transitioned to standby. (Tao Yang via wangda)
Change-Id: I7477fe355419fd4a0a6e2bdda7319abad4c4c748 (cherry picked from commit834e91ee91
) (cherry picked from commit94f7b0f99f
)
This commit is contained in:
parent
17289ca677
commit
a6d7fa91d9
|
@ -459,7 +459,7 @@ public class CapacityScheduler extends
|
||||||
* Schedule on all nodes by starting at a random point.
|
* Schedule on all nodes by starting at a random point.
|
||||||
* @param cs
|
* @param cs
|
||||||
*/
|
*/
|
||||||
static void schedule(CapacityScheduler cs) {
|
static void schedule(CapacityScheduler cs) throws InterruptedException{
|
||||||
// First randomize the start point
|
// First randomize the start point
|
||||||
int current = 0;
|
int current = 0;
|
||||||
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
|
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
|
||||||
|
@ -475,9 +475,7 @@ public class CapacityScheduler extends
|
||||||
cs.allocateContainersToNode(node.getNodeID(), false);
|
cs.allocateContainersToNode(node.getNodeID(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
Thread.sleep(cs.getAsyncScheduleInterval());
|
||||||
Thread.sleep(cs.getAsyncScheduleInterval());
|
|
||||||
} catch (InterruptedException e) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class AsyncScheduleThread extends Thread {
|
static class AsyncScheduleThread extends Thread {
|
||||||
|
@ -492,9 +490,9 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (true) {
|
while (!Thread.currentThread().isInterrupted()) {
|
||||||
try {
|
try {
|
||||||
if (!runSchedules.get() || Thread.currentThread().isInterrupted()) {
|
if (!runSchedules.get()) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
} else {
|
} else {
|
||||||
// Don't run schedule if we have some pending backlogs already
|
// Don't run schedule if we have some pending backlogs already
|
||||||
|
@ -505,9 +503,11 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
// Do nothing
|
// keep interrupt signal
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info("AsyncScheduleThread[" + getName() + "] exited!");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void beginSchedule() {
|
public void beginSchedule() {
|
||||||
|
@ -546,8 +546,10 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.error(e);
|
LOG.error(e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.info("ResourceCommitterService exited!");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addNewCommitRequest(
|
public void addNewCommitRequest(
|
||||||
|
|
|
@ -0,0 +1,155 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestRMHAForAsyncScheduler extends RMHATestBase {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
confForRM1
|
||||||
|
.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||||
|
confForRM1.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
confForRM1.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
|
||||||
|
|
||||||
|
confForRM2
|
||||||
|
.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||||
|
confForRM2.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
confForRM2.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
|
||||||
|
// start two RMs, and transit rm1 to active, rm2 to standby
|
||||||
|
startRMs();
|
||||||
|
// register NM
|
||||||
|
rm1.registerNode("h1:1234", 8192, 8);
|
||||||
|
// submit app1 and check
|
||||||
|
RMApp app1 = submitAppAndCheckLaunched(rm1);
|
||||||
|
|
||||||
|
// failover RM1 to RM2
|
||||||
|
explicitFailover();
|
||||||
|
checkAsyncSchedulerThreads(Thread.currentThread());
|
||||||
|
|
||||||
|
// register NM, kill app1
|
||||||
|
rm2.registerNode("h1:1234", 8192, 8);
|
||||||
|
rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
RMAppAttemptState.LAUNCHED);
|
||||||
|
rm2.killApp(app1.getApplicationId());
|
||||||
|
// submit app3 and check
|
||||||
|
RMApp app2 = submitAppAndCheckLaunched(rm2);
|
||||||
|
|
||||||
|
// failover RM2 to RM1
|
||||||
|
HAServiceProtocol.StateChangeRequestInfo requestInfo =
|
||||||
|
new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
rm2.adminService.transitionToStandby(requestInfo);
|
||||||
|
rm1.adminService.transitionToActive(requestInfo);
|
||||||
|
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceProtocol.HAServiceState.STANDBY);
|
||||||
|
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
|
||||||
|
== HAServiceProtocol.HAServiceState.ACTIVE);
|
||||||
|
// check async schedule threads
|
||||||
|
checkAsyncSchedulerThreads(Thread.currentThread());
|
||||||
|
|
||||||
|
// register NM, kill app2
|
||||||
|
rm1.registerNode("h1:1234", 8192, 8);
|
||||||
|
rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
RMAppAttemptState.LAUNCHED);
|
||||||
|
rm1.killApp(app2.getApplicationId());
|
||||||
|
// submit app3 and check
|
||||||
|
submitAppAndCheckLaunched(rm1);
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
|
||||||
|
RMApp app = rm.submitApp(200, "",
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
|
||||||
|
"default", configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, false,
|
||||||
|
false);
|
||||||
|
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||||
|
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
RMAppAttemptState.LAUNCHED);
|
||||||
|
return app;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure the state of async-scheduler threads is correct
|
||||||
|
* @param currentThread
|
||||||
|
*/
|
||||||
|
private void checkAsyncSchedulerThreads(Thread currentThread){
|
||||||
|
// Make sure AsyncScheduleThread is interrupted
|
||||||
|
ThreadGroup threadGroup = currentThread.getThreadGroup();
|
||||||
|
while (threadGroup.getParent() != null) {
|
||||||
|
threadGroup = threadGroup.getParent();
|
||||||
|
}
|
||||||
|
Thread[] threads = new Thread[threadGroup.activeCount()];
|
||||||
|
threadGroup.enumerate(threads);
|
||||||
|
int numAsyncScheduleThread = 0;
|
||||||
|
int numResourceCommitterService = 0;
|
||||||
|
Thread asyncScheduleThread = null;
|
||||||
|
Thread resourceCommitterService = null;
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
StackTraceElement[] stackTrace = thread.getStackTrace();
|
||||||
|
if(stackTrace.length>0){
|
||||||
|
String stackBottom = stackTrace[stackTrace.length-1].toString();
|
||||||
|
if(stackBottom.contains("AsyncScheduleThread.run")){
|
||||||
|
numAsyncScheduleThread++;
|
||||||
|
asyncScheduleThread = thread;
|
||||||
|
}else if(stackBottom.contains("ResourceCommitterService.run")){
|
||||||
|
numResourceCommitterService++;
|
||||||
|
resourceCommitterService = thread;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1, numResourceCommitterService);
|
||||||
|
Assert.assertEquals(1, numAsyncScheduleThread);
|
||||||
|
Assert.assertNotNull(asyncScheduleThread);
|
||||||
|
Assert.assertNotNull(resourceCommitterService);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue