From 445a340593172ac7ef7e6257aa5edbb46dbbb010 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 7 Sep 2012 00:21:36 +0000 Subject: [PATCH] YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE event at RUNNING state. Contributed by Siddharth Seth. svn merge -r 1381806:1381809 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1381816 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/protocolrecords/AllocateRequest.java | 8 +- .../rmcontainer/RMContainerImpl.java | 9 +- .../rmcontainer/TestRMContainerImpl.java | 238 ++++++++++++++++++ 4 files changed, 252 insertions(+), 6 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6f3cabc9ceb..280755b8cb9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -65,6 +65,9 @@ Release 2.1.0-alpha - Unreleased YARN-15. Updated default classpath for YARN applications to reflect split of YARN into a sub-project. (Arun C Murthy via vinodkv) + YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE + event at RUNNING state. (Siddharth Seth via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index b3727a3f5c4..a5e50af6923 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -152,9 +152,9 @@ public interface AllocateRequest { void clearAsks(); /** - * Get the list of ContainerId of unused containers being + * Get the list of ContainerId of containers being * released by the ApplicationMaster. - * @return list of ContainerId of unused containers being + * @return list of ContainerId of containers being * released by the ApplicationMaster */ @Public @@ -170,9 +170,9 @@ public interface AllocateRequest { int getReleaseCount(); /** - * Add the list of ContainerId of unused containers being + * Add the list of ContainerId of containers being * released by the ApplicationMaster - * @param releaseContainers list of ContainerId of unused + * @param releaseContainers list of ContainerId of * containers being released by the < * code>ApplicationMaster */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index d845edeb614..b93b333eb2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +@SuppressWarnings({"unchecked", "rawtypes"}) public class RMContainerImpl implements RMContainer { private static final Log LOG = LogFactory.getLog(RMContainerImpl.class); @@ -95,6 +96,8 @@ public class RMContainerImpl implements RMContainer { RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED, RMContainerEventType.KILL, new KillTransition()) + .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, + RMContainerEventType.RELEASED, new KillTransition()) // Transitions from COMPLETED state .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED, @@ -106,11 +109,13 @@ public class RMContainerImpl implements RMContainer { // Transitions from RELEASED state .addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED, - EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL)) + EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL, + RMContainerEventType.FINISHED)) // Transitions from KILLED state .addTransition(RMContainerState.KILLED, RMContainerState.KILLED, - EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL)) + EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL, + RMContainerEventType.FINISHED)) // create the topology tables .installTopology(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java new file mode 100644 index 00000000000..a9083c6550a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TestRMContainerImpl { + + @Test + public void testReleaseWhileRunning() { + + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler eventHandler = drainDispatcher.getEventHandler(); + EventHandler appAttemptEventHandler = mock(EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + + Resource resource = BuilderUtils.newResource(512); + Priority priority = BuilderUtils.newPriority(5); + + Container container = BuilderUtils.newContainer(containerId, nodeId, + "host:3465", resource, priority, null); + + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + nodeId, eventHandler, expirer); + + assertEquals(RMContainerState.NEW, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.ACQUIRED)); + drainDispatcher.await(); + assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + drainDispatcher.await(); + assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // In RUNNING state. Verify RELEASED and associated actions. + reset(appAttemptEventHandler); + ContainerStatus containerStatus = SchedulerUtils + .createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER); + rmContainer.handle(new RMContainerFinishedEvent(containerId, + containerStatus, RMContainerEventType.RELEASED)); + drainDispatcher.await(); + assertEquals(RMContainerState.RELEASED, rmContainer.getState()); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(RMAppAttemptContainerFinishedEvent.class); + verify(appAttemptEventHandler).handle(captor.capture()); + RMAppAttemptContainerFinishedEvent cfEvent = captor.getValue(); + assertEquals(appAttemptId, cfEvent.getApplicationAttemptId()); + assertEquals(containerStatus, cfEvent.getContainerStatus()); + assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType()); + + // In RELEASED state. A FINIHSED event may come in. + rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils + .createAbnormalContainerStatus(containerId, "FinishedContainer"), + RMContainerEventType.FINISHED)); + assertEquals(RMContainerState.RELEASED, rmContainer.getState()); + } + +} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class TestRMContainerImpl { + + @Test + public void testReleaseWhileRunning() { + + DrainDispatcher drainDispatcher = new DrainDispatcher(); + EventHandler eventHandler = drainDispatcher.getEventHandler(); + EventHandler appAttemptEventHandler = mock(EventHandler.class); + EventHandler generic = mock(EventHandler.class); + drainDispatcher.register(RMAppAttemptEventType.class, + appAttemptEventHandler); + drainDispatcher.register(RMNodeEventType.class, generic); + drainDispatcher.init(new YarnConfiguration()); + drainDispatcher.start(); + NodeId nodeId = BuilderUtils.newNodeId("host", 3425); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); + + Resource resource = BuilderUtils.newResource(512); + Priority priority = BuilderUtils.newPriority(5); + + Container container = BuilderUtils.newContainer(containerId, nodeId, + "host:3465", resource, priority, null); + + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + nodeId, eventHandler, expirer); + + assertEquals(RMContainerState.NEW, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.START)); + drainDispatcher.await(); + assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.ACQUIRED)); + drainDispatcher.await(); + assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + drainDispatcher.await(); + assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // In RUNNING state. Verify RELEASED and associated actions. + reset(appAttemptEventHandler); + ContainerStatus containerStatus = SchedulerUtils + .createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER); + rmContainer.handle(new RMContainerFinishedEvent(containerId, + containerStatus, RMContainerEventType.RELEASED)); + drainDispatcher.await(); + assertEquals(RMContainerState.RELEASED, rmContainer.getState()); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(RMAppAttemptContainerFinishedEvent.class); + verify(appAttemptEventHandler).handle(captor.capture()); + RMAppAttemptContainerFinishedEvent cfEvent = captor.getValue(); + assertEquals(appAttemptId, cfEvent.getApplicationAttemptId()); + assertEquals(containerStatus, cfEvent.getContainerStatus()); + assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType()); + + // In RELEASED state. A FINIHSED event may come in. + rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils + .createAbnormalContainerStatus(containerId, "FinishedContainer"), + RMContainerEventType.FINISHED)); + assertEquals(RMContainerState.RELEASED, rmContainer.getState()); + } + +}