YARN-9238. Avoid allocating opportunistic containers to previous/removed/non-exist application attempt. Contributed by lujie.
This commit is contained in:
parent
632d5e8a98
commit
9c88695bcd
|
@ -172,6 +172,12 @@ public class OpportunisticContainerAllocatorAMService
|
||||||
((AbstractYarnScheduler)rmContext.getScheduler())
|
((AbstractYarnScheduler)rmContext.getScheduler())
|
||||||
.getApplicationAttempt(appAttemptId);
|
.getApplicationAttempt(appAttemptId);
|
||||||
|
|
||||||
|
if (!appAttempt.getApplicationAttemptId().equals(appAttemptId)){
|
||||||
|
LOG.error("Calling allocate on previous or removed or non "
|
||||||
|
+ "existent application attempt " + appAttemptId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
OpportunisticContainerContext oppCtx =
|
OpportunisticContainerContext oppCtx =
|
||||||
appAttempt.getOpportunisticContainerContext();
|
appAttempt.getOpportunisticContainerContext();
|
||||||
oppCtx.updateNodeList(getLeastLoadedNodes());
|
oppCtx.updateNodeList(getLeastLoadedNodes());
|
||||||
|
|
|
@ -81,10 +81,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
@ -749,6 +752,35 @@ public class TestOpportunisticContainerAllocatorAMService {
|
||||||
Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
|
Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAMCrashDuringAllocate() throws Exception {
|
||||||
|
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
nm.registerNode();
|
||||||
|
|
||||||
|
RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
ApplicationAttemptId attemptId0 =
|
||||||
|
app.getCurrentAppAttempt().getAppAttemptId();
|
||||||
|
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
|
||||||
|
|
||||||
|
//simulate AM crash by replacing the current attempt
|
||||||
|
//Do not use rm.failApplicationAttempt, the bug will skip due to
|
||||||
|
//ApplicationDoesNotExistInCacheException
|
||||||
|
CapacityScheduler scheduler= ((CapacityScheduler) rm.getRMContext().
|
||||||
|
getScheduler());
|
||||||
|
SchedulerApplication<FiCaSchedulerApp> schApp =
|
||||||
|
(SchedulerApplication<FiCaSchedulerApp>)scheduler.
|
||||||
|
getSchedulerApplications().get(attemptId0.getApplicationId());
|
||||||
|
final ApplicationAttemptId appAttemptId1 = TestUtils.
|
||||||
|
getMockApplicationAttemptId(1, 1);
|
||||||
|
schApp.setCurrentAppAttempt(new FiCaSchedulerApp(appAttemptId1,
|
||||||
|
null, scheduler.getQueue("default"), null, rm.getRMContext()));
|
||||||
|
|
||||||
|
//start to allocate
|
||||||
|
am.allocate(
|
||||||
|
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
|
||||||
|
"*", Resources.createResource(1 * GB), 2)), null);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testNodeRemovalDuringAllocate() throws Exception {
|
public void testNodeRemovalDuringAllocate() throws Exception {
|
||||||
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
|
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
|
Loading…
Reference in New Issue