YARN-209. Fix CapacityScheduler to trigger application-activation when the cluster capacity changes. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1461773 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
932a394ba5
commit
c23fa2aff4
|
@ -94,6 +94,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-474. Fix CapacityScheduler to trigger application-activation when
|
YARN-474. Fix CapacityScheduler to trigger application-activation when
|
||||||
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
|
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-209. Fix CapacityScheduler to trigger application-activation when
|
||||||
|
the cluster capacity changes. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1481,7 +1481,11 @@ public class LeafQueue implements CSQueue {
|
||||||
CSQueueUtils.updateQueueStatistics(
|
CSQueueUtils.updateQueueStatistics(
|
||||||
resourceCalculator, this, getParent(), clusterResource,
|
resourceCalculator, this, getParent(), clusterResource,
|
||||||
minimumAllocation);
|
minimumAllocation);
|
||||||
|
|
||||||
|
// queue metrics are updated, more resource may be available
|
||||||
|
// activate the pending applications if possible
|
||||||
|
activateApplications();
|
||||||
|
|
||||||
// Update application properties
|
// Update application properties
|
||||||
for (FiCaSchedulerApp application : activeApplications) {
|
for (FiCaSchedulerApp application : activeApplications) {
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
|
|
|
@ -26,10 +26,12 @@ import junit.framework.Assert;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -135,6 +137,51 @@ public class TestRM {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testActivatingApplicationAfterAddingNM() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
|
||||||
|
// start like normal because state is empty
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
// app that gets launched
|
||||||
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
|
||||||
|
// app that does not get launched
|
||||||
|
RMApp app2 = rm1.submitApp(200);
|
||||||
|
|
||||||
|
// app1 and app2 should be scheduled, but because no resource is available,
|
||||||
|
// they are not activated.
|
||||||
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
|
ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId();
|
||||||
|
rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED);
|
||||||
|
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
||||||
|
ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId();
|
||||||
|
rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED);
|
||||||
|
|
||||||
|
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
nm2.registerNode();
|
||||||
|
|
||||||
|
//kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
// app1 should be allocated now
|
||||||
|
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
|
||||||
|
rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED);
|
||||||
|
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
// app2 should be allocated now
|
||||||
|
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
|
||||||
|
rm1.waitForState(attemptId2, RMAppAttemptState.ALLOCATED);
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestRM t = new TestRM();
|
TestRM t = new TestRM();
|
||||||
t.testGetNewAppId();
|
t.testGetNewAppId();
|
||||||
|
|
|
@ -1625,6 +1625,49 @@ public class TestLeafQueue {
|
||||||
assertEquals(0, e.pendingApplications.size());
|
assertEquals(0, e.pendingApplications.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testActivateApplicationByUpdatingClusterResource()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
// Manipulate queue 'e'
|
||||||
|
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
||||||
|
|
||||||
|
// Users
|
||||||
|
final String user_e = "user_e";
|
||||||
|
|
||||||
|
// Submit applications
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_0, user_e, e,
|
||||||
|
mock(ActiveUsersManager.class), rmContext);
|
||||||
|
e.submitApplication(app_0, user_e, E);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_1, user_e, e,
|
||||||
|
mock(ActiveUsersManager.class), rmContext);
|
||||||
|
e.submitApplication(app_1, user_e, E); // same user
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_2 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(2, 0);
|
||||||
|
FiCaSchedulerApp app_2 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_2, user_e, e,
|
||||||
|
mock(ActiveUsersManager.class), rmContext);
|
||||||
|
e.submitApplication(app_2, user_e, E); // same user
|
||||||
|
|
||||||
|
// before updating cluster resource
|
||||||
|
assertEquals(2, e.activeApplications.size());
|
||||||
|
assertEquals(1, e.pendingApplications.size());
|
||||||
|
|
||||||
|
e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32));
|
||||||
|
|
||||||
|
// after updating cluster resource
|
||||||
|
assertEquals(3, e.activeApplications.size());
|
||||||
|
assertEquals(0, e.pendingApplications.size());
|
||||||
|
}
|
||||||
|
|
||||||
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
|
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
|
||||||
for (QueueUserACLInfo aclInfo : aclInfos) {
|
for (QueueUserACLInfo aclInfo : aclInfos) {
|
||||||
if (aclInfo.getUserAcls().contains(acl)) {
|
if (aclInfo.getUserAcls().contains(acl)) {
|
||||||
|
|
Loading…
Reference in New Issue