YARN-7643. Handle recovery of applications in case of auto-created leaf queue mapping. Contributed by Suma Shivaprasad.

This commit is contained in:
Sunil G 2017-12-13 22:49:58 +05:30
parent 10fc8d2a7d
commit cb87e4dc92
6 changed files with 275 additions and 52 deletions

View File

@ -366,22 +366,20 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
String user, boolean isRecovery, long startTime) throws YarnException {
ApplicationPlacementContext placementContext = null;
try {
placementContext = placeApplication(rmContext, submissionContext, user);
} catch (YarnException e) {
String msg =
"Failed to place application " + submissionContext.getApplicationId()
+ " to queue and specified " + "queue is invalid : "
+ submissionContext.getQueue();
LOG.error(msg, e);
throw e;
}
// We only do queue mapping when it's a new application
// We only replace the queue when it's a new application
if (!isRecovery) {
try {
// Do queue mapping
placementContext = placeApplication(rmContext,
submissionContext, user);
replaceQueueFromPlacementContext(placementContext,
submissionContext);
} catch (YarnException e) {
String msg = "Failed to place application " +
submissionContext.getApplicationId() + " to queue and specified "
+ "queue is invalid : " + submissionContext.getQueue();
LOG.error(msg, e);
throw e;
}
replaceQueueFromPlacementContext(placementContext, submissionContext);
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(

View File

@ -184,6 +184,8 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
//queueName will be same as mapped queue name in case of recovery
|| queueName.equals(mappedQueue.getQueue())
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue

View File

@ -147,7 +147,7 @@ public abstract class AbstractCSQueue implements CSQueue {
this.metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
configuration.getEnableUserMetrics(), cs.getConf());
cs.getConfiguration().getEnableUserMetrics(), cs.getConf());
this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability();

View File

@ -650,24 +650,28 @@ public class CapacityScheduler extends
return this.queueManager.getQueue(queueName);
}
private void addApplicationOnRecovery(
ApplicationId applicationId, String queueName, String user,
Priority priority) {
private void addApplicationOnRecovery(ApplicationId applicationId,
String queueName, String user,
Priority priority, ApplicationPlacementContext placementContext) {
try {
writeLock.lock();
CSQueue queue = getQueue(queueName);
//check if the queue needs to be auto-created during recovery
CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
queueName, placementContext, true);
if (queue == null) {
//During a restart, this indicates a queue was removed, which is
//not presently supported
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed on recovery as it was submitted to queue "
+ queueName + " which no longer exists after restart."));
"Application killed on recovery as it"
+ " was submitted to queue " + queueName
+ " which no longer exists after restart."));
return;
} else{
String queueErrorMsg = "Queue named " + queueName
+ " missing during application recovery."
String queueErrorMsg = "Queue named " + queueName + " missing "
+ "during application recovery."
+ " Queue removal during recovery is not presently "
+ "supported by the capacity scheduler, please "
+ "restart with all queues configured"
@ -682,8 +686,8 @@ public class CapacityScheduler extends
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed on recovery as it was submitted to queue "
+ queueName
"Application killed on recovery as it was "
+ "submitted to queue " + queueName
+ " which is no longer a leaf queue after restart."));
return;
} else{
@ -719,6 +723,51 @@ public class CapacityScheduler extends
}
}
private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId
applicationId, String user, String queueName,
ApplicationPlacementContext placementContext,
boolean isRecovery) {
CSQueue queue = getQueue(queueName);
if (queue == null) {
if (placementContext != null && placementContext.hasParentQueue()) {
try {
return autoCreateLeafQueue(placementContext);
} catch (YarnException | IOException e) {
if (isRecovery) {
if (!YarnConfiguration.shouldRMFailFast(getConfig())) {
LOG.error("Could not auto-create leaf queue " + queueName +
" due to : ", e);
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed on recovery"
+ " as it was submitted to queue " + queueName
+ " which could not be auto-created"));
} else{
String queueErrorMsg =
"Queue named " + queueName + " could not be "
+ "auto-created during application recovery.";
LOG.fatal(queueErrorMsg, e);
throw new QueueInvalidException(queueErrorMsg);
}
} else{
LOG.error("Could not auto-create leaf queue due to : ", e);
final String message =
"Application " + applicationId + " submission by user : "
+ user
+ " to queue : " + queueName + " failed : " + e
.getMessage();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
}
}
}
}
return queue;
}
private void addApplication(ApplicationId applicationId, String queueName,
String user, Priority priority,
ApplicationPlacementContext placementContext) {
@ -732,23 +781,10 @@ public class CapacityScheduler extends
message));
return;
}
// Sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null && placementContext != null) {
//Could be a potential auto-created leaf queue
try {
queue = autoCreateLeafQueue(placementContext);
} catch (YarnException | IOException e) {
LOG.error("Could not auto-create leaf queue due to : ", e);
final String message =
"Application " + applicationId + " submission by user : " + user
+ " to queue : " + queueName + " failed : " + e.getMessage();
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
}
}
//Could be a potential auto-created leaf queue
CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user,
queueName, placementContext, false);
if (queue == null) {
final String message =
@ -1534,7 +1570,8 @@ public class CapacityScheduler extends
appAddedEvent.getPlacementContext());
} else {
addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
appAddedEvent.getPlacementContext());
}
}
}
@ -2058,10 +2095,10 @@ public class CapacityScheduler extends
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
}
AbstractManagedParentQueue parentPlan =
AbstractManagedParentQueue parent =
(AbstractManagedParentQueue) newQueue.getParent();
String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue);
parent.addChildQueue(newQueue);
this.queueManager.addQueue(queuename, newQueue);
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");

