YARN-3535. Scheduler must re-request container resources when RMContainer transitions from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
(cherry picked from commit 9b272ccae7
)
This commit is contained in:
parent
a7de3cde13
commit
8018041b49
|
@ -595,6 +595,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3885. ProportionalCapacityPreemptionPolicy doesn't preempt if queue is
|
||||
more than 2 level. (Ajith S via wangda)
|
||||
|
||||
YARN-3535. Scheduler must re-request container resources when RMContainer transitions
|
||||
from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
|
@ -94,7 +95,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
|||
.addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED,
|
||||
RMContainerEventType.EXPIRE, new FinishedTransition())
|
||||
.addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED,
|
||||
RMContainerEventType.KILL, new FinishedTransition())
|
||||
RMContainerEventType.KILL, new ContainerRescheduledTransition())
|
||||
|
||||
// Transitions from ACQUIRED state
|
||||
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
|
||||
|
@ -495,6 +496,17 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class ContainerRescheduledTransition extends
|
||||
FinishedTransition {
|
||||
|
||||
@Override
|
||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||
// Tell scheduler to recover request of this container to app
|
||||
container.eventHandler.handle(new ContainerRescheduledEvent(container));
|
||||
super.transition(container, event);
|
||||
}
|
||||
}
|
||||
|
||||
private static class FinishedTransition extends BaseTransition {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
|
@ -1370,6 +1371,14 @@ public class CapacityScheduler extends
|
|||
killContainer(containerToBeKilled);
|
||||
}
|
||||
break;
|
||||
case CONTAINER_RESCHEDULED:
|
||||
{
|
||||
ContainerRescheduledEvent containerRescheduledEvent =
|
||||
(ContainerRescheduledEvent) event;
|
||||
RMContainer container = containerRescheduledEvent.getContainer();
|
||||
recoverResourceRequestForContainer(container);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
|
||||
}
|
||||
|
@ -1543,7 +1552,6 @@ public class CapacityScheduler extends
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
||||
}
|
||||
recoverResourceRequestForContainer(cont);
|
||||
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
||||
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
||||
RMContainerEventType.KILL);
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
public class ContainerRescheduledEvent extends SchedulerEvent {
|
||||
|
||||
private RMContainer container;
|
||||
|
||||
public ContainerRescheduledEvent(RMContainer container) {
|
||||
super(SchedulerEventType.CONTAINER_RESCHEDULED);
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
public RMContainer getContainer() {
|
||||
return container;
|
||||
}
|
||||
}
|
|
@ -38,6 +38,9 @@ public enum SchedulerEventType {
|
|||
// Source: ContainerAllocationExpirer
|
||||
CONTAINER_EXPIRED,
|
||||
|
||||
// Source: RMContainer
|
||||
CONTAINER_RESCHEDULED,
|
||||
|
||||
// Source: SchedulingEditPolicy
|
||||
DROP_RESERVATION,
|
||||
PREEMPT_CONTAINER,
|
||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||
|
@ -462,7 +463,6 @@ public class FairScheduler extends
|
|||
SchedulerUtils.createPreemptedContainerStatus(
|
||||
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
|
||||
|
||||
recoverResourceRequestForContainer(container);
|
||||
// TODO: Not sure if this ever actually adds this to the list of cleanup
|
||||
// containers on the RMNode (see SchedulerNode.releaseContainer()).
|
||||
completedContainer(container, status, RMContainerEventType.KILL);
|
||||
|
@ -1236,6 +1236,15 @@ public class FairScheduler extends
|
|||
SchedulerUtils.EXPIRED_CONTAINER),
|
||||
RMContainerEventType.EXPIRE);
|
||||
break;
|
||||
case CONTAINER_RESCHEDULED:
|
||||
if (!(event instanceof ContainerRescheduledEvent)) {
|
||||
throw new RuntimeException("Unexpected event type: " + event);
|
||||
}
|
||||
ContainerRescheduledEvent containerRescheduledEvent =
|
||||
(ContainerRescheduledEvent) event;
|
||||
RMContainer container = containerRescheduledEvent.getContainer();
|
||||
recoverResourceRequestForContainer(container);
|
||||
break;
|
||||
default:
|
||||
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||
|
@ -846,6 +847,14 @@ public class FifoScheduler extends
|
|||
RMContainerEventType.EXPIRE);
|
||||
}
|
||||
break;
|
||||
case CONTAINER_RESCHEDULED:
|
||||
{
|
||||
ContainerRescheduledEvent containerRescheduledEvent =
|
||||
(ContainerRescheduledEvent) event;
|
||||
RMContainer container = containerRescheduledEvent.getContainer();
|
||||
recoverResourceRequestForContainer(container);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
|
||||
}
|
||||
|
|
|
@ -247,7 +247,7 @@ public class TestAMRestart {
|
|||
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
|
||||
throws InterruptedException {
|
||||
int count = 0;
|
||||
while (attempt.getJustFinishedContainers().size() != expectedNum
|
||||
while (attempt.getJustFinishedContainers().size() < expectedNum
|
||||
&& count < 500) {
|
||||
Thread.sleep(100);
|
||||
count++;
|
||||
|
|
|
@ -22,14 +22,20 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
|
@ -40,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
|
@ -403,6 +410,89 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
|
||||
throws Exception {
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
try {
|
||||
rm1.start();
|
||||
RMApp app1 =
|
||||
rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default",
|
||||
-1, null, "Test", false, true);
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
MockNM nm2 =
|
||||
new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
|
||||
nm2.registerNode();
|
||||
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
int NUM_CONTAINERS = 1;
|
||||
// allocate NUM_CONTAINERS containers
|
||||
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
||||
new ArrayList<ContainerId>());
|
||||
nm1.nodeHeartbeat(true);
|
||||
|
||||
// wait for containers to be allocated.
|
||||
List<Container> containers =
|
||||
am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||
while (containers.size() != NUM_CONTAINERS) {
|
||||
nm1.nodeHeartbeat(true);
|
||||
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
|
||||
// launch the 2nd container, for testing running container transferred.
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
|
||||
ContainerState.RUNNING);
|
||||
ContainerId containerId2 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||
|
||||
// 3rd container is in Allocated state.
|
||||
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
||||
new ArrayList<ContainerId>());
|
||||
nm2.nodeHeartbeat(true);
|
||||
ContainerId containerId3 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||
rm1.waitForContainerAllocated(nm2, containerId3);
|
||||
rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);
|
||||
|
||||
// NodeManager restart
|
||||
nm2.registerNode();
|
||||
|
||||
// NM restart kills all allocated and running containers.
|
||||
rm1.waitForState(nm2, containerId3, RMContainerState.KILLED);
|
||||
|
||||
// The killed RMContainer request should be restored. In successive
|
||||
// nodeHeartBeats AM should be able to get container allocated.
|
||||
containers =
|
||||
am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||
while (containers.size() != NUM_CONTAINERS) {
|
||||
nm2.nodeHeartbeat(true);
|
||||
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
||||
new ArrayList<ContainerId>()).getAllocatedContainers());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
|
||||
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4,
|
||||
ContainerState.RUNNING);
|
||||
ContainerId containerId4 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
|
||||
rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING);
|
||||
} finally {
|
||||
rm1.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyMaximumResourceCapability(
|
||||
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
||||
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
|
@ -4453,6 +4454,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
// preempt now
|
||||
scheduler.warnOrKillContainer(rmContainer);
|
||||
|
||||
// Trigger container rescheduled event
|
||||
scheduler.handle(new ContainerRescheduledEvent(rmContainer));
|
||||
|
||||
List<ResourceRequest> requests = rmContainer.getResourceRequests();
|
||||
// Once recovered, resource request will be present again in app
|
||||
Assert.assertEquals(3, requests.size());
|
||||
|
|
Loading…
Reference in New Issue