diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 99ecd9971f0..c1d351e0da6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -71,6 +71,9 @@ Release 2.6.0 - UNRELEASED after RM recovery but before scheduler learns about apps and app-attempts. (Jian He via vinodkv) + YARN-2244. FairScheduler missing handling of containers for unknown + application attempts. (Anubhav Dhoot via kasha) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b3e835a54d3..5764c8c7a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -123,6 +123,23 @@ public abstract class AbstractYarnScheduler return maximumAllocation; } + protected void containerLaunchedOnNode(ContainerId containerId, + SchedulerNode node) { + // Get the application for the finished container + SchedulerApplicationAttempt application = getCurrentAttemptForContainer + (containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } + public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { SchedulerApplication app = applications.get(applicationAttemptId.getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6d26519ff98..26812388aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -866,21 +865,6 @@ public class CapacityScheduler extends } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Override public void handle(SchedulerEvent event) { switch(event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3a847ce7589..18ccf9d8a93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -928,22 +928,6 @@ public class FairScheduler extends } } - /** - * Process a container which has launched on a node, as reported by the node. - */ - private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { - // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - /** * Process a heartbeat update from a node. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 571d0558c04..518a8d9d3b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -831,23 +830,6 @@ public class FifoScheduler extends } } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Lock(FifoScheduler.class) private synchronized void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 45ccd1c3016..e88ebd24f3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -232,20 +232,7 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - List contsToClean = resp.getContainersToCleanup(); - int cleanedConts = contsToClean.size(); - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -258,26 +245,36 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts = contsToClean.size(); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); rm.stop(); } - + + protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + NodeHeartbeatResponse resp) throws Exception { + int waitCount = 0, cleanedConts = 0; + List contsToClean; + do { + dispatcher.await(); + contsToClean = resp.getContainersToCleanup(); + cleanedConts += contsToClean.size(); + if (cleanedConts >= 1) { + break; + } + Thread.sleep(100); + resp = nm.nodeHeartbeat(true); + } while(waitCount++ < 200); + + if (contsToClean.isEmpty()) { + LOG.error("Failed to get any containers to cleanup"); + } else { + LOG.info("Got cleanup for " + contsToClean.get(0)); + } + Assert.assertEquals(1, cleanedConts); + } + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception { while (true) { @@ -400,6 +397,58 @@ public class TestApplicationCleanup { rm2.stop(); } + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws + Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // Add unknown container for application unknown to scheduler + NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 + .getApplicationAttemptId(), 2, ContainerState.RUNNING); + + waitForContainerCleanup(dispatcher2, nm1, response); + + rm1.stop(); + rm2.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup();