YARN-3933. FairScheduler: Multiple calls to completedContainer are not safe. (Shiwei Guo and Miklos Szegedi via kasha)

(cherry picked from commit 646c6d6509)
This commit is contained in:
Karthik Kambatla 2017-02-13 11:26:30 -08:00
parent 12ad077aa6
commit 30ff5bff1a
2 changed files with 61 additions and 3 deletions

View File

@ -139,6 +139,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Remove from the list of containers
if (liveContainers.remove(containerId) == null) {
LOG.info("Additional complete request on completed container " +
rmContainer.getContainerId());
return;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
@ -150,8 +157,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
+ " in state: " + rmContainer.getState() + " event:" + event);
}
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
untrackContainerForPreemption(rmContainer);
Resource containerResource = rmContainer.getContainer().getResource();

View File

@ -90,12 +90,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -3543,7 +3545,58 @@ public class TestFairScheduler extends FairSchedulerTestBase {
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1", 2, 1);
}
@Test
public void testMultipleCompletedEvent() throws Exception {
// Set up a fair scheduler
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println("<maxAMShare>0.2</maxAMShare>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create a node
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
0, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
scheduler.update();
// Launch an app
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(
attId1, "queue1", "user1",
Resource.newInstance(1024, 1));
createSchedulingRequestExistingApplication(
1024, 1,
RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(), attId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
scheduler.update();
scheduler.handle(updateEvent);
RMContainer container = app1.getLiveContainersMap().
values().iterator().next();
scheduler.completedContainer(container, SchedulerUtils
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
scheduler.completedContainer(container, SchedulerUtils
.createAbnormalContainerStatus(container.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.FINISHED);
assertEquals(Resources.none(), app1.getResourceUsage());
}
@Test
public void testQueueMaxAMShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);