YARN-6714. IllegalStateException while handling APP_ATTEMPT_REMOVED event when async-scheduling enabled in CapacityScheduler. Contributed by Tao Yang.

This commit is contained in:
Sunil G 2017-07-11 14:52:44 +05:30
parent fce7951014
commit 34f113df5c
2 changed files with 153 additions and 1 deletions

View File

@ -2392,7 +2392,10 @@ public class CapacityScheduler extends
if (attemptId != null) {
FiCaSchedulerApp app = getApplicationAttempt(attemptId);
if (app != null) {
// Required sanity check for attemptId - when async-scheduling enabled,
// proposal might be outdated if AM failover just finished
// and proposal queue was not be consumed in time
if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
if (app.accept(cluster, request)) {
app.apply(cluster, request);
LOG.info("Allocation proposal accepted");

View File

@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -25,12 +31,29 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class TestCapacitySchedulerAsyncScheduling {
@ -140,4 +163,130 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.close();
}
// Testcase for YARN-6714
@Test (timeout = 30000)
public void testCommitProposalForFailedAppAttempt()
throws Exception {
// disable async-scheduling for simulating complex since scene
Configuration disableAsyncConf = new Configuration(conf);
disableAsyncConf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
List<MockNM> nmLst = new ArrayList<>();
nmLst.add(nm1);
nmLst.add(nm2);
// init scheduler & nodes
while (
((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
.nodeCount() < 2) {
Thread.sleep(10);
}
Assert.assertEquals(2,
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
.getNodeTracker().nodeCount());
CapacityScheduler scheduler =
(CapacityScheduler) rm.getRMContext().getScheduler();
SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
SchedulerNode sn2 = scheduler.getSchedulerNode(nm2.getNodeId());
// launch app
RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
FiCaSchedulerApp schedulerApp =
scheduler.getApplicationAttempt(am.getApplicationAttemptId());
// allocate and launch 1 containers and running on nm2
allocateAndLaunchContainers(am, nm2, rm, 1,
Resources.createResource(5 * GB), 0, 2);
// nm1 runs 1 container(app1-container_01/AM)
// nm2 runs 1 container(app1-container_02)
Assert.assertEquals(1, sn1.getNumContainers());
Assert.assertEquals(1, sn2.getNumContainers());
// kill app attempt1
scheduler.handle(
new AppAttemptRemovedSchedulerEvent(am.getApplicationAttemptId(),
RMAppAttemptState.KILLED, true));
// wait until app attempt1 removed on nm1
while (sn1.getCopiedListOfRunningContainers().size() == 1) {
Thread.sleep(100);
}
// wait until app attempt2 launched on nm1
while (sn1.getCopiedListOfRunningContainers().size() == 0) {
nm1.nodeHeartbeat(true);
Thread.sleep(100);
}
// generate reserved proposal of stopped app attempt
// and it could be committed for async-scheduling
// this kind of proposal should be skipped
Resource reservedResource = Resources.createResource(5 * GB);
Container container = Container.newInstance(
ContainerId.newContainerId(am.getApplicationAttemptId(), 3),
sn2.getNodeID(), sn2.getHttpAddress(), reservedResource,
Priority.newInstance(0), null);
RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
.create(ResourceRequest
.newInstance(Priority.newInstance(0), "*", reservedResource, 1)),
am.getApplicationAttemptId(), sn2.getNodeID(), "user",
rm.getRMContext());
SchedulerContainer reservedContainer =
new SchedulerContainer(schedulerApp, scheduler.getNode(sn2.getNodeID()),
rmContainer, "", false);
ContainerAllocationProposal reservedForAttempt1Proposal =
new ContainerAllocationProposal(reservedContainer, null,
reservedContainer, NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, reservedResource);
List<ContainerAllocationProposal> reservedProposals = new ArrayList<>();
reservedProposals.add(reservedForAttempt1Proposal);
ResourceCommitRequest request =
new ResourceCommitRequest(null, reservedProposals, null);
scheduler.tryCommit(scheduler.getClusterResource(), request);
Assert.assertNull("Outdated proposal should not be accepted!",
sn2.getReservedContainer());
rm.stop();
}
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId)
throws Exception {
am.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(priority), "*", resource,
nContainer)), null);
ContainerId lastContainerId = ContainerId
.newContainerId(am.getApplicationAttemptId(),
startContainerId + nContainer - 1);
Assert.assertTrue(
rm.waitForState(nm, lastContainerId, RMContainerState.ALLOCATED));
// Acquire them, and NM report RUNNING
am.allocate(null, null);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
for (int cId = startContainerId;
cId < startContainerId + nContainer; cId++) {
ContainerId containerId =
ContainerId.newContainerId(am.getApplicationAttemptId(), cId);
RMContainer rmContainer = cs.getRMContainer(containerId);
if (rmContainer != null) {
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
} else {
Assert.fail("Cannot find RMContainer");
}
rm.waitForState(nm,
ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
RMContainerState.RUNNING);
}
}
}