YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE event at RUNNING state. Contributed by Siddharth Seth.

svn merge -r 1381806:1381809 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1381816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-09-07 00:21:36 +00:00
parent f5f0349cc5
commit 445a340593
4 changed files with 252 additions and 6 deletions

View File

@ -65,6 +65,9 @@ Release 2.1.0-alpha - Unreleased
YARN-15. Updated default classpath for YARN applications to reflect split of YARN-15. Updated default classpath for YARN applications to reflect split of
YARN into a sub-project. (Arun C Murthy via vinodkv) YARN into a sub-project. (Arun C Murthy via vinodkv)
YARN-75. Modified ResourceManager's RMContainer to handle a valid RELEASE
event at RUNNING state. (Siddharth Seth via vinodkv)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -152,9 +152,9 @@ public interface AllocateRequest {
void clearAsks(); void clearAsks();
/** /**
* Get the list of <code>ContainerId</code> of unused containers being * Get the list of <code>ContainerId</code> of containers being
* released by the <code>ApplicationMaster</code>. * released by the <code>ApplicationMaster</code>.
* @return list of <code>ContainerId</code> of unused containers being * @return list of <code>ContainerId</code> of containers being
* released by the <code>ApplicationMaster</code> * released by the <code>ApplicationMaster</code>
*/ */
@Public @Public
@ -170,9 +170,9 @@ public interface AllocateRequest {
int getReleaseCount(); int getReleaseCount();
/** /**
* Add the list of <code>ContainerId</code> of unused containers being * Add the list of <code>ContainerId</code> of containers being
* released by the <code>ApplicationMaster</code> * released by the <code>ApplicationMaster</code>
* @param releaseContainers list of <code>ContainerId</code> of unused * @param releaseContainers list of <code>ContainerId</code> of
* containers being released by the < * containers being released by the <
* code>ApplicationMaster</code> * code>ApplicationMaster</code>
*/ */

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMContainerImpl implements RMContainer { public class RMContainerImpl implements RMContainer {
private static final Log LOG = LogFactory.getLog(RMContainerImpl.class); private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
@ -95,6 +96,8 @@ public class RMContainerImpl implements RMContainer {
RMContainerEventType.FINISHED, new FinishedTransition()) RMContainerEventType.FINISHED, new FinishedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.KILLED, .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
RMContainerEventType.KILL, new KillTransition()) RMContainerEventType.KILL, new KillTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
// Transitions from COMPLETED state // Transitions from COMPLETED state
.addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED, .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED,
@ -106,11 +109,13 @@ public class RMContainerImpl implements RMContainer {
// Transitions from RELEASED state // Transitions from RELEASED state
.addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED, .addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED,
EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL)) EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL,
RMContainerEventType.FINISHED))
// Transitions from KILLED state // Transitions from KILLED state
.addTransition(RMContainerState.KILLED, RMContainerState.KILLED, .addTransition(RMContainerState.KILLED, RMContainerState.KILLED,
EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL)) EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL,
RMContainerEventType.FINISHED))
// create the topology tables // create the topology tables
.installTopology(); .installTopology();

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl {
@Test
public void testReleaseWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
appAttemptEventHandler);
drainDispatcher.register(RMNodeEventType.class, generic);
drainDispatcher.init(new YarnConfiguration());
drainDispatcher.start();
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
Resource resource = BuilderUtils.newResource(512);
Priority priority = BuilderUtils.newPriority(5);
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer);
assertEquals(RMContainerState.NEW, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
drainDispatcher.await();
assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// In RUNNING state. Verify RELEASED and associated actions.
reset(appAttemptEventHandler);
ContainerStatus containerStatus = SchedulerUtils
.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER);
rmContainer.handle(new RMContainerFinishedEvent(containerId,
containerStatus, RMContainerEventType.RELEASED));
drainDispatcher.await();
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class);
verify(appAttemptEventHandler).handle(captor.capture());
RMAppAttemptContainerFinishedEvent cfEvent = captor.getValue();
assertEquals(appAttemptId, cfEvent.getApplicationAttemptId());
assertEquals(containerStatus, cfEvent.getContainerStatus());
assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType());
// In RELEASED state. A FINIHSED event may come in.
rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils
.createAbnormalContainerStatus(containerId, "FinishedContainer"),
RMContainerEventType.FINISHED));
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestRMContainerImpl {
@Test
public void testReleaseWhileRunning() {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler eventHandler = drainDispatcher.getEventHandler();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
appAttemptEventHandler);
drainDispatcher.register(RMNodeEventType.class, generic);
drainDispatcher.init(new YarnConfiguration());
drainDispatcher.start();
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
Resource resource = BuilderUtils.newResource(512);
Priority priority = BuilderUtils.newPriority(5);
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, eventHandler, expirer);
assertEquals(RMContainerState.NEW, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
drainDispatcher.await();
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.ACQUIRED));
drainDispatcher.await();
assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.LAUNCHED));
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// In RUNNING state. Verify RELEASED and associated actions.
reset(appAttemptEventHandler);
ContainerStatus containerStatus = SchedulerUtils
.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER);
rmContainer.handle(new RMContainerFinishedEvent(containerId,
containerStatus, RMContainerEventType.RELEASED));
drainDispatcher.await();
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class);
verify(appAttemptEventHandler).handle(captor.capture());
RMAppAttemptContainerFinishedEvent cfEvent = captor.getValue();
assertEquals(appAttemptId, cfEvent.getApplicationAttemptId());
assertEquals(containerStatus, cfEvent.getContainerStatus());
assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType());
// In RELEASED state. A FINIHSED event may come in.
rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils
.createAbnormalContainerStatus(containerId, "FinishedContainer"),
RMContainerEventType.FINISHED));
assertEquals(RMContainerState.RELEASED, rmContainer.getState());
}
}