From c3375175d616e0380560f89d491b6b9753a8f3e1 Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Wed, 12 Apr 2017 14:17:13 -0700
Subject: [PATCH] YARN-6432. FairScheduler: Reserve preempted resources for
 corresponding applications. (Miklos Szegedi via kasha)

---
 .../rmcontainer/RMContainer.java              |   3 +-
 .../rmcontainer/RMContainerImpl.java          |   2 +-
 .../scheduler/SchedulerNode.java              |   2 +-
 .../scheduler/fair/FSAppAttempt.java          |   8 +-
 .../scheduler/fair/FSPreemptionThread.java    |  25 +-
 .../scheduler/fair/FSSchedulerNode.java       | 133 +++++-
 .../scheduler/fair/FairScheduler.java         |  39 +-
 .../scheduler/fair/TestFSSchedulerNode.java   | 403 ++++++++++++++++++
 .../fair/TestFairSchedulerPreemption.java     |  19 +
 9 files changed, 596 insertions(+), 38 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 7ad381ed84f..29680e5d87b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
  * when resources are being reserved to fill space for a future container
  * allocation.
  */
-public interface RMContainer extends EventHandler<RMContainerEvent> {
+public interface RMContainer extends EventHandler<RMContainerEvent>,
+    Comparable<RMContainer> {
 
   ContainerId getContainerId();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 12fbbea72ac..1e9463a3e01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
+public class RMContainerImpl implements RMContainer {
 
   private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index af4a0012826..272537c8bf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -160,7 +160,7 @@ public abstract class SchedulerNode {
    * @param rmContainer Allocated container
    * @param launchedOnNode True if the container has been launched
    */
-  private synchronized void allocateContainer(RMContainer rmContainer,
+  protected synchronized void allocateContainer(RMContainer rmContainer,
       boolean launchedOnNode) {
     Container container = rmContainer.getContainer();
     if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index e0dfb73f13c..a1c4b4b1027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -647,7 +647,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       Container reservedContainer, NodeType type,
       SchedulerRequestKey schedulerKey) {
 
-    if (!reservationExceedsThreshold(node, type)) {
+    RMContainer nodeReservedContainer = node.getReservedContainer();
+    boolean reservableForThisApp = nodeReservedContainer == null ||
+        nodeReservedContainer.getApplicationAttemptId()
+            .equals(getApplicationAttemptId());
+    if (reservableForThisApp && !reservationExceedsThreshold(node, type)) {
       LOG.info("Making reservation: node=" + node.getNodeName() +
           " app_id=" + getApplicationId());
       if (reservedContainer == null) {
@@ -1139,7 +1143,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   /**
    * Is application starved for fairshare or minshare
    */
-  private boolean isStarved() {
+  boolean isStarved() {
     return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
   }
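The reservation gate above is the behavioral core of this hunk: a node that already carries a reservation may only be re-reserved by the same application attempt. As a reading aid, the same predicate in isolation; this is a sketch only, the helper name is invented, and the patch itself inlines the logic in FSAppAttempt:

    // Sketch only: extracted form of the guard added in FSAppAttempt.
    // A node is reservable for an attempt iff it is currently unreserved,
    // or its existing reservation belongs to that same attempt.
    private boolean reservableFor(FSSchedulerNode node,
        ApplicationAttemptId attempt) {
      RMContainer reserved = node.getReservedContainer();
      return reserved == null
          || reserved.getApplicationAttemptId().equals(attempt);
    }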
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 65df0c2ea82..efe36a66af5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -113,11 +113,6 @@ class FSPreemptionThread extends Thread {
       List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
           .getNodesByResourceName(rr.getResourceName());
       for (FSSchedulerNode node : potentialNodes) {
-        // TODO (YARN-5829): Attempt to reserve the node for starved app.
-        if (isNodeAlreadyReserved(node, starvedApp)) {
-          continue;
-        }
-
         int maxAMContainers = bestContainers == null ?
             Integer.MAX_VALUE : bestContainers.numAMContainers;
         PreemptableContainers preemptableContainers =
@@ -134,7 +129,8 @@ class FSPreemptionThread extends Thread {
 
       if (bestContainers != null && bestContainers.containers.size() > 0) {
         containersToPreempt.addAll(bestContainers.containers);
-        trackPreemptionsAgainstNode(bestContainers.containers);
+        // Reserve the containers for the starved app
+        trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
       }
     }
   } // End of iteration over RRs
@@ -163,8 +159,10 @@ class FSPreemptionThread extends Thread {
         node.getRunningContainersWithAMsAtTheEnd();
     containersToCheck.removeAll(node.getContainersForPreemption());
 
-    // Initialize potential with unallocated resources
-    Resource potential = Resources.clone(node.getUnallocatedResource());
+    // Initialize potential with unallocated but not reserved resources
+    Resource potential = Resources.subtractFromNonNegative(
+        Resources.clone(node.getUnallocatedResource()),
+        node.getTotalReserved());
 
     for (RMContainer container : containersToCheck) {
       FSAppAttempt app =
@@ -182,8 +180,6 @@ class FSPreemptionThread extends Thread {
       // Check if we have already identified enough containers
       if (Resources.fitsIn(request, potential)) {
         return preemptableContainers;
-      } else {
-        // TODO (YARN-5829): Unreserve the node for the starved app.
       }
     }
     return null;
@@ -195,10 +191,11 @@ class FSPreemptionThread extends Thread {
     return nodeReservedApp != null && !nodeReservedApp.equals(app);
   }
 
-  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers,
+      FSAppAttempt app) {
     FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
         .getNode(containers.get(0).getNodeId());
-    node.addContainersForPreemption(containers);
+    node.addContainersForPreemption(containers, app);
   }
 
   private void preemptContainers(List<RMContainer> containers) {
@@ -232,10 +229,6 @@ class FSPreemptionThread extends Thread {
       LOG.info("Killing container " + container);
       scheduler.completedContainer(
           container, status, RMContainerEventType.KILL);
-
-      FSSchedulerNode containerNode = (FSSchedulerNode)
-          scheduler.getNodeTracker().getNode(container.getAllocatedNode());
-      containerNode.removeContainerForPreemption(container);
     }
   }
 }
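The changed initialization of "potential" is what keeps already-promised resources from being counted as preemptable headroom twice. A worked sketch with invented values may help; the two Resources calls are the same ones the hunk above uses:

    // Sketch with invented values. Suppose a node has 2 GB / 2 vcores
    // unallocated, of which 1 GB / 1 vcore is already promised (a plain
    // reservation or an earlier preemption).
    Resource unallocated = Resource.newInstance(2048, 2);
    Resource reserved = Resource.newInstance(1024, 1);
    // subtractFromNonNegative mutates its first argument and floors each
    // component at zero, hence the defensive clone.
    Resource potential = Resources.subtractFromNonNegative(
        Resources.clone(unallocated), reserved);   // <1024 MB, 1 vcore>
    // A 2 GB starved request therefore still needs 1 GB worth of running
    // containers identified for preemption on this node.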
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index d983ea08237..663e3c8b3a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -18,18 +18,26 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 
@@ -38,15 +46,38 @@ import java.util.concurrent.ConcurrentSkipListSet;
 public class FSSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
-
   private FSAppAttempt reservedAppSchedulable;
-  private final Set<RMContainer> containersForPreemption =
+
+  // Stores list of containers still to be preempted
+  @VisibleForTesting
+  final Set<RMContainer> containersForPreemption =
       new ConcurrentSkipListSet<>();
+  // Stores amount of resources preempted and reserved for each app
+  @VisibleForTesting
+  final Map<FSAppAttempt, Resource>
+      resourcesPreemptedForApp = new LinkedHashMap<>();
+  private final Map<ApplicationAttemptId, FSAppAttempt> appIdToAppMap =
+      new HashMap<>();
+  // Sum of resourcesPreemptedForApp values, total resources that are
+  // slated for preemption
+  private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
   }
 
+  /**
+   * Total amount of reserved resources, including reservations and preempted
+   * containers.
+   * @return total resources reserved
+   */
+  Resource getTotalReserved() {
+    Resource totalReserved = Resources.clone(getReservedContainer() != null
+        ? getReservedContainer().getAllocatedResource()
+        : Resource.newInstance(0, 0));
+    Resources.addTo(totalReserved, totalResourcesPreempted);
+    return totalReserved;
+  }
+
   @Override
   public synchronized void reserveResource(
       SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
@@ -109,17 +140,56 @@ public class FSSchedulerNode extends SchedulerNode {
     return reservedAppSchedulable;
   }
 
+  /**
+   * List the resources currently reserved for applications as a result of
+   * preemption, in FIFO order, so the scheduler can assign them to the
+   * appropriate applications.
+   * @return a copy of the map of resources reserved per application
+   */
+  @VisibleForTesting
+  synchronized LinkedHashMap<FSAppAttempt, Resource> getPreemptionList() {
+    cleanupPreemptionList();
+    return new LinkedHashMap<>(resourcesPreemptedForApp);
+  }
+
+  /**
+   * Remove apps that have their preemption requests fulfilled.
+   */
+  private synchronized void cleanupPreemptionList() {
+    Iterator<FSAppAttempt> iterator =
+        resourcesPreemptedForApp.keySet().iterator();
+    while (iterator.hasNext()) {
+      FSAppAttempt app = iterator.next();
+      if (app.isStopped() || !app.isStarved()) {
+        // App does not need more resources
+        Resources.subtractFrom(totalResourcesPreempted,
+            resourcesPreemptedForApp.get(app));
+        appIdToAppMap.remove(app.getApplicationAttemptId());
+        iterator.remove();
+      }
+    }
+  }
+
   /**
    * Mark {@code containers} as being considered for preemption so they are
    * not considered again. A call to this requires a corresponding call to
-   * {@link #removeContainerForPreemption} to ensure we do not mark a
-   * container for preemption and never consider it again and avoid memory
-   * leaks.
+   * {@code releaseContainer} so that we do not mark a container for
+   * preemption and then never consider it again, which would leak these
+   * entries.
    *
    * @param containers container to mark
    */
-  void addContainersForPreemption(Collection<RMContainer> containers) {
-    containersForPreemption.addAll(containers);
+  void addContainersForPreemption(Collection<RMContainer> containers,
+      FSAppAttempt app) {
+
+    appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+    resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
+    Resource appReserved = resourcesPreemptedForApp.get(app);
+
+    for (RMContainer container : containers) {
+      containersForPreemption.add(container);
+      Resources.addTo(appReserved, container.getAllocatedResource());
+      Resources.addTo(totalResourcesPreempted,
+          container.getAllocatedResource());
+    }
   }
 
   /**
@@ -130,11 +200,50 @@ public class FSSchedulerNode extends SchedulerNode {
   }
 
   /**
-   * Remove container from the set of containers marked for preemption.
-   *
-   * @param container container to remove
+   * The Scheduler has allocated containers on this node to the given
+   * application.
+   * @param rmContainer Allocated container
+   * @param launchedOnNode True if the container has been launched
    */
-  void removeContainerForPreemption(RMContainer container) {
-    containersForPreemption.remove(container);
+  @Override
+  protected synchronized void allocateContainer(RMContainer rmContainer,
+      boolean launchedOnNode) {
+    super.allocateContainer(rmContainer, launchedOnNode);
+    Resource allocated = rmContainer.getAllocatedResource();
+    if (!Resources.isNone(allocated)) {
+      // Check for a satisfied preemption request and update bookkeeping
+      FSAppAttempt app =
+          appIdToAppMap.get(rmContainer.getApplicationAttemptId());
+      if (app != null) {
+        Resource reserved = resourcesPreemptedForApp.get(app);
+        Resource fulfilled = Resources.componentwiseMin(reserved, allocated);
+        Resources.subtractFrom(reserved, fulfilled);
+        Resources.subtractFrom(totalResourcesPreempted, fulfilled);
+        if (Resources.isNone(reserved)) {
+          // No more preempted containers
+          resourcesPreemptedForApp.remove(app);
+          appIdToAppMap.remove(rmContainer.getApplicationAttemptId());
+        }
+      }
+    } else {
+      LOG.error("Allocated empty container " + rmContainer.getContainerId());
+    }
+  }
+
+  /**
+   * Release an allocated container on this node.
+   * It also releases from the reservation list to trigger preemption
+   * allocations.
+   * @param containerId ID of container to be released.
+   * @param releasedByNode whether the release originates from a node update.
+   */
+  @Override
+  public synchronized void releaseContainer(ContainerId containerId,
+      boolean releasedByNode) {
+    RMContainer container = getContainer(containerId);
+    super.releaseContainer(containerId, releasedByNode);
+    if (container != null) {
+      containersForPreemption.remove(container);
+    }
   }
 }
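The allocateContainer override settles an app's outstanding preemption reservation with a component-wise minimum, so an allocation can never over-credit either the memory or the vcore dimension. A worked example with invented values, using the same Resources calls as the hunk above:

    // Sketch with invented values: 3 GB / 3 vcores were preempted and
    // reserved for an app; the scheduler then allocates it a
    // 2 GB / 1 vcore container on this node.
    Resource reserved = Resource.newInstance(3072, 3);
    Resource allocated = Resource.newInstance(2048, 1);
    // Only the overlap counts as fulfilled, never more than either side.
    Resource fulfilled = Resources.componentwiseMin(reserved, allocated);
    // fulfilled == <2048 MB, 1 vcore>
    Resources.subtractFrom(reserved, fulfilled); // reserved -> <1024 MB, 2>
    // The app's bookkeeping is dropped once reserved reaches <0 MB, 0>.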
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 98c14ace6a3..d1a237ada97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -71,9 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -972,6 +970,31 @@ public class FairScheduler extends
     }
   }
 
+  /**
+   * Assign preempted containers to the applications that have reserved
+   * resources for preempted containers.
+   * @param node Node to check
+   * @return whether any assignment occurred
+   */
+  static boolean assignPreemptedContainers(FSSchedulerNode node) {
+    boolean assignedAny = false;
+    for (Entry<FSAppAttempt, Resource> entry :
+        node.getPreemptionList().entrySet()) {
+      FSAppAttempt app = entry.getKey();
+      Resource preemptionPending = Resources.clone(entry.getValue());
+      while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
+        Resource assigned = app.assignContainer(node);
+        if (Resources.isNone(assigned)) {
+          // Failed to assign; let's not try further
+          break;
+        }
+        assignedAny = true;
+        Resources.subtractFromNonNegative(preemptionPending, assigned);
+      }
+    }
+    return assignedAny;
+  }
+
   @VisibleForTesting
   void attemptScheduling(FSSchedulerNode node) {
     try {
@@ -991,11 +1014,17 @@ public class FairScheduler extends
       }
 
       // Assign new containers...
-      // 1. Check for reserved applications
-      // 2. Schedule if there are no reservations
+      // 1. Ensure containers go to the apps they were preempted for
+      // 2. Check for reserved applications
+      // 3. Schedule if there are no reservations
 
-      boolean validReservation = false;
+      // Apps may be waiting for preempted containers. We have to satisfy
+      // these first to avoid a case where we preempt a container for A
+      // from B, and then C, which does not itself qualify for preemption,
+      // gets the preempted resources.
+      assignPreemptedContainers(node);
       FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+      boolean validReservation = false;
       if (reservedAppSchedulable != null) {
         validReservation = reservedAppSchedulable.assignReservedContainer(node);
       }
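attemptScheduling now drains the node's preemption list before honoring an ordinary reservation. A condensed, hypothetical view of the resulting per-node-update flow; the driver method name is invented, and error handling, continuous-scheduling checks, and metrics are omitted:

    // Sketch only: the order of operations after this patch.
    static void driveNodeUpdate(FSSchedulerNode node) {
      // 1. Preempted resources go to the apps they were preempted for,
      //    in the FIFO order kept by FSSchedulerNode#getPreemptionList().
      FairScheduler.assignPreemptedContainers(node);
      // 2. Then an existing reservation on the node, if any, is honored.
      FSAppAttempt reserved = node.getReservedAppSchedulable();
      if (reserved != null && reserved.assignReservedContainer(node)) {
        return;
      }
      // 3. Otherwise the queue hierarchy competes for the remaining space.
    }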
containers. + * @param node Node to check + * @return assignment has occurred + */ + static boolean assignPreemptedContainers(FSSchedulerNode node) { + boolean assignedAny = false; + for (Entry entry : + node.getPreemptionList().entrySet()) { + FSAppAttempt app = entry.getKey(); + Resource preemptionPending = Resources.clone(entry.getValue()); + while (!app.isStopped() && !Resources.isNone(preemptionPending)) { + Resource assigned = app.assignContainer(node); + if (Resources.isNone(assigned)) { + // Fail to assign, let's not try further + break; + } + assignedAny = true; + Resources.subtractFromNonNegative(preemptionPending, assigned); + } + } + return assignedAny; + } + @VisibleForTesting void attemptScheduling(FSSchedulerNode node) { try { @@ -991,11 +1014,17 @@ public class FairScheduler extends } // Assign new containers... - // 1. Check for reserved applications - // 2. Schedule if there are no reservations + // 1. Ensure containers are assigned to the apps that preempted + // 2. Check for reserved applications + // 3. Schedule if there are no reservations - boolean validReservation = false; + // Apps may wait for preempted containers + // We have to satisfy these first to avoid cases, when we preempt + // a container for A from B and C gets the preempted containers, + // when C does not qualify for preemption itself. + assignPreemptedContainers(node); FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); + boolean validReservation = false; if (reservedAppSchedulable != null) { validReservation = reservedAppSchedulable.assignReservedContainer(node); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java new file mode 100644 index 00000000000..3927b00f68e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test scheduler node, especially preemption reservations. + */ +public class TestFSSchedulerNode { + private final ArrayList containers = new ArrayList<>(); + + private RMNode createNode() { + RMNode node = mock(RMNode.class); + when(node.getTotalCapability()).thenReturn(Resource.newInstance(8192, 8)); + when(node.getHostName()).thenReturn("host.domain.com"); + return node; + } + + private void createDefaultContainer() { + createContainer(Resource.newInstance(1024, 1), null); + } + + private RMContainer createContainer( + Resource request, ApplicationAttemptId appAttemptId) { + RMContainer container = mock(RMContainer.class); + Container containerInner = mock(Container.class); + ContainerId id = mock(ContainerId.class); + when(id.getContainerId()).thenReturn((long)containers.size()); + when(containerInner.getResource()). + thenReturn(Resources.clone(request)); + when(containerInner.getId()).thenReturn(id); + when(containerInner.getExecutionType()). + thenReturn(ExecutionType.GUARANTEED); + when(container.getApplicationAttemptId()).thenReturn(appAttemptId); + when(container.getContainerId()).thenReturn(id); + when(container.getContainer()).thenReturn(containerInner); + when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container.getAllocatedResource()). + thenReturn(Resources.clone(request)); + containers.add(container); + return container; + } + + private void saturateCluster(FSSchedulerNode schedulerNode) { + while (!Resources.isNone(schedulerNode.getUnallocatedResource())) { + createDefaultContainer(); + schedulerNode.allocateContainer(containers.get(containers.size() - 1)); + schedulerNode.containerStarted(containers.get(containers.size() - 1). 
+ getContainerId()); + } + } + + private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode, + Resource request) { + FSAppAttempt starvingApp = mock(FSAppAttempt.class); + ApplicationAttemptId appAttemptId = + mock(ApplicationAttemptId.class); + when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId); + when(starvingApp.assignContainer(schedulerNode)).thenAnswer( + new Answer() { + @Override + public Resource answer(InvocationOnMock invocationOnMock) + throws Throwable { + Resource response = Resource.newInstance(0, 0); + while (!Resources.isNone(request) && + !Resources.isNone(schedulerNode.getUnallocatedResource())) { + RMContainer container = createContainer(request, appAttemptId); + schedulerNode.allocateContainer(container); + Resources.addTo(response, container.getAllocatedResource()); + Resources.subtractFrom(request, + container.getAllocatedResource()); + } + return response; + } + }); + when(starvingApp.isStarved()).thenAnswer( + new Answer() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) + throws Throwable { + return !Resources.isNone(request); + } + } + ); + when(starvingApp.getPendingDemand()).thenReturn(request); + return starvingApp; + } + + private void finalValidation(FSSchedulerNode schedulerNode) { + assertEquals("Everything should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + assertTrue("No containers should be reserved for preemption", + schedulerNode.containersForPreemption.isEmpty()); + assertTrue("No resources should be reserved for preemptors", + schedulerNode.resourcesPreemptedForApp.isEmpty()); + assertEquals( + "No amount of resource should be reserved for preemptees", + Resources.none(), + schedulerNode.getTotalReserved()); + } + + private void allocateContainers(FSSchedulerNode schedulerNode) { + FairScheduler.assignPreemptedContainers(schedulerNode); + } + + /** + * Allocate and release a single container. + */ + @Test + public void testSimpleAllocation() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + createDefaultContainer(); + assertEquals("Nothing should have been allocated, yet", + Resources.none(), schedulerNode.getAllocatedResource()); + schedulerNode.allocateContainer(containers.get(0)); + assertEquals("Container should be allocated", + containers.get(0).getContainer().getResource(), + schedulerNode.getAllocatedResource()); + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + assertEquals("Everything should have been released", + Resources.none(), schedulerNode.getAllocatedResource()); + + // Check that we are error prone + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + finalValidation(schedulerNode); + } + + /** + * Allocate and release three containers with launch. 
+ */ + @Test + public void testMultipleAllocations() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + createDefaultContainer(); + createDefaultContainer(); + createDefaultContainer(); + assertEquals("Nothing should have been allocated, yet", + Resources.none(), schedulerNode.getAllocatedResource()); + schedulerNode.allocateContainer(containers.get(0)); + schedulerNode.containerStarted(containers.get(0).getContainerId()); + schedulerNode.allocateContainer(containers.get(1)); + schedulerNode.containerStarted(containers.get(1).getContainerId()); + schedulerNode.allocateContainer(containers.get(2)); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), 3.0), + schedulerNode.getAllocatedResource()); + schedulerNode.releaseContainer(containers.get(1).getContainerId(), true); + schedulerNode.releaseContainer(containers.get(2).getContainerId(), true); + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + finalValidation(schedulerNode); + } + + /** + * Allocate and release a single container. + */ + @Test + public void testSimplePreemption() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Request preemption + FSAppAttempt starvingApp = createStarvingApp(schedulerNode, + Resource.newInstance(1024, 1)); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp); + assertEquals( + "No resource amount should be reserved for preemptees", + containers.get(0).getAllocatedResource(), + schedulerNode.getTotalReserved()); + + // Preemption occurs release one container + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + allocateContainers(schedulerNode); + assertEquals("Container should be allocated", + schedulerNode.getTotalResource(), + schedulerNode.getAllocatedResource()); + + // Release all remaining containers + for (int i = 1; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + + /** + * Allocate and release three containers requested by two apps. 
+ */ + @Test + public void testComplexPreemption() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Preempt a container + FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode, + Resource.newInstance(2048, 2)); + FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode, + Resource.newInstance(1024, 1)); + + // Preemption thread kicks in + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp1); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(1)), starvingApp1); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(2)), starvingApp2); + + // Preemption happens + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + schedulerNode.releaseContainer(containers.get(2).getContainerId(), true); + schedulerNode.releaseContainer(containers.get(1).getContainerId(), true); + + allocateContainers(schedulerNode); + assertEquals("Container should be allocated", + schedulerNode.getTotalResource(), + schedulerNode.getAllocatedResource()); + + // Release all containers + for (int i = 3; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + + /** + * Allocate and release three containers requested by two apps in two rounds. + */ + @Test + public void testMultiplePreemptionEvents() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Preempt a container + FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode, + Resource.newInstance(2048, 2)); + FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode, + Resource.newInstance(1024, 1)); + + // Preemption thread kicks in + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp1); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(1)), starvingApp1); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(2)), starvingApp2); + + // Preemption happens + schedulerNode.releaseContainer(containers.get(1).getContainerId(), true); + allocateContainers(schedulerNode); + + schedulerNode.releaseContainer(containers.get(2).getContainerId(), true); + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + allocateContainers(schedulerNode); + + assertEquals("Container should be allocated", + schedulerNode.getTotalResource(), + schedulerNode.getAllocatedResource()); + + // Release all containers + for (int i = 3; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + + /** + * Allocate and release a single container and delete the app in between. 
+ */ + @Test + public void testPreemptionToCompletedApp() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Preempt a container + FSAppAttempt starvingApp = createStarvingApp(schedulerNode, + Resource.newInstance(1024, 1)); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp); + + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + + // Stop the application then try to satisfy the reservation + // and observe that there are still free resources not allocated to + // the deleted app + when(starvingApp.isStopped()).thenReturn(true); + allocateContainers(schedulerNode); + assertNotEquals("Container should be allocated", + schedulerNode.getTotalResource(), + schedulerNode.getAllocatedResource()); + + // Release all containers + for (int i = 1; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + + /** + * Preempt a bigger container than the preemption request. + */ + @Test + public void testPartialReservedPreemption() { + RMNode node = createNode(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false); + + // Launch containers and saturate the cluster + saturateCluster(schedulerNode); + assertEquals("Container should be allocated", + Resources.multiply(containers.get(0).getContainer().getResource(), + containers.size()), + schedulerNode.getAllocatedResource()); + + // Preempt a container + Resource originalStarvingAppDemand = Resource.newInstance(512, 1); + FSAppAttempt starvingApp = createStarvingApp(schedulerNode, + originalStarvingAppDemand); + schedulerNode.addContainersForPreemption( + Collections.singletonList(containers.get(0)), starvingApp); + + // Preemption occurs + schedulerNode.releaseContainer(containers.get(0).getContainerId(), true); + + // Container partially reassigned + allocateContainers(schedulerNode); + assertEquals("Container should be allocated", + Resources.subtract(schedulerNode.getTotalResource(), + Resource.newInstance(512, 0)), + schedulerNode.getAllocatedResource()); + + // Cleanup simulating node update + schedulerNode.getPreemptionList(); + + // Release all containers + for (int i = 1; i < containers.size(); ++i) { + schedulerNode.releaseContainer(containers.get(i).getContainerId(), true); + } + finalValidation(schedulerNode); + } + +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 3940a47aded..59d243bc690 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 3940a47aded..59d243bc690 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -294,11 +294,30 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
         8 - 2 * numStarvedAppContainers,
         greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
 
+    // Verify the nodes are reserved for the starvingApp
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertTrue("node should be reserved for the starvingApp",
+            node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
+
     sendEnoughNodeUpdatesToAssignFully();
 
     // Verify the preempted containers are assigned to starvingApp
     assertEquals("Starved app is not assigned the right # of containers",
         numStarvedAppContainers, starvingApp.getLiveContainers().size());
+
+    // Verify the nodes are not reserved for the starvingApp anymore
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
   }
 
   private void verifyNoPreemption() throws InterruptedException {