YARN-10526. RMAppManager CS Placement ignores parent path. Contributed by Gergely Pollak
This commit is contained in:
parent
5bf977e6b1
commit
df35c7f519
|
@ -500,11 +500,25 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//In the case of capacity scheduler the queue name only means the name of
|
||||||
|
// the leaf queue, but since YARN-9879, internal queue references should
|
||||||
|
// use full path, so we get the queue and parent name from the placement
|
||||||
|
// context instead of the submissionContext.
|
||||||
|
String placementQueueName = submissionContext.getQueue();
|
||||||
|
if (placementContext != null && scheduler instanceof CapacityScheduler) {
|
||||||
|
if (placementContext.hasParentQueue()) {
|
||||||
|
placementQueueName = placementContext.getParentQueue() + "." +
|
||||||
|
placementContext.getQueue();
|
||||||
|
} else {
|
||||||
|
placementQueueName = placementContext.getQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create RMApp
|
// Create RMApp
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
new RMAppImpl(applicationId, rmContext, this.conf,
|
new RMAppImpl(applicationId, rmContext, this.conf,
|
||||||
submissionContext.getApplicationName(), user,
|
submissionContext.getApplicationName(), user,
|
||||||
submissionContext.getQueue(),
|
placementQueueName,
|
||||||
submissionContext, this.scheduler, this.masterService,
|
submissionContext, this.scheduler, this.masterService,
|
||||||
submitTime, submissionContext.getApplicationType(),
|
submitTime, submissionContext.getApplicationType(),
|
||||||
submissionContext.getApplicationTags(), amReqs, placementContext,
|
submissionContext.getApplicationTags(), amReqs, placementContext,
|
||||||
|
|
|
@ -1835,6 +1835,40 @@ public class CapacityScheduler extends
|
||||||
return assignment;
|
return assignment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method extracts the actual queue name from an app add event.
|
||||||
|
* Currently unfortunately ApplicationPlacementContext and
|
||||||
|
* ApplicationSubmissionContext are used in a quite erratic way, this method
|
||||||
|
* helps to get the proper placement path for the queue if placement context
|
||||||
|
* is provided
|
||||||
|
* @param appAddedEvent The application add event with details about the app
|
||||||
|
* @return The name of the queue the application should be added
|
||||||
|
*/
|
||||||
|
private String getAddedAppQueueName(AppAddedSchedulerEvent appAddedEvent) {
|
||||||
|
//appAddedEvent uses the queue from ApplicationSubmissionContext but in
|
||||||
|
//the case of CS it may be only a leaf name due to legacy reasons
|
||||||
|
String ret = appAddedEvent.getQueue();
|
||||||
|
ApplicationPlacementContext placementContext =
|
||||||
|
appAddedEvent.getPlacementContext();
|
||||||
|
|
||||||
|
//If we have a placement context, it means a mapping rule made a decision
|
||||||
|
//about the queue placement, so we use those data, it is supposed to be in
|
||||||
|
//sync with the ApplicationSubmissionContext and appAddedEvent.getQueue, but
|
||||||
|
//because of the aforementioned legacy reasons these two may only contain
|
||||||
|
//the leaf queue name.
|
||||||
|
if (placementContext != null) {
|
||||||
|
String leafName = placementContext.getQueue();
|
||||||
|
String parentName = placementContext.getParentQueue();
|
||||||
|
if (leafName != null) {
|
||||||
|
//building the proper queue path from the parent and leaf queue name
|
||||||
|
ret = placementContext.hasParentQueue() ?
|
||||||
|
(parentName + "." + leafName) : leafName;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(SchedulerEvent event) {
|
public void handle(SchedulerEvent event) {
|
||||||
switch(event.getType()) {
|
switch(event.getType()) {
|
||||||
|
@ -1886,9 +1920,9 @@ public class CapacityScheduler extends
|
||||||
case APP_ADDED:
|
case APP_ADDED:
|
||||||
{
|
{
|
||||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||||
String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
|
String queueName = resolveReservationQueueName(
|
||||||
appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
|
getAddedAppQueueName(appAddedEvent), appAddedEvent.getApplicationId(),
|
||||||
appAddedEvent.getIsAppRecovering());
|
appAddedEvent.getReservationID(), appAddedEvent.getIsAppRecovering());
|
||||||
if (queueName != null) {
|
if (queueName != null) {
|
||||||
if (!appAddedEvent.getIsAppRecovering()) {
|
if (!appAddedEvent.getIsAppRecovering()) {
|
||||||
addApplication(appAddedEvent.getApplicationId(), queueName,
|
addApplication(appAddedEvent.getApplicationId(), queueName,
|
||||||
|
|
|
@ -420,7 +420,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
(C, NODEL_LABEL_SSD);
|
(C, NODEL_LABEL_SSD);
|
||||||
|
|
||||||
|
|
||||||
LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue");
|
LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");
|
||||||
|
|
||||||
conf.setUserLimitFactor(D, 1.0f);
|
conf.setUserLimitFactor(D, 1.0f);
|
||||||
conf.setAutoCreateChildQueueEnabled(D, true);
|
conf.setAutoCreateChildQueueEnabled(D, true);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabels
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||||
.ApplicationPlacementContext;
|
.ApplicationPlacementContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -90,6 +91,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -501,7 +503,12 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
|
|
||||||
assertNotNull(newCS.getQueue(USER0));
|
assertNotNull(newCS.getQueue(USER0));
|
||||||
|
|
||||||
setupQueueMapping(newCS, USER0, "d", USER0);
|
//The new placement engine's validation is a bit more
|
||||||
|
//strict so it would reject the original u:user_0:a.user_0 rule since
|
||||||
|
//it checks if that paths exists or is a managed parent, but if we use
|
||||||
|
//a.%user we can trick the engine, since it cannot validate if the actual
|
||||||
|
//value of the %user will exist or not, it allows the rule
|
||||||
|
setupQueueMapping(newCS, USER0, "a", "%user");
|
||||||
newCS.updatePlacementRules();
|
newCS.updatePlacementRules();
|
||||||
|
|
||||||
RMContext rmContext = mock(RMContext.class);
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
@ -509,8 +516,10 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
newCS.setRMContext(rmContext);
|
newCS.setRMContext(rmContext);
|
||||||
|
|
||||||
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
||||||
|
//The new engine would return root.a as the parent queue for this
|
||||||
|
// submission so creating the ApplicationPlacementContext accordingly
|
||||||
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, USER0,
|
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, USER0,
|
||||||
USER0, new ApplicationPlacementContext(USER0, "d"));
|
USER0, new ApplicationPlacementContext(USER0, "root.a"));
|
||||||
newCS.handle(addAppEvent);
|
newCS.handle(addAppEvent);
|
||||||
|
|
||||||
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
|
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
|
||||||
|
@ -524,6 +533,56 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test case checks if a mapping rule can put an application to an auto
|
||||||
|
* created queue even if an other queue with the same leaf name already
|
||||||
|
* exists.
|
||||||
|
*
|
||||||
|
* In this scenario we use the following queues
|
||||||
|
* root.a.a1 - already existing queue
|
||||||
|
* root.c - managed parent queue
|
||||||
|
*
|
||||||
|
* And the following mapping rule
|
||||||
|
* u:%user:root.c.%user - Any submission should go to root.c.USERNAME queue
|
||||||
|
*
|
||||||
|
* When user 'a1' submits a new application we expect it to go to 'root.c.a1'
|
||||||
|
* because of the mapping rule, and the queue should be created.
|
||||||
|
*
|
||||||
|
* @throws Exception - When stuff go wrong, obvious reason to fail the test
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAutoQueueCreationWhenQueueExistsWithSameName()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
MockRM newMockRM = setupSchedulerInstance();
|
||||||
|
CapacityScheduler newCS =
|
||||||
|
(CapacityScheduler) newMockRM.getResourceScheduler();
|
||||||
|
|
||||||
|
try {
|
||||||
|
setupQueueMapping(newCS, "%user", "root.c", "%user");
|
||||||
|
newCS.updatePlacementRules();
|
||||||
|
|
||||||
|
//making sure the target queue does not exist before submission
|
||||||
|
assertNull(newCS.getQueue("root.c.a1"));
|
||||||
|
RMApp app = MockRMAppSubmitter.submit(newMockRM,
|
||||||
|
MockRMAppSubmissionData.Builder.createWithMemory(512, newMockRM)
|
||||||
|
.withAppName("testAutoQueueCreationWhenQueueExistsWithSameName")
|
||||||
|
.withUser("a1")
|
||||||
|
.withQueue("default")
|
||||||
|
.build());
|
||||||
|
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, newMockRM);
|
||||||
|
//checking if the target queue have been created during the submission
|
||||||
|
assertNotNull(newCS.getQueue("root.c.a1"));
|
||||||
|
//making sure the application is indeed in the right queue
|
||||||
|
assertEquals("root.c.a1", app.getQueue());
|
||||||
|
} finally {
|
||||||
|
if (newMockRM != null) {
|
||||||
|
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
|
||||||
|
newMockRM.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreationFailsWhenParentCapacityExceeded()
|
public void testAutoCreationFailsWhenParentCapacityExceeded()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue