YARN-7913. Improve error handling when application recovery fails with exception. Contributed by Wilfred Spiegelenburg
This commit is contained in:
parent
6d52bbbfcf
commit
581072a8f0
@ -480,27 +480,39 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
// Assign the app to the queue creating and prevent queue delete.
|
// Assign the app to the queue creating and prevent queue delete.
|
||||||
|
// This will re-create the queue on restore, however this could fail if
|
||||||
|
// the config was changed.
|
||||||
FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
|
FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true,
|
||||||
applicationId);
|
applicationId);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
rejectApplicationWithMessage(applicationId,
|
if (!isAppRecovering) {
|
||||||
queueName + " is not a leaf queue");
|
rejectApplicationWithMessage(applicationId,
|
||||||
return;
|
queueName + " is not a leaf queue");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// app is recovering we do not want to fail the app now as it was there
|
||||||
|
// before we started the recovery. Add it to the recovery queue:
|
||||||
|
// dynamic queue directly under root, no ACL needed (auto clean up)
|
||||||
|
queueName = "root.recovery";
|
||||||
|
queue = queueMgr.getLeafQueue(queueName, true, applicationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enforce ACLs: 2nd check, there could be a time laps between the app
|
// Skip ACL check for recovering applications: they have been accepted
|
||||||
// creation in the RMAppManager and getting here. That means we could
|
// in the queue already; recovery should not break that.
|
||||||
// have a configuration change (prevent race condition)
|
if (!isAppRecovering) {
|
||||||
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
|
// Enforce ACLs: 2nd check, there could be a time laps between the app
|
||||||
user);
|
// creation in the RMAppManager and getting here. That means we could
|
||||||
|
// have a configuration change (prevent race condition)
|
||||||
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
|
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
|
||||||
!queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
user);
|
||||||
String msg = "User " + user + " does not have permission to submit " +
|
if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) &&
|
||||||
applicationId + " to queue " + queueName;
|
!queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
||||||
rejectApplicationWithMessage(applicationId, msg);
|
String msg = "User " + user + " does not have permission to submit "
|
||||||
queue.removeAssignedApp(applicationId);
|
+ applicationId + " to queue " + queueName;
|
||||||
return;
|
rejectApplicationWithMessage(applicationId, msg);
|
||||||
|
queue.removeAssignedApp(applicationId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
||||||
@ -511,7 +523,11 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
" to set queue name on");
|
" to set queue name on");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
|
// when recovering the NMs might not have registered and we could have
|
||||||
|
// no resources in the queue, the app is already running and has thus
|
||||||
|
// passed all these checks, skip them now.
|
||||||
|
if (!isAppRecovering && rmApp != null &&
|
||||||
|
rmApp.getAMResourceRequests() != null) {
|
||||||
// Resources.fitsIn would always return false when queueMaxShare is 0
|
// Resources.fitsIn would always return false when queueMaxShare is 0
|
||||||
// for any resource, but only using Resources.fitsIn is not enough
|
// for any resource, but only using Resources.fitsIn is not enough
|
||||||
// is it would return false for such cases when the requested
|
// is it would return false for such cases when the requested
|
||||||
|
@ -270,6 +270,30 @@ protected void createSchedulingRequestExistingApplication(
|
|||||||
scheduler.update();
|
scheduler.update();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ApplicationAttemptId createRecoveringApplication(
|
||||||
|
Resource amResource, String queueId, String userId) {
|
||||||
|
ApplicationAttemptId id =
|
||||||
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||||
|
|
||||||
|
// On restore the app is already created but we need to check the AM
|
||||||
|
// resource, make sure it is set for test
|
||||||
|
ResourceRequest amRequest = createResourceRequest(
|
||||||
|
// cast to int as we're not testing large values so it is safe
|
||||||
|
(int)amResource.getMemorySize(), amResource.getVirtualCores(),
|
||||||
|
ResourceRequest.ANY, 1, 1, true);
|
||||||
|
List<ResourceRequest> amReqs = new ArrayList<>();
|
||||||
|
amReqs.add(amRequest);
|
||||||
|
createApplicationWithAMResourceInternal(id, queueId, userId, amResource,
|
||||||
|
amReqs);
|
||||||
|
|
||||||
|
// This fakes the placement which is not part of the scheduler anymore
|
||||||
|
ApplicationPlacementContext placementCtx =
|
||||||
|
new ApplicationPlacementContext(queueId);
|
||||||
|
scheduler.addApplication(id.getApplicationId(), queueId, userId, true,
|
||||||
|
placementCtx);
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
|
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
|
||||||
String queue, String user, Resource amResource) {
|
String queue, String user, Resource amResource) {
|
||||||
createApplicationWithAMResourceInternal(attId, queue, user, amResource,
|
createApplicationWithAMResourceInternal(attId, queue, user, amResource,
|
||||||
|
@ -5525,4 +5525,128 @@ private void testSchedulingAllowedToQueueZeroCapacityOfResource(
|
|||||||
|
|
||||||
createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
|
createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestoreToExistingQueue() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
generateAllocationFilePercentageResource();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// no nodes so the resource total should be zero for all queues
|
||||||
|
// AM is using resources
|
||||||
|
Resource amResource = Resources.createResource(1024, 1);
|
||||||
|
// Add the app and the attempt
|
||||||
|
ApplicationAttemptId appAttemptId = null;
|
||||||
|
String queueId = "root.parent.queueA";
|
||||||
|
try {
|
||||||
|
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("The exception is not expected. Exception message: "
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||||
|
|
||||||
|
List<ApplicationAttemptId> appsInQueue =
|
||||||
|
scheduler.getAppsInQueue(queueId);
|
||||||
|
assertEquals("Number of apps in queue 'root.parent.queueA' should be one!",
|
||||||
|
1, appsInQueue.size());
|
||||||
|
|
||||||
|
appAttemptId = scheduler.getAppsInQueue(queueId).get(0);
|
||||||
|
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||||
|
|
||||||
|
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||||
|
|
||||||
|
assertTrue("There should be no requests accepted", schedulerApp
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||||
|
|
||||||
|
// Restore an applications with a user that has no access to the queue
|
||||||
|
try {
|
||||||
|
appAttemptId = createRecoveringApplication(amResource, queueId,
|
||||||
|
"usernotinacl");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("The exception is not expected. Exception message: "
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||||
|
|
||||||
|
appsInQueue = scheduler.getAppsInQueue(queueId);
|
||||||
|
assertEquals("Number of apps in queue 'root.parent.queueA' should be two!",
|
||||||
|
2, appsInQueue.size());
|
||||||
|
|
||||||
|
appAttemptId = scheduler.getAppsInQueue(queueId).get(1);
|
||||||
|
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||||
|
|
||||||
|
schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||||
|
|
||||||
|
assertTrue("There should be no requests accepted", schedulerApp
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRestoreToParentQueue() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
generateAllocationFilePercentageResource();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// no nodes so the resource total should be zero for all queues
|
||||||
|
// AM is using resources
|
||||||
|
Resource amResource = Resources.createResource(1024, 1);
|
||||||
|
// Add the app and the attempt, use a queue that is now a parent
|
||||||
|
ApplicationAttemptId appAttemptId = null;
|
||||||
|
String queueId = "root.parent";
|
||||||
|
try {
|
||||||
|
appAttemptId = createRecoveringApplication(amResource, queueId, "user1");
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("The exception is not expected. Exception message: "
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
scheduler.addApplicationAttempt(appAttemptId, false, true);
|
||||||
|
|
||||||
|
String recoveredQueue = "root.recovery";
|
||||||
|
List<ApplicationAttemptId> appsInQueue =
|
||||||
|
scheduler.getAppsInQueue(recoveredQueue);
|
||||||
|
assertEquals("Number of apps in queue 'root.recovery' should be one!",
|
||||||
|
1, appsInQueue.size());
|
||||||
|
|
||||||
|
appAttemptId =
|
||||||
|
scheduler.getAppsInQueue(recoveredQueue).get(0);
|
||||||
|
assertNotNull("Scheduler app for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", scheduler.getSchedulerApp(appAttemptId));
|
||||||
|
|
||||||
|
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
|
||||||
|
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
|
||||||
|
|
||||||
|
assertTrue("There should be no requests accepted", schedulerApp
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateAllocationFilePercentageResource() {
|
||||||
|
AllocationFileWriter.create()
|
||||||
|
.addQueue(new AllocationFileQueue.Builder("root")
|
||||||
|
.parent(true)
|
||||||
|
.aclSubmitApps(" ")
|
||||||
|
.aclAdministerApps(" ")
|
||||||
|
.subQueue(new AllocationFileQueue.Builder("parent")
|
||||||
|
.parent(true)
|
||||||
|
.maxChildResources("memory-mb=15.0%, vcores=15.0%")
|
||||||
|
.subQueue(new AllocationFileQueue.Builder("queueA")
|
||||||
|
.aclSubmitApps("user1")
|
||||||
|
.build())
|
||||||
|
.build())
|
||||||
|
.build())
|
||||||
|
.writeToFile(ALLOC_FILE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user