YARN-6959. RM may allocate wrong AM Container for new attempt. Contributed by Yuqi Wang

This commit is contained in:
Jian He 2017-08-14 10:51:04 -07:00
parent ce797a1706
commit e2f6299f6f
5 changed files with 63 additions and 27 deletions

View File

@ -323,6 +323,7 @@ public abstract class AbstractYarnScheduler
} }
// TODO: Rename it to getCurrentApplicationAttempt
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app = applications.get( SchedulerApplication<T> app = applications.get(
applicationAttemptId.getApplicationId()); applicationAttemptId.getApplicationId());

View File

@ -903,6 +903,19 @@ public class CapacityScheduler extends
ContainerUpdates updateRequests) { ContainerUpdates updateRequests) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) { if (application == null) {
LOG.error("Calling allocate on removed or non existent application " +
applicationAttemptId.getApplicationId());
return EMPTY_ALLOCATION;
}
// This allocate call may be a leftover from a previous attempt; if it were
// processed it would affect the current attempt, e.g. by confusing the
// request and allocation for the current attempt's AM container.
// Note that the caller's precondition check on the attempt id may already
// be outdated by this point, so it is necessary to re-check it here.
if (!application.getApplicationAttemptId().equals(applicationAttemptId)) {
LOG.error("Calling allocate on previous or removed " +
"or non existent application attempt " + applicationAttemptId);
return EMPTY_ALLOCATION; return EMPTY_ALLOCATION;
} }

View File

@ -835,8 +835,19 @@ public class FairScheduler extends
// Make sure this application exists // Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId); FSAppAttempt application = getSchedulerApp(appAttemptId);
if (application == null) { if (application == null) {
LOG.info("Calling allocate on removed " + LOG.error("Calling allocate on removed or non existent application " +
"or non existent application " + appAttemptId); appAttemptId.getApplicationId());
return EMPTY_ALLOCATION;
}
// This allocate call may be a leftover from a previous attempt; if it were
// processed it would affect the current attempt, e.g. by confusing the
// request and allocation for the current attempt's AM container.
// Note that the caller's precondition check on the attempt id may already
// be outdated by this point, so it is necessary to re-check it here.
if (!application.getApplicationAttemptId().equals(appAttemptId)) {
LOG.error("Calling allocate on previous or removed " +
"or non existent application attempt " + appAttemptId);
return EMPTY_ALLOCATION; return EMPTY_ALLOCATION;
} }

View File

@ -329,8 +329,19 @@ public class FifoScheduler extends
ContainerUpdates updateRequests) { ContainerUpdates updateRequests) {
FifoAppAttempt application = getApplicationAttempt(applicationAttemptId); FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
if (application == null) { if (application == null) {
LOG.error("Calling allocate on removed " + LOG.error("Calling allocate on removed or non existent application " +
"or non-existent application " + applicationAttemptId); applicationAttemptId.getApplicationId());
return EMPTY_ALLOCATION;
}
// This allocate call may be a leftover from a previous attempt; if it were
// processed it would affect the current attempt, e.g. by confusing the
// request and allocation for the current attempt's AM container.
// Note that the caller's precondition check on the attempt id may already
// be outdated by this point, so it is necessary to re-check it here.
if (!application.getApplicationAttemptId().equals(applicationAttemptId)) {
LOG.error("Calling allocate on previous or removed " +
"or non existent application attempt " + applicationAttemptId);
return EMPTY_ALLOCATION; return EMPTY_ALLOCATION;
} }

View File

@ -2107,49 +2107,49 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.init(conf); scheduler.init(conf);
scheduler.start(); scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
// First ask, queue1 requests 1 large (minReqSize * 2).
ApplicationAttemptId id11 = createAppAttemptId(1, 1); ApplicationAttemptId id11 = createAppAttemptId(1, 1);
createMockRMApp(id11); createMockRMApp(id11);
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false); scheduler.addApplication(id11.getApplicationId(),
"root.queue1", "user1", false);
scheduler.addApplicationAttempt(id11, false, false); scheduler.addApplicationAttempt(id11, false, false);
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id21, false, false);
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false);
scheduler.addApplicationAttempt(id22, false, false);
int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
// First ask, queue1 requests 1 large (minReqSize * 2).
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = ResourceRequest request1 = createResourceRequest(minReqSize * 2,
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true); ResourceRequest.ANY, 1, 1, true);
ask1.add(request1); ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS); null, null, NULL_UPDATE_REQUESTS);
// Second ask, queue2 requests 1 large. // Second ask, queue2 requests 1 large.
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
createMockRMApp(id21);
scheduler.addApplication(id21.getApplicationId(),
"root.queue2", "user1", false);
scheduler.addApplicationAttempt(id21, false, false);
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(2 * minReqSize, "foo", 1, 1, ResourceRequest request2 = createResourceRequest(2 * minReqSize,
false); "foo", 1, 1, false);
ResourceRequest request3 = createResourceRequest(2 * minReqSize, ResourceRequest request3 = createResourceRequest(2 * minReqSize,
ResourceRequest.ANY, 1, 1, false); ResourceRequest.ANY, 1, 1, false);
ask2.add(request2); ask2.add(request2);
ask2.add(request3); ask2.add(request3);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS); null, null, NULL_UPDATE_REQUESTS);
// Third ask, queue2 requests 2 small (minReqSize). // Third ask, queue2 requests 2 small (minReqSize).
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
createMockRMApp(id22);
scheduler.addApplication(id22.getApplicationId(),
"root.queue2", "user1", false);
scheduler.addApplicationAttempt(id22, false, false);
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>(); List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(minReqSize, "bar", 2, 2, ResourceRequest request4 = createResourceRequest(minReqSize,
true); "bar", 2, 2, true);
ResourceRequest request5 = createResourceRequest(minReqSize, ResourceRequest request5 = createResourceRequest(minReqSize,
ResourceRequest.ANY, 2, 2, true); ResourceRequest.ANY, 2, 2, true);
ask3.add(request4); ask3.add(request4);
ask3.add(request5); ask3.add(request5);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(),