View File

@ -64,6 +64,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.TestCapacitySchedulerAutoCreatedQueueBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics;
@ -97,6 +100,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp
.RMWebServices.DEFAULT_QUEUE;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -281,6 +288,18 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
}
}
private CapacitySchedulerConfiguration
getSchedulerAutoCreatedQueueConfiguration(
boolean overrideWithQueueMappings) throws IOException {
CapacitySchedulerConfiguration schedulerConf =
new CapacitySchedulerConfiguration(conf);
TestCapacitySchedulerAutoCreatedQueueBase
.setupQueueConfigurationForSingleAutoCreatedLeafQueue(schedulerConf);
TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(schedulerConf,
"c", overrideWithQueueMappings, new int[] {0, 1});
return schedulerConf;
}
// Test work preserving recovery of apps running under reservation.
// This involves:
// 1. Setting up a dynamic reservable queue,
@ -1532,4 +1551,141 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
}
@Test(timeout = 30000)
public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
throws Exception {
//if queue name is not specified, it should submit to 'default' queue
testDynamicAutoCreatedQueueRecovery(USER1, null);
}
@Test(timeout = 30000)
public void testDynamicAutoCreatedQueueRecoveryWithOverrideQueueMappingFlag()
throws Exception {
testDynamicAutoCreatedQueueRecovery(USER1, USER1);
}
// Test work preserving recovery of apps running on auto-created queues.
// This involves:
// 1. Setting up a dynamic auto-created queue,
// 2. Submitting an app to it,
// 3. Failing over RM,
// 4. Validating that the app is recovered post failover,
// 5. Check if all running containers are recovered,
// 6. Verify the scheduler state like attempt info,
// 7. Verify the queue/user metrics for the dynamic auto-created queue.
public void testDynamicAutoCreatedQueueRecovery(String user, String queueName)
throws Exception {
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
conf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true);
// 1. Set up dynamic auto-created queue.
CapacitySchedulerConfiguration schedulerConf = null;
if (queueName == null || queueName.equals(DEFAULT_QUEUE)) {
schedulerConf = getSchedulerAutoCreatedQueueConfiguration(false);
} else{
schedulerConf = getSchedulerAutoCreatedQueueConfiguration(true);
}
int containerMemory = 1024;
Resource containerResource = Resource.newInstance(containerMemory, 1);
rm1 = new MockRM(schedulerConf);
rm1.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 8192,
rm1.getResourceTrackerService());
nm1.registerNode();
// 2. submit app to queue which is auto-created.
RMApp app1 = rm1.submitApp(200, "autoCreatedQApp", user, null, queueName);
Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// clear queue metrics
rm1.clearQueueMetrics(app1);
// 3. Fail over (restart) RM.
rm2 = new MockRM(schedulerConf, rm1.getRMStateStore());
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// 4. Validate app is recovered post failover.
RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get(
app1.getApplicationId());
RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
NMContainerStatus completedContainer =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE);
nm1.registerNode(
Arrays.asList(amContainer, runningContainer, completedContainer), null);
// Wait for RM to settle down on recovering containers.
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
// 5. Check RMContainers are re-recreated and the container state is
// correct.
rm2.waitForState(nm1, amContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForState(nm1, runningContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm2.getResourceScheduler();
SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
// ********* check scheduler node state.*******
// 2 running containers.
Resource usedResources = Resources.multiply(containerResource, 2);
Resource nmResource = Resource.newInstance(nm1.getMemory(),
nm1.getvCores());
assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
assertTrue(
schedulerNode1.isValidContainer(runningContainer.getContainerId()));
assertFalse(
schedulerNode1.isValidContainer(completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
assertEquals(2, schedulerNode1.getNumContainers());
assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getUnallocatedResource());
assertEquals(usedResources, schedulerNode1.getAllocatedResource());
// Resource availableResources = Resources.subtract(nmResource,
// usedResources);
// 6. Verify the scheduler state like attempt info.
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
((AbstractYarnScheduler) rm2.getResourceScheduler())
.getSchedulerApplications();
SchedulerApplication<SchedulerApplicationAttempt> schedulerApp = sa.get(
recoveredApp1.getApplicationId());
// 7. Verify the queue/user metrics for the dynamic reservable queue.
if (getSchedulerType() == SchedulerType.CAPACITY) {
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
}
// *********** check scheduler attempt state.********
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(amContainer.getContainerId())));
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(runningContainer.getContainerId())));
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
// *********** check appSchedulingInfo state ***********
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
}
}

