From fc06ae38cabd757d8afc24aaec02b07dbc71cdfe Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 11 Feb 2016 10:06:27 +0800 Subject: [PATCH] YARN-4138. Roll back container resource allocation after resource increase token expires. Contributed by Meng Ding --- hadoop-yarn-project/CHANGES.txt | 4 + .../rmcontainer/AllocationExpirationInfo.java | 75 +++ .../ContainerAllocationExpirer.java | 11 +- .../rmcontainer/RMContainer.java | 2 + .../rmcontainer/RMContainerImpl.java | 132 ++++-- .../RMContainerNMDoneChangeResourceEvent.java | 37 ++ .../resourcemanager/rmnode/RMNodeImpl.java | 7 +- .../scheduler/AbstractYarnScheduler.java | 56 +-- .../scheduler/capacity/CapacityScheduler.java | 43 +- .../event/ContainerExpiredSchedulerEvent.java | 12 +- .../yarn/server/resourcemanager/MockNM.java | 20 + .../TestRMNodeTransitions.java | 18 +- .../rmcontainer/TestRMContainerImpl.java | 112 +---- .../capacity/TestContainerResizing.java | 44 +- .../TestIncreaseAllocationExpirer.java | 443 ++++++++++++++++++ .../scheduler/capacity/TestUtils.java | 15 +- 16 files changed, 804 insertions(+), 227 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 71bee1fb8d2..9fb012c007a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -602,6 +602,10 @@ Release 2.8.0 - UNRELEASED YARN-4420. Add REST API for List Reservations. (Sean Po via curino) + YARN-4138. Roll back container resource allocation after resource + increase token expires. (Meng Ding via jianhe) + + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java new file mode 100644 index 00000000000..f4fc72a5bc3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/AllocationExpirationInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class AllocationExpirationInfo implements + Comparable { + + private final ContainerId containerId; + private final boolean increase; + + public AllocationExpirationInfo(ContainerId containerId) { + this(containerId, false); + } + + public AllocationExpirationInfo( + ContainerId containerId, boolean increase) { + this.containerId = containerId; + this.increase = increase; + } + + public ContainerId getContainerId() { + return this.containerId; + } + + public boolean isIncrease() { + return this.increase; + } + + @Override + public int hashCode() { + return (getContainerId().hashCode() << 16); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof AllocationExpirationInfo)) { + return false; + } + return compareTo((AllocationExpirationInfo)other) == 0; + } + + @Override + public int compareTo(AllocationExpirationInfo other) { + if (other == null) { + return -1; + } + // Only need to compare containerId. + return getContainerId().compareTo(other.getContainerId()); + } + + @Override + public String toString() { + return ""; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java index c393f4e6e35..d8198f41429 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -28,7 +27,7 @@ @SuppressWarnings({"unchecked", "rawtypes"}) public class ContainerAllocationExpirer extends - AbstractLivelinessMonitor { + AbstractLivelinessMonitor { private EventHandler dispatcher; @@ -47,7 +46,9 @@ public void serviceInit(Configuration conf) throws Exception { } @Override - protected void expire(ContainerId containerId) { - dispatcher.handle(new ContainerExpiredSchedulerEvent(containerId)); + protected void expire(AllocationExpirationInfo allocationExpirationInfo) { + dispatcher.handle(new ContainerExpiredSchedulerEvent( + allocationExpirationInfo.getContainerId(), + allocationExpirationInfo.isIncrease())); } -} \ No newline at end of file +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index dc0d9baa9b0..5d269311d83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -57,6 +57,8 @@ public interface RMContainer extends EventHandler { Resource getAllocatedResource(); + Resource getLastConfirmedResource(); + NodeId getAllocatedNode(); Priority getAllocatedPriority(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 83876d0a15c..16ab55df643 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,12 +50,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode + .RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -119,9 +123,6 @@ RMContainerEventType.KILL, new KillTransition()) RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED, - RMContainerEventType.EXPIRE, - new ContainerExpiredWhileRunningTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, @@ -177,7 +178,10 @@ RMContainerEventType.CHANGE_RESOURCE, new 
ChangeResourceTransition()) private List resourceRequests; private volatile boolean hasIncreaseReservation = false; - + // Only used for container resource increase and decrease. This is the + // resource to roll back to should the container resource increase token expire. + private Resource lastConfirmedResource; + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { @@ -210,6 +214,7 @@ public RMContainerImpl(Container container, this.isAMContainer = false; this.resourceRequests = null; this.nodeLabelExpression = nodeLabelExpression; + this.lastConfirmedResource = container.getResource(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -283,6 +288,16 @@ public Resource getAllocatedResource() { } } + @Override + public Resource getLastConfirmedResource() { + try { + readLock.lock(); + return this.lastConfirmedResource; + } finally { + readLock.unlock(); + } + } + @Override public NodeId getAllocatedNode() { return container.getNodeId(); @@ -525,7 +540,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.setResourceRequests(null); // Register with containerAllocationExpirer. - container.containerAllocationExpirer.register(container.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(container.getContainerId())); // Tell the app container.eventHandler.handle(new RMAppRunningOnNodeEvent(container @@ -543,7 +559,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { if (acquiredEvent.isIncreasedContainer()) { // If container is increased but not acquired by AM, we will start // containerAllocationExpirer for this container in this transition. - container.containerAllocationExpirer.register(event.getContainerId()); + container.containerAllocationExpirer.register( + new AllocationExpirationInfo(event.getContainerId(), true)); } } } @@ -553,22 +570,65 @@ private static final class NMReportedContainerChangeIsDoneTransition @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister the allocation expirer, it is already increased.. - container.containerAllocationExpirer.unregister(event.getContainerId()); - } - } - - private static final class ContainerExpiredWhileRunningTransition extends - BaseTransition { + RMContainerNMDoneChangeResourceEvent nmDoneChangeResourceEvent = + (RMContainerNMDoneChangeResourceEvent)event; + Resource rmContainerResource = container.getAllocatedResource(); + Resource nmContainerResource = + nmDoneChangeResourceEvent.getNMContainerResource(); - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // When the container expired, and it has a pending increased request, we - // will kill the container. - // TODO, we can do better for this: roll back container resource to the - // resource before increase, and notify scheduler about this decrease as - // well. Will do that in a separated JIRA. - new KillTransition().transition(container, event); + if (Resources.equals(rmContainerResource, nmContainerResource)) { + // If rmContainerResource == nmContainerResource, the resource + // increase is confirmed.
+ // In this case: + // - Set the lastConfirmedResource as nmContainerResource + // - Unregister the allocation expirer + container.lastConfirmedResource = nmContainerResource; + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); + } else if (Resources.fitsIn(rmContainerResource, nmContainerResource)) { + // If rmContainerResource < nmContainerResource, this is caused by the + // following sequence: + // 1. AM asks for increase from 1G to 5G, and RM approves it + // 2. AM acquires the increase token and increases on NM + // 3. Before NM reports 5G to RM to confirm the increase, AM sends + // a decrease request to 4G, and RM approves it + // 4. When NM reports 5G to RM, RM now sees its own allocation as 4G + // In this case: + // - Set the lastConfirmedResource as rmContainerResource + // - Unregister the allocation expirer + // - Notify NM to reduce its resource to rmContainerResource + container.lastConfirmedResource = rmContainerResource; + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); + container.eventHandler.handle(new RMNodeDecreaseContainerEvent( + container.nodeId, + Collections.singletonList(container.getContainer()))); + } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { + // If nmContainerResource < rmContainerResource, this is caused by the + // following sequence: + // 1. AM asks for increase from 1G to 2G, and RM approves it + // 2. AM asks for increase from 2G to 4G, and RM approves it + // 3. AM only uses the 2G token to increase on NM, but never uses the + // 4G token + // 4. NM reports 2G to RM, but RM sees its own allocation as 4G + // In this case: + // - Set the lastConfirmedResource as the maximum of + // nmContainerResource and lastConfirmedResource + // - Do NOT unregister the allocation expirer + // When the increase allocation expires, resource will be rolled back to + // the last confirmed resource. + container.lastConfirmedResource = Resources.componentwiseMax( + nmContainerResource, container.lastConfirmedResource); + } else { + // Something wrong happened, kill the container + LOG.warn("Something wrong happened, container size reported by NM" + + " is not expected, ContainerID=" + container.containerId + + " rm-size-resource:" + rmContainerResource + " nm-size-resource:" + + nmContainerResource); + container.eventHandler.handle(new RMNodeCleanContainerEvent( + container.nodeId, container.containerId)); + + } } } @@ -577,20 +637,22 @@ private static final class ChangeResourceTransition extends BaseTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event; - - // Register with containerAllocationExpirer. - // For now, we assume timeout for increase is as same as container - // allocation. + + Resource targetResource = changeEvent.getTargetResource(); + Resource lastConfirmedResource = container.lastConfirmedResource; + if (!changeEvent.isIncrease()) { - // if this is a decrease request, if container was increased but not - // told to NM, we can consider previous increase is cancelled, - // unregister from the containerAllocationExpirer - container.containerAllocationExpirer.unregister(container - .getContainerId()); + // Only unregister from the containerAllocationExpirer when target + // resource is less than or equal to the last confirmed resource.
+ if (Resources.fitsIn(targetResource, lastConfirmedResource)) { + container.lastConfirmedResource = targetResource; + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(event.getContainerId())); + } } - - container.container.setResource(changeEvent.getTargetResource()); - + + container.container.setResource(targetResource); + // We reach here means we either allocated increase reservation OR // decreased container, reservation will be cancelled anyway. container.hasIncreaseReservation = false; @@ -662,8 +724,8 @@ private static final class KillTransition extends FinishedTransition { public void transition(RMContainerImpl container, RMContainerEvent event) { // Unregister from containerAllocationExpirer. - container.containerAllocationExpirer.unregister(container - .getContainerId()); + container.containerAllocationExpirer.unregister( + new AllocationExpirationInfo(container.getContainerId())); // Inform node container.eventHandler.handle(new RMNodeCleanContainerEvent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java new file mode 100644 index 00000000000..362c8ea5a09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerNMDoneChangeResourceEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +public class RMContainerNMDoneChangeResourceEvent extends RMContainerEvent { + + private final Resource nmContainerResource; + + public RMContainerNMDoneChangeResourceEvent( + ContainerId containerId, Resource nmContainerResource) { + super(containerId, RMContainerEventType.NM_DONE_CHANGE_RESOURCE); + this.nmContainerResource = nmContainerResource; + } + + public Resource getNMContainerResource() { + return nmContainerResource; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b87a0428294..c44c385ee94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1308,14 +1309,16 @@ private void handleContainerStatus(List containerStatuses) { launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } else { // A finished container launchedContainers.remove(containerId); completedContainers.add(remoteContainer); // Unregister from containerAllocationExpirer. 
- containerAllocationExpirer.unregister(containerId); + containerAllocationExpirer.unregister( + new AllocationExpirationInfo(containerId)); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b0cdd330f7e..851a8a8dab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -68,18 +67,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer + .RMContainerNMDoneChangeResourceEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; - import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.resource.Resources; - import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -245,7 +244,7 @@ protected synchronized void containerLaunchedOnNode( application.containerLaunchedOnNode(containerId, node.getNodeID()); } - protected synchronized void containerIncreasedOnNode(ContainerId containerId, + protected void containerIncreasedOnNode(ContainerId containerId, SchedulerNode node, Container increasedContainerReportedByNM) { // Get the application for the finished container SchedulerApplicationAttempt application = @@ -258,39 +257,18 @@ protected synchronized void containerIncreasedOnNode(ContainerId containerId, .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } - - RMContainer rmContainer = getRMContainer(containerId); - Resource rmContainerResource = rmContainer.getAllocatedResource(); - Resource nmContainerResource = increasedContainerReportedByNM.getResource(); - - - if (Resources.equals(nmContainerResource, rmContainerResource)){ - // NM reported expected container size, tell RMContainer. 
Which will stop - // container expire monitor - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.NM_DONE_CHANGE_RESOURCE)); - } else if (Resources.fitsIn(getResourceCalculator(), clusterResource, - nmContainerResource, rmContainerResource)) { - // when rmContainerResource >= nmContainerResource, we won't do anything, - // it is possible a container increased is issued by RM, but AM hasn't - // told NM. - } else if (Resources.fitsIn(getResourceCalculator(), clusterResource, - rmContainerResource, nmContainerResource)) { - // When rmContainerResource <= nmContainerResource, it could happen when a - // container decreased by RM before it is increased in NM. - - // Tell NM to decrease the container - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(), - Arrays.asList(rmContainer.getContainer()))); - } else { - // Something wrong happened, kill the container - LOG.warn("Something wrong happened, container size reported by NM" - + " is not expected, ContainerID=" + containerId - + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:" - + nmContainerResource); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + LeafQueue leafQueue = (LeafQueue) application.getQueue(); + synchronized (leafQueue) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent( + node.getNodeID(), containerId)); + return; + } + rmContainer.handle(new RMContainerNMDoneChangeResourceEvent( + containerId, increasedContainerReportedByNM.getResource())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9e1825ac68a..e13c6eb6392 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -1389,11 +1390,15 @@ public void handle(SchedulerEvent event) { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), - 
SchedulerUtils.createAbnormalContainerStatus( - containerId, - SchedulerUtils.EXPIRED_CONTAINER), - RMContainerEventType.EXPIRE); + if (containerExpiredEvent.isIncrease()) { + rollbackContainerResource(containerId); + } else { + completedContainer(getRMContainer(containerId), + SchedulerUtils.createAbnormalContainerStatus( + containerId, + SchedulerUtils.EXPIRED_CONTAINER), + RMContainerEventType.EXPIRE); + } } break; case KILL_RESERVED_CONTAINER: @@ -1495,7 +1500,33 @@ private synchronized void removeNode(RMNode nodeInfo) { LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } - + + private void rollbackContainerResource( + ContainerId containerId) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The container does not exist."); + return; + } + FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); + if (application == null) { + LOG.info("Cannot rollback resource for container " + containerId + + ". The application that the container belongs to does not exist."); + return; + } + LOG.info("Roll back resource for container " + containerId); + LeafQueue leafQueue = (LeafQueue) application.getQueue(); + synchronized(leafQueue) { + SchedulerNode schedulerNode = + getSchedulerNode(rmContainer.getAllocatedNode()); + SchedContainerChangeRequest decreaseRequest = + new SchedContainerChangeRequest(this.rmContext, schedulerNode, + rmContainer, rmContainer.getLastConfirmedResource()); + decreaseContainer(decreaseRequest, application); + } + } + @Lock(CapacityScheduler.class) @Override protected synchronized void completedContainerInternal( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java index 4a999c82a18..c80fc4f2dcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java @@ -29,14 +29,24 @@ public class ContainerExpiredSchedulerEvent extends SchedulerEvent { private final ContainerId containerId; - + private final boolean increase; + public ContainerExpiredSchedulerEvent(ContainerId containerId) { + this(containerId, false); + } + + public ContainerExpiredSchedulerEvent( + ContainerId containerId, boolean increase) { super(SchedulerEventType.CONTAINER_EXPIRED); this.containerId = containerId; + this.increase = increase; } public ContainerId getContainerId() { return containerId; } + public boolean isIncrease() { + return increase; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4233cd4b253..4407fe922a3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -20,12 +20,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -103,6 +105,17 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { nodeHeartbeat(conts, true); } + public void containerIncreaseStatus(Container container) throws Exception { + Map> conts = new HashMap<>(); + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + container.getId(), ContainerState.RUNNING, "Success", 0, + container.getResource()); + conts.put(container.getId().getApplicationAttemptId().getApplicationId(), + Collections.singletonList(containerStatus)); + List increasedConts = Collections.singletonList(container); + nodeHeartbeat(conts, increasedConts, true, ++responseId); + } + public RegisterNodeManagerResponse registerNode() throws Exception { return registerNode(null, null); } @@ -159,6 +172,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, int resId) throws Exception { + return nodeHeartbeat(conts, new ArrayList(), isHealthy, resId); + } + + public NodeHeartbeatResponse nodeHeartbeat(Map> conts, List increasedConts, + boolean isHealthy, int resId) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); @@ -167,6 +186,7 @@ public NodeHeartbeatResponse nodeHeartbeat(Map appAttemptEventHandler = - mock(EventHandler.class); - EventHandler generic = mock(EventHandler.class); - drainDispatcher.register(RMAppAttemptEventType.class, - appAttemptEventHandler); - drainDispatcher.register(RMNodeEventType.class, generic); - drainDispatcher.init(new YarnConfiguration()); - drainDispatcher.start(); - NodeId nodeId = BuilderUtils.newNodeId("host", 3425); - ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); - ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class); - - Resource resource = BuilderUtils.newResource(512, 1); - Priority priority = BuilderUtils.newPriority(5); - - Container container = BuilderUtils.newContainer(containerId, nodeId, - "host:3465", resource, priority, null); - - RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); - RMContext rmContext = mock(RMContext.class); - when(rmContext.getDispatcher()).thenReturn(drainDispatcher); - when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); - when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); - when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); - 
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); - ConcurrentMap apps = - new ConcurrentHashMap(); - apps.put(appId, mock(RMApp.class)); - when(rmContext.getRMApps()).thenReturn(apps); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, - nodeId, "user", rmContext); - - assertEquals(RMContainerState.NEW, rmContainer.getState()); - assertEquals(resource, rmContainer.getAllocatedResource()); - assertEquals(nodeId, rmContainer.getAllocatedNode()); - assertEquals(priority, rmContainer.getAllocatedPriority()); - verify(writer).containerStarted(any(RMContainer.class)); - verify(publisher).containerCreated(any(RMContainer.class), anyLong()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.START)); - drainDispatcher.await(); - assertEquals(RMContainerState.ALLOCATED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.ACQUIRED)); - drainDispatcher.await(); - assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); - - rmContainer.handle(new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); - drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - assertEquals( - "http://host:3465/node/containerlogs/container_1_0001_01_000001/user", - rmContainer.getLogURL()); - - // newResource is more than the old resource - Resource newResource = BuilderUtils.newResource(1024, 2); - rmContainer.handle(new RMContainerChangeResourceEvent(containerId, - newResource, true)); - - if (acquired) { - rmContainer - .handle(new RMContainerUpdatesAcquiredEvent(containerId, true)); - drainDispatcher.await(); - // status is still RUNNING since this is a increased container acquired by - // AM - assertEquals(RMContainerState.RUNNING, rmContainer.getState()); - } - - // In RUNNING state. Verify EXPIRE and associated actions. - reset(appAttemptEventHandler); - ContainerStatus containerStatus = SchedulerUtils - .createAbnormalContainerStatus(containerId, - SchedulerUtils.EXPIRED_CONTAINER); - rmContainer.handle(new RMContainerFinishedEvent(containerId, - containerStatus, RMContainerEventType.EXPIRE)); - drainDispatcher.await(); - assertEquals(RMContainerState.EXPIRED, rmContainer.getState()); - - // Container will be finished only when it is acquired by AM after increase, - // we will only notify expirer when it is acquired by AM. 
- verify(writer, times(1)).containerFinished(any(RMContainer.class)); - verify(publisher, times(1)).containerFinished(any(RMContainer.class), + verify(writer, never()).containerFinished(any(RMContainer.class)); + verify(publisher, never()).containerFinished(any(RMContainer.class), anyLong()); } - @Test - public void testExpireAfterContainerResourceIncreased() throws Exception { - // expire after increased and acquired by AM - testExpireAfterIncreased(true); - // expire after increased but not acquired by AM - testExpireAfterIncreased(false); - } - @Test public void testExistenceOfResourceRequestInRMContainer() throws Exception { Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index c08af9d8430..9e298427315 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; @@ -143,7 +142,8 @@ public RMNodeLabelsManager createNodeLabelManager() { .newInstance(containerId1, Resources.createResource(3 * GB))), null); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); checkPendingResource(rm1, "default", 2 * GB, null); Assert.assertEquals(2 * GB, @@ -183,7 +183,8 @@ public RMNodeLabelsManager createNodeLabelManager() { // app1 -> a1 RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); checkUsedResource(rm1, "default", 3 * GB, null); Assert.assertEquals(3 * GB, @@ -242,7 +243,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -346,7 +348,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, 
app1.getApplicationId()); // Allocate 1 container am1.allocate( @@ -422,7 +425,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -532,7 +536,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -593,8 +598,8 @@ public RMNodeLabelsManager createNodeLabelManager() { am1.allocate(null, Arrays.asList(containerId2)); // am1 asks to change its AM container from 2G to 1G (decrease) am1.sendContainerResizingRequest(null, Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(1 * GB)))); + ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(1 * GB)))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); @@ -643,7 +648,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -740,7 +746,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); // Allocate two more containers am1.allocate( @@ -862,7 +869,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); ApplicationAttemptId attemptId = am1.getApplicationAttemptId(); // Container 2, 3 (priority=3) @@ -942,7 +950,8 @@ public RMNodeLabelsManager createNodeLabelManager() { RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); ApplicationAttemptId attemptId = am1.getApplicationAttemptId(); // Container 2, 3 (priority=3) @@ -1021,7 +1030,8 @@ public ResourceScheduler createScheduler() { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); // making sure resource is allocated checkUsedResource(rm, "default", 3 * GB, null); - FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm, app1.getApplicationId()); Assert.assertEquals(3 * GB, 
app.getAppAttemptResourceUsage().getUsed().getMemory()); // making sure container is launched @@ -1113,10 +1123,4 @@ private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId, Assert .assertEquals(expectedMemory, node.getAvailableResource().getMemory()); } - - private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, - ApplicationId appId) { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java new file mode 100644 index 00000000000..d7ac0b25036 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestIncreaseAllocationExpirer { + private final int GB = 1024; + private YarnConfiguration conf; + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @Test + public void testContainerIsRemovedFromAllocationExpirer() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. AM uses the token + * 5. 
Verify containerId2 is removed from allocation expirer such + * that it still runs fine after allocation expiration interval + */ + // Set the allocation expiration to 5 seconds + conf.setLong( + YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + // Submit an application + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + // Report AM container status RUNNING to remove it from expirer + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + // AM requests a new container + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + // AM acquires the new container to start the container allocation expirer + List containers = + am1.allocate(null, null).getAllocatedContainers(); + Assert.assertEquals(containerId2, containers.get(0).getId()); + Assert.assertNotNull(containers.get(0).getContainerToken()); + checkUsedResource(rm1, "default", 2 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + // Report container status + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + // Wait until container status is RUNNING, and is removed from + // allocation expirer + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to increase containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and sleep for 1 second + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Remember the resource in order to report status + Resource resource = Resources.clone( + rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource()); + nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource)); + // Wait long enough and verify that the container was removed + // from allocation expirer, and the container is still running + Thread.sleep(10000); + Assert.assertEquals(RMContainerState.RUNNING, + rm1.getResourceScheduler().getRMContainer(containerId2).getState()); + // Verify container size is 3G + Assert.assertEquals( + 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage + checkUsedResource(rm1, "default", 4 * GB, null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); + rm1.stop(); + } + + @Test + public void testContainerIncreaseAllocationExpiration() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. AM does not use the token + * 5.
Verify containerId2's resource usage falls back to + * 1G after the increase token expires + */ + // Set the allocation expiration to 5 seconds + conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + List containers = + am1.allocate(null, null).getAllocatedContainers(); + Assert.assertEquals(containerId2, containers.get(0).getId()); + Assert.assertNotNull(containers.get(0).getContainerToken()); + checkUsedResource(rm1, "default", 2 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to increase containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and wait for 1 second; + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Verify resource usage + checkUsedResource(rm1, "default", 4 * GB, null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); + // Wait long enough for the increase token to expire, and for the roll + // back action to complete + Thread.sleep(10000); + // Verify container size is 1G + Assert.assertEquals( + 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage is 2G + checkUsedResource(rm1, "default", 2 * GB, null); + Assert.assertEquals(2 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is rolled back to 18GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB); + rm1.stop(); + } + + @Test + public void testConsecutiveContainerIncreaseAllocationExpiration() + throws Exception { + /** + * 1. Allocate 1 container: containerId2 (1G) + * 2. Increase resource of containerId2: 1G -> 3G + * 3. AM acquires the token + * 4. Increase resource of containerId2 again: 3G -> 5G + * 5. AM acquires the token + * 6. AM uses the first token to increase the container in NM to 3G + * 7. AM NEVER uses the second token + * 8. Verify containerId2 eventually is allocated 3G after token expires + * 9. 
Verify NM eventually uses 3G for containerId2 + */ + // Set the allocation expiration to 5 seconds + conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); + MockRM rm1 = new MockRM(conf); + rm1.start(); + // Submit an application + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.RUNNING); + // AM requests a new container + am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList()); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + // AM acquires the new container to start the container allocation expirer + am1.allocate(null, null).getAllocatedContainers(); + // Report container status + nm1.nodeHeartbeat( + app1.getCurrentAppAttempt() + .getAppAttemptId(), 2, ContainerState.RUNNING); + // Wait until container status is RUNNING, and is removed from + // allocation expirer + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // am1 asks to change containerId2 from 1GB to 3GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(3 * GB))), null); + // Kick off scheduling and sleep for 1 second to + // make sure the allocation is done + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Start container increase allocation expirer + am1.allocate(null, null); + // Remember the resource (3G) in order to report status + Resource resource1 = Resources.clone( + rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource()); + // am1 asks to change containerId2 from 3GB to 5GB + am1.sendContainerResizingRequest(Collections.singletonList( + ContainerResourceChangeRequest.newInstance( + containerId2, Resources.createResource(5 * GB))), null); + // Kick off scheduling and sleep for 1 second to + // make sure the allocation is done + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + // Reset container increase allocation expirer + am1.allocate(null, null); + // Verify current resource allocation in RM + checkUsedResource(rm1, "default", 6 * GB, null); + FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp( + rm1, app1.getApplicationId()); + Assert.assertEquals(6 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is now reduced to 14GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 14 * GB); + // Use the first token (3G) + nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource1)); + // Wait long enough for the second token (5G) to expire, and verify that + // the roll back action is completed as expected + Thread.sleep(10000); + // Verify container size is rolled back to 3G + Assert.assertEquals( + 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) + .getAllocatedResource().getMemory()); + // Verify total resource usage is 4G + checkUsedResource(rm1, "default", 4 * GB, null); + Assert.assertEquals(4 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // Verify available resource is rolled back to 16GB + verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); + // Verify NM receives the decrease message (3G) + List containersToDecrease = + nm1.nodeHeartbeat(true).getContainersToDecrease(); + Assert.assertEquals(1,
+    Assert.assertEquals(1, containersToDecrease.size());
+    Assert.assertEquals(
+        3 * GB, containersToDecrease.get(0).getResource().getMemory());
+    rm1.stop();
+  }
+
+  @Test
+  public void testDecreaseAfterIncreaseWithAllocationExpiration()
+      throws Exception {
+    /**
+     * 1. Allocate three containers: containerId2, containerId3, containerId4
+     * 2. Increase resource of containerId2: 3G -> 6G
+     * 3. Increase resource of containerId3: 3G -> 6G
+     * 4. Increase resource of containerId4: 3G -> 6G
+     * 5. Do NOT use the increase tokens for containerId2 and containerId3
+     * 6. Decrease containerId2: 6G -> 2G (i.e., below last confirmed resource)
+     * 7. Decrease containerId3: 6G -> 4G (i.e., above last confirmed resource)
+     * 8. Decrease containerId4: 6G -> 4G (i.e., above last confirmed resource)
+     * 9. Use token for containerId4 to increase containerId4 on NM to 6G
+     * 10. Verify containerId2 eventually uses 2G (removed from expirer)
+     * 11. Verify containerId3 eventually uses 3G (increase token expires)
+     * 12. Verify containerId4 eventually uses 4G (removed from expirer)
+     * 13. Verify NM eventually uses 3G for containerId3, 4G for containerId4
+     */
+    // Set the allocation expiration to 5 seconds
+    conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Submit an application
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    nm1.nodeHeartbeat(
+        app1.getCurrentAppAttempt()
+            .getAppAttemptId(), 1, ContainerState.RUNNING);
+    // AM requests three new containers
+    am1.allocate("127.0.0.1", 3 * GB, 3, new ArrayList<ContainerId>());
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+    ContainerId containerId3 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId3, RMContainerState.ALLOCATED);
+    ContainerId containerId4 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
+    rm1.waitForState(nm1, containerId4, RMContainerState.ALLOCATED);
+    // AM acquires tokens to start the container allocation expirer
+    List<Container> containers =
+        am1.allocate(null, null).getAllocatedContainers();
+    Assert.assertEquals(3, containers.size());
+    Assert.assertNotNull(containers.get(0).getContainerToken());
+    Assert.assertNotNull(containers.get(1).getContainerToken());
+    Assert.assertNotNull(containers.get(2).getContainerToken());
+    // Report container status
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        2, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        3, ContainerState.RUNNING);
+    nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
+        4, ContainerState.RUNNING);
+    // Wait until container status becomes RUNNING
+    rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+    rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
+    rm1.waitForState(nm1, containerId4, RMContainerState.RUNNING);
+    // am1 asks to increase containerId2, containerId3 and containerId4
+    // from 3GB to 6GB
+    List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId2, Resources.createResource(6 * GB)));
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId3, Resources.createResource(6 * GB)));
+    increaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId4, Resources.createResource(6 * GB)));
+    am1.sendContainerResizingRequest(increaseRequests, null);
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    // Start container increase allocation expirer
+    am1.allocate(null, null);
+    // Decrease containers
+    List<ContainerResourceChangeRequest> decreaseRequests = new ArrayList<>();
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId2, Resources.createResource(2 * GB)));
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId3, Resources.createResource(4 * GB)));
+    decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
+        containerId4, Resources.createResource(4 * GB)));
+    AllocateResponse response =
+        am1.sendContainerResizingRequest(null, decreaseRequests);
+    // Verify containers are decreased in scheduler
+    Assert.assertEquals(3, response.getDecreasedContainers().size());
+    // Use the token for containerId4 on NM (6G). This should set the last
+    // confirmed resource to 4G, and cancel the allocation expirer
+    nm1.containerIncreaseStatus(getContainer(
+        rm1, containerId4, Resources.createResource(6 * GB)));
+    // Wait for containerId3's token to expire
+    Thread.sleep(10000);
+    Assert.assertEquals(
+        2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
+            .getAllocatedResource().getMemory());
+    Assert.assertEquals(
+        3 * GB, rm1.getResourceScheduler().getRMContainer(containerId3)
+            .getAllocatedResource().getMemory());
+    Assert.assertEquals(
+        4 * GB, rm1.getResourceScheduler().getRMContainer(containerId4)
+            .getAllocatedResource().getMemory());
+    // Verify NM receives 2 decrease messages
+    List<Container> containersToDecrease =
+        nm1.nodeHeartbeat(true).getContainersToDecrease();
+    Assert.assertEquals(2, containersToDecrease.size());
+    // Sort the list to make sure containerId3 comes first
+    Collections.sort(containersToDecrease);
+    Assert.assertEquals(
+        3 * GB, containersToDecrease.get(0).getResource().getMemory());
+    Assert.assertEquals(
+        4 * GB, containersToDecrease.get(1).getResource().getMemory());
+    rm1.stop();
+  }
+
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
+      int expectedMemory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulerNode node = cs.getNode(nodeId);
+    Assert
+        .assertEquals(expectedMemory, node.getAvailableResource().getMemory());
+  }
+
+  private Container getContainer(
+      MockRM rm, ContainerId containerId, Resource resource) {
+    RMContainer rmContainer = rm.getResourceScheduler()
+        .getRMContainer(containerId);
+    return Container.newInstance(
+        containerId, rmContainer.getAllocatedNode(), null,
+        resource, null, null);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 489ef7711d4..17860690b9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -91,10 +92,10 @@ public EventHandler getEventHandler() {
       }
     };
 
-    // No op 
-    ContainerAllocationExpirer cae = 
+    // No op
+    ContainerAllocationExpirer cae =
         new ContainerAllocationExpirer(nullDispatcher);
-    
+
     Configuration conf = new Configuration();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMContextImpl rmContext =
@@ -122,7 +123,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
         return (Resource) args[1];
       }
     });
-    
+
     rmContext.setNodeLabelManager(nlm);
     rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
@@ -349,4 +350,10 @@ public static Configuration getConfigurationWithDefaultQueueLabels(
     conf.setDefaultNodeLabelExpression(B, "y");
     return conf;
   }
+
+  public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+  }
 }