YARN-3535. Scheduler must re-request container resources when RMContainer transitions from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)

(cherry picked from commit 9b272ccae7)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
This commit is contained in:
Arun Suresh 2015-07-17 04:31:34 -07:00
parent 009f5fbad6
commit 63e4ada517
10 changed files with 194 additions and 4 deletions

View File

@ -26,6 +26,9 @@ Release 2.7.2 - UNRELEASED
YARN-3905. Application History Server UI NPEs when accessing apps run after
RM restart (Eric Payne via jeagles)
YARN-3535. Scheduler must re-request container resources when RMContainer transitions
from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@ -93,7 +94,7 @@ public class RMContainerImpl implements RMContainer {
.addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED,
RMContainerEventType.EXPIRE, new FinishedTransition())
.addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED,
RMContainerEventType.KILL, new FinishedTransition())
RMContainerEventType.KILL, new ContainerRescheduledTransition())
// Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
@ -485,6 +486,17 @@ public class RMContainerImpl implements RMContainer {
}
}
private static final class ContainerRescheduledTransition extends
FinishedTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Tell scheduler to recover request of this container to app
container.eventHandler.handle(new ContainerRescheduledEvent(container));
super.transition(container, event);
}
}
private static class FinishedTransition extends BaseTransition {
@Override

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@ -1236,6 +1237,14 @@ public class CapacityScheduler extends
killContainer(containerToBeKilled);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
@ -1400,7 +1409,6 @@ public class CapacityScheduler extends
if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString());
}
recoverResourceRequestForContainer(cont);
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
RMContainerEventType.KILL);

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class ContainerRescheduledEvent extends SchedulerEvent {
private RMContainer container;
public ContainerRescheduledEvent(RMContainer container) {
super(SchedulerEventType.CONTAINER_RESCHEDULED);
this.container = container;
}
public RMContainer getContainer() {
return container;
}
}

View File

@ -38,6 +38,9 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
// Source: RMContainer
CONTAINER_RESCHEDULED,
// Source: SchedulingEditPolicy
DROP_RESERVATION,
PREEMPT_CONTAINER,

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -450,7 +451,6 @@ public class FairScheduler extends
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
recoverResourceRequestForContainer(container);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
@ -1246,6 +1246,15 @@ public class FairScheduler extends
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
break;
case CONTAINER_RESCHEDULED:
if (!(event instanceof ContainerRescheduledEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
break;
default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -837,6 +838,14 @@ public class FifoScheduler extends
RMContainerEventType.EXPIRE);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}

View File

@ -247,7 +247,7 @@ public class TestAMRestart {
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
while (attempt.getJustFinishedContainers().size() != expectedNum
while (attempt.getJustFinishedContainers().size() < expectedNum
&& count < 500) {
Thread.sleep(100);
count++;

View File

@ -18,14 +18,37 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -341,6 +364,89 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
}
}
@Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 =
new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 1;
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm1.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// 3rd container is in Allocated state.
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true);
ContainerId containerId3 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
rm1.waitForContainerAllocated(nm2, containerId3);
rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);
// NodeManager restart
nm2.registerNode();
// NM restart kills all allocated and running containers.
rm1.waitForState(nm2, containerId3, RMContainerState.KILLED);
// The killed RMContainer request should be restored. In successive
// nodeHeartBeats AM should be able to get container allocated.
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm2.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4,
ContainerState.RUNNING);
ContainerId containerId4 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING);
} finally {
rm1.stop();
}
}
private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {

View File

@ -92,6 +92,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -4125,6 +4127,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// preempt now
scheduler.warnOrKillContainer(rmContainer);
// Trigger container rescheduled event
scheduler.handle(new ContainerRescheduledEvent(rmContainer));
List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app
Assert.assertEquals(3, requests.size());