View File

@ -100,6 +100,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
public static final String A1 = A + ".a1";
public static final String A2 = A + ".a2";
public static final String B1 = B + ".b1";
@ -129,8 +130,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String USER = "user_";
public static final String USER0 = USER + 0;
public static final String USER1 = USER + 1;
public static final String USER3 = USER + 3;
public static final String USER2 = USER + 2;
public static final String USER3 = USER + 3;
public static final String PARENT_QUEUE = "c";
public static final Set<String> accessibleNodeLabelsOnC = new HashSet<>();
@ -183,7 +184,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
setupQueueMappings(conf);
setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3});
dispatcher = new SpyDispatcher();
rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
@ -225,27 +226,33 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
public static CapacitySchedulerConfiguration setupQueueMappings(
CapacitySchedulerConfiguration conf) {
CapacitySchedulerConfiguration conf, String parentQueue, boolean
overrideWithQueueMappings, int[] userIds) {
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
conf.setQueuePlacementRules(queuePlacementRules);
List<UserGroupMappingPlacementRule.QueueMapping> existingMappings = conf
.getQueueMappings();
//set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
new ArrayList<>();
for (int i = 0; i <= 3; i++) {
for (int i = 0; i < userIds.length; i++) {
//Set C as parent queue name for auto queue creation
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
USER + i, getQueueMapping(PARENT_QUEUE, USER + i));
USER + userIds[i], getQueueMapping(parentQueue, USER +
userIds[i]));
queueMappings.add(userQueueMapping);
}
conf.setQueueMappings(queueMappings);
existingMappings.addAll(queueMappings);
conf.setQueueMappings(existingMappings);
//override with queue mappings
conf.setOverrideWithQueueMappings(true);
conf.setOverrideWithQueueMappings(overrideWithQueueMappings);
return conf;
}
@ -327,6 +334,29 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
return conf;
}
public static CapacitySchedulerConfiguration
setupQueueConfigurationForSingleAutoCreatedLeafQueue(
CapacitySchedulerConfiguration conf) {
//setup new queues with one of them auto enabled
// Define top-level queues
// Set childQueue for root
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"c"});
conf.setCapacity(C, 100f);
conf.setUserLimitFactor(C, 1.0f);
conf.setAutoCreateChildQueueEnabled(C, true);
//Setup leaf queue template configs
conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f);
conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
return conf;
}
@After
public void tearDown() throws Exception {
if (mockRM != null) {
@ -395,7 +425,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
setupQueueMappings(conf);
setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3});
RMNodeLabelsManager mgr = setupNodeLabelManager(conf);
MockRM newMockRM = new MockRM(conf) {