YARN-7913. Improve error handling when application recovery fails with exception. Contributed by Wilfred Spiegelenburg

This commit is contained in:
Szilard Nemeth 2020-01-22 16:30:59 +01:00
parent b4e9725955
commit 9638985428
3 changed files with 178 additions and 14 deletions

View File

@ -470,14 +470,26 @@ public class FairScheduler extends
return; return;
} }
try {
writeLock.lock(); writeLock.lock();
try {
RMApp rmApp = rmContext.getRMApps().get(applicationId); RMApp rmApp = rmContext.getRMApps().get(applicationId);
// This will re-create the queue on restore, however this could fail if
// the config was changed.
FSLeafQueue queue = assignToQueue(rmApp, queueName, user); FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) { if (queue == null) {
if (!isAppRecovering) {
return; return;
} }
// app is recovering we do not want to fail the app now as it was there
// before we started the recovery. Add it to the recovery queue:
// dynamic queue directly under root, no ACL needed (auto clean up)
queueName = "root.recovery";
queue = queueMgr.getLeafQueue(queueName, true);
}
// Skip ACL check for recovering applications: they have been accepted
// in the queue already recovery should not break that.
if (!isAppRecovering) {
// Enforce ACLs // Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user); user);
@ -492,6 +504,7 @@ public class FairScheduler extends
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg)); new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
return; return;
} }
}
SchedulerApplication<FSAppAttempt> application = SchedulerApplication<FSAppAttempt> application =
new SchedulerApplication<FSAppAttempt>(queue, user); new SchedulerApplication<FSAppAttempt>(queue, user);

View File

@ -250,6 +250,31 @@ public class FairSchedulerTestBase {
scheduler.update(); scheduler.update();
} }
protected ApplicationAttemptId createRecoveringApplication(
Resource amResource, String queueId, String userId) {
ApplicationAttemptId id =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
// On restore the app is already created but we need to check the AM
// resource, make sure it is set for test
ResourceRequest amRequest = createResourceRequest(
// cast to int as we're not testing large values so it is safe
(int)amResource.getMemorySize(), amResource.getVirtualCores(),
ResourceRequest.ANY, 1, 1, true);
List<ResourceRequest> amReqs = new ArrayList<>();
amReqs.add(amRequest);
RMContext rmContext = resourceManager.getRMContext();
ApplicationId appId = id.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, userId, null,
ApplicationSubmissionContext.newInstance(appId, null, queueId, null,
mock(ContainerLaunchContext.class), false, false, 0, amResource,
null), scheduler, null, 0, null, null, amReqs);
rmContext.getRMApps().put(appId, rmApp);
scheduler.addApplication(appId, queueId, userId, true);
return id;
}
protected void createApplicationWithAMResource(ApplicationAttemptId attId, protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource) { String queue, String user, Resource amResource) {
RMContext rmContext = resourceManager.getRMContext(); RMContext rmContext = resourceManager.getRMContext();

View File

@ -5451,4 +5451,130 @@ public class TestFairScheduler extends FairSchedulerTestBase {
SchedulerUtils.COMPLETED_APPLICATION), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.EXPIRE); RMContainerEventType.EXPIRE);
} }
@Test
public void testRestoreToExistingQueue() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
generateAllocationFilePercentageResource();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// no nodes so the resource total should be zero for all queues
// AM is using resources
Resource amResource = Resources.createResource(1024, 1);
// Add the app and the attempt
ApplicationAttemptId appAttemptId = null;
String queueId = "root.parent.queueA";
try {
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
} catch (Exception e) {
fail("The exception is not expected. Exception message: "
+ e.getMessage());
}
scheduler.addApplicationAttempt(appAttemptId, false, true);
List<ApplicationAttemptId> appsInQueue =
scheduler.getAppsInQueue(queueId);
assertEquals("Number of apps in queue 'root.parent.queueA' should be one!",
1, appsInQueue.size());
appAttemptId = scheduler.getAppsInQueue(queueId).get(0);
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
assertTrue("There should be no requests accepted", schedulerApp
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
// Restore an applications with a user that has no access to the queue
try {
appAttemptId = createRecoveringApplication(amResource, queueId,
"usernotinacl");
} catch (Exception e) {
fail("The exception is not expected. Exception message: "
+ e.getMessage());
}
scheduler.addApplicationAttempt(appAttemptId, false, true);
appsInQueue = scheduler.getAppsInQueue(queueId);
assertEquals("Number of apps in queue 'root.parent.queueA' should be two!",
2, appsInQueue.size());
appAttemptId = scheduler.getAppsInQueue(queueId).get(1);
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
schedulerApp = scheduler.getSchedulerApp(appAttemptId);
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
assertTrue("There should be no requests accepted", schedulerApp
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
}
@Test
public void testRestoreToParentQueue() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
generateAllocationFilePercentageResource();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// no nodes so the resource total should be zero for all queues
// AM is using resources
Resource amResource = Resources.createResource(1024, 1);
// Add the app and the attempt, use a queue that is now a parent
ApplicationAttemptId appAttemptId = null;
String queueId = "root.parent";
try {
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
} catch (Exception e) {
fail("The exception is not expected. Exception message: "
+ e.getMessage());
}
scheduler.addApplicationAttempt(appAttemptId, false, true);
String recoveredQueue = "root.recovery";
List<ApplicationAttemptId> appsInQueue =
scheduler.getAppsInQueue(recoveredQueue);
assertEquals("Number of apps in queue 'root.recovery' should be one!",
1, appsInQueue.size());
appAttemptId =
scheduler.getAppsInQueue(recoveredQueue).get(0);
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
assertTrue("There should be no requests accepted", schedulerApp
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
}
private void generateAllocationFilePercentageResource()
throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println("<aclSubmitApps> </aclSubmitApps>");
out.println("<aclAdministerApps> </aclAdministerApps>");
out.println("<queue name=\"parent\">");
out.println("<maxChildResources>15.0%</maxChildResources>");
out.println("<queue name=\"queueA\">");
out.println("<aclSubmitApps>user1</aclSubmitApps>");
out.println("</queue>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
}
} }