diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 2be013f2cdf..bb3d158b612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -171,6 +171,12 @@ public class OpportunisticContainerAllocatorAMService ((AbstractYarnScheduler)rmContext.getScheduler()) .getApplicationAttempt(appAttemptId); + if (!appAttempt.getApplicationAttemptId().equals(appAttemptId)){ + LOG.error("Calling allocate on previous or removed or non " + + "existent application attempt " + appAttemptId); + return; + } + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 5adb924b27c..8c23a43cea3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -81,10 +81,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -749,6 +752,35 @@ public class TestOpportunisticContainerAllocatorAMService { Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers()); } + @Test(timeout = 60000) + public void testAMCrashDuringAllocate() throws Exception { + MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService()); + nm.registerNode(); + + RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId0 = + app.getCurrentAppAttempt().getAppAttemptId(); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + //simulate AM crash by replacing the current attempt + //Do not use rm.failApplicationAttempt, the bug will skip due to + //ApplicationDoesNotExistInCacheException + CapacityScheduler scheduler= ((CapacityScheduler) rm.getRMContext(). + getScheduler()); + SchedulerApplication schApp = + (SchedulerApplication)scheduler. + getSchedulerApplications().get(attemptId0.getApplicationId()); + final ApplicationAttemptId appAttemptId1 = TestUtils. + getMockApplicationAttemptId(1, 1); + schApp.setCurrentAppAttempt(new FiCaSchedulerApp(appAttemptId1, + null, scheduler.getQueue("default"), null, rm.getRMContext())); + + //start to allocate + am.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2)), null); + } + @Test(timeout = 60000) public void testNodeRemovalDuringAllocate() throws Exception { MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());