YARN-7913. Improve error handling when application recovery fails with exception. Contributed by Wilfred Spiegelenburg
This commit is contained in:
parent
4842708320
commit
1e7679035f
|
@ -477,12 +477,25 @@ public class FairScheduler extends
|
|||
writeLock.lock();
|
||||
try {
|
||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
||||
// This will re-create the queue on restore, however this could fail if
|
||||
// the config was changed.
|
||||
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
||||
if (queue == null) {
|
||||
return;
|
||||
if (!isAppRecovering) {
|
||||
return;
|
||||
}
|
||||
// app is recovering we do not want to fail the app now as it was there
|
||||
// before we started the recovery. Add it to the recovery queue:
|
||||
// dynamic queue directly under root, no ACL needed (auto clean up)
|
||||
queueName = "root.recovery";
|
||||
queue = queueMgr.getLeafQueue(queueName, true);
|
||||
}
|
||||
|
||||
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
|
||||
// when recovering the NMs might not have registered and we could have
|
||||
// no resources in the queue, the app is already running and has thus
|
||||
// passed all these checks, skip them now.
|
||||
if (!isAppRecovering && rmApp != null &&
|
||||
rmApp.getAMResourceRequests() != null) {
|
||||
// Resources.fitsIn would always return false when queueMaxShare is 0
|
||||
// for any resource, but only using Resources.fitsIn is not enough
|
||||
// is it would return false for such cases when the requested
|
||||
|
@ -507,17 +520,21 @@ public class FairScheduler extends
|
|||
}
|
||||
}
|
||||
|
||||
// Enforce ACLs
|
||||
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
|
||||
user);
|
||||
// Skip ACL check for recovering applications: they have been accepted
|
||||
// in the queue already recovery should not break that.
|
||||
if (!isAppRecovering) {
|
||||
// Enforce ACLs
|
||||
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
|
||||
user);
|
||||
|
||||
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
|
||||
.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
||||
String msg = "User " + userUgi.getUserName()
|
||||
+ " cannot submit applications to queue " + queue.getName()
|
||||
+ "(requested queuename is " + queueName + ")";
|
||||
rejectApplicationWithMessage(applicationId, msg);
|
||||
return;
|
||||
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue
|
||||
.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
||||
String msg = "User " + userUgi.getUserName()
|
||||
+ " cannot submit applications to queue " + queue.getName()
|
||||
+ "(requested queuename is " + queueName + ")";
|
||||
rejectApplicationWithMessage(applicationId, msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SchedulerApplication<FSAppAttempt> application =
|
||||
|
|
|
@ -260,6 +260,26 @@ public class FairSchedulerTestBase {
|
|||
scheduler.update();
|
||||
}
|
||||
|
||||
protected ApplicationAttemptId createRecoveringApplication(
|
||||
Resource amResource, String queueId, String userId) {
|
||||
ApplicationAttemptId id =
|
||||
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
|
||||
// On restore the app is already created but we need to check the AM
|
||||
// resource, make sure it is set for test
|
||||
ResourceRequest amRequest = createResourceRequest(
|
||||
// cast to int as we're not testing large values so it is safe
|
||||
(int)amResource.getMemorySize(), amResource.getVirtualCores(),
|
||||
ResourceRequest.ANY, 1, 1, true);
|
||||
List<ResourceRequest> amReqs = new ArrayList<>();
|
||||
amReqs.add(amRequest);
|
||||
createApplicationWithAMResourceInternal(id, queueId, userId, amResource,
|
||||
amReqs);
|
||||
|
||||
scheduler.addApplication(id.getApplicationId(), queueId, userId, true);
|
||||
return id;
|
||||
}
|
||||
|
||||
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
|
||||
String queue, String user, Resource amResource) {
|
||||
createApplicationWithAMResourceInternal(attId, queue, user, amResource,
|
||||
|
|
|
@ -5766,4 +5766,131 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
|
||||
createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreToExistingQueue() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
generateAllocationFilePercentageResource();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// no nodes so the resource total should be zero for all queues
|
||||
// AM is using resources
|
||||
Resource amResource = Resources.createResource(1024, 1);
|
||||
// Add the app and the attempt
|
||||
ApplicationAttemptId appAttemptId = null;
|
||||
String queueId = "root.parent.queueA";
|
||||
try {
|
||||
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
|
||||
} catch (Exception e) {
|
||||
fail("The exception is not expected. Exception message: "
|
||||
+ e.getMessage());
|
||||
}
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||
|
||||
List<ApplicationAttemptId> appsInQueue =
|
||||
scheduler.getAppsInQueue(queueId);
|
||||
assertEquals("Number of apps in queue 'root.parent.queueA' should be one!",
|
||||
1, appsInQueue.size());
|
||||
|
||||
appAttemptId = scheduler.getAppsInQueue(queueId).get(0);
|
||||
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||
|
||||
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||
|
||||
assertTrue("There should be no requests accepted", schedulerApp
|
||||
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||
|
||||
// Restore an applications with a user that has no access to the queue
|
||||
try {
|
||||
appAttemptId = createRecoveringApplication(amResource, queueId,
|
||||
"usernotinacl");
|
||||
} catch (Exception e) {
|
||||
fail("The exception is not expected. Exception message: "
|
||||
+ e.getMessage());
|
||||
}
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||
|
||||
appsInQueue = scheduler.getAppsInQueue(queueId);
|
||||
assertEquals("Number of apps in queue 'root.parent.queueA' should be two!",
|
||||
2, appsInQueue.size());
|
||||
|
||||
appAttemptId = scheduler.getAppsInQueue(queueId).get(1);
|
||||
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||
|
||||
schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||
|
||||
assertTrue("There should be no requests accepted", schedulerApp
|
||||
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestoreToParentQueue() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
generateAllocationFilePercentageResource();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// no nodes so the resource total should be zero for all queues
|
||||
// AM is using resources
|
||||
Resource amResource = Resources.createResource(1024, 1);
|
||||
// Add the app and the attempt, use a queue that is now a parent
|
||||
ApplicationAttemptId appAttemptId = null;
|
||||
String queueId = "root.parent";
|
||||
try {
|
||||
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
|
||||
} catch (Exception e) {
|
||||
fail("The exception is not expected. Exception message: "
|
||||
+ e.getMessage());
|
||||
}
|
||||
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||
|
||||
String recoveredQueue = "root.recovery";
|
||||
List<ApplicationAttemptId> appsInQueue =
|
||||
scheduler.getAppsInQueue(recoveredQueue);
|
||||
assertEquals("Number of apps in queue 'root.recovery' should be one!",
|
||||
1, appsInQueue.size());
|
||||
|
||||
appAttemptId =
|
||||
scheduler.getAppsInQueue(recoveredQueue).get(0);
|
||||
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||
|
||||
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||
|
||||
assertTrue("There should be no requests accepted", schedulerApp
|
||||
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||
}
|
||||
|
||||
private void generateAllocationFilePercentageResource()
|
||||
throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"root\">");
|
||||
out.println("<aclSubmitApps> </aclSubmitApps>");
|
||||
out.println("<aclAdministerApps> </aclAdministerApps>");
|
||||
out.println("<queue name=\"parent\">");
|
||||
out.println("<maxChildResources>memory-mb=15.0%, vcores=15.0%" +
|
||||
"</maxChildResources>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<aclSubmitApps>user1</aclSubmitApps>");
|
||||
out.println("</queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue