YARN-6432. FairScheduler: Reserve preempted resources for corresponding applications. (Miklos Szegedi via kasha)

Karthik Kambatla 2017-04-12 14:17:13 -07:00
parent 9d9087a67a
commit c3375175d6
9 changed files with 596 additions and 38 deletions

rmcontainer/RMContainer.java

@@ -42,7 +42,8 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
  * when resources are being reserved to fill space for a future container
  * allocation.
  */
-public interface RMContainer extends EventHandler<RMContainerEvent> {
+public interface RMContainer extends EventHandler<RMContainerEvent>,
+    Comparable<RMContainer> {

   ContainerId getContainerId();
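
Note: ConcurrentSkipListSet, used by FSSchedulerNode below to track containers marked for preemption, keeps its elements sorted, so the element type must be mutually comparable. Declaring Comparable<RMContainer> on the interface rather than only on RMContainerImpl makes that guarantee visible at the interface type. A minimal sketch of the pattern this supports (the class name here is illustrative, not part of the patch):

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

class PreemptionSetSketch {
  // Safe because every RMContainer is now Comparable by contract; before
  // this change the set relied on the runtime type being RMContainerImpl.
  private final Set<RMContainer> containersForPreemption =
      new ConcurrentSkipListSet<>();
}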

rmcontainer/RMContainerImpl.java

@@ -63,7 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;

@SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
+public class RMContainerImpl implements RMContainer {

  private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);

scheduler/SchedulerNode.java

@@ -160,7 +160,7 @@ public abstract class SchedulerNode {
   * @param rmContainer Allocated container
   * @param launchedOnNode True if the container has been launched
   */
-  private synchronized void allocateContainer(RMContainer rmContainer,
+  protected synchronized void allocateContainer(RMContainer rmContainer,
      boolean launchedOnNode) {
    Container container = rmContainer.getContainer();
    if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {

scheduler/fair/FSAppAttempt.java

@@ -647,7 +647,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
      Container reservedContainer, NodeType type,
      SchedulerRequestKey schedulerKey) {

-    if (!reservationExceedsThreshold(node, type)) {
+    RMContainer nodeReservedContainer = node.getReservedContainer();
+    boolean reservableForThisApp = nodeReservedContainer == null ||
+        nodeReservedContainer.getApplicationAttemptId()
+            .equals(getApplicationAttemptId());
+    if (reservableForThisApp && !reservationExceedsThreshold(node, type)) {
      LOG.info("Making reservation: node=" + node.getNodeName() +
          " app_id=" + getApplicationId());

      if (reservedContainer == null) {
@@ -1139,7 +1143,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt

  /**
   * Is application starved for fairshare or minshare
   */
-  private boolean isStarved() {
+  boolean isStarved() {
    return isStarvedForFairShare() || !Resources.isNone(minshareStarvation);
  }
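
Note: the net effect of the new guard in the first hunk is that a node can be reserved only when it carries no reservation, or when the existing reservation belongs to this same attempt. An equivalent standalone predicate, as a sketch (the helper name is not in the patch):

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;

final class ReservationGuardSketch {
  // True when the node has no reserved container, or the reservation is
  // already owned by the requesting application attempt.
  static boolean reservableFor(RMContainer nodeReservedContainer,
      ApplicationAttemptId requester) {
    return nodeReservedContainer == null ||
        nodeReservedContainer.getApplicationAttemptId().equals(requester);
  }
}

The second hunk drops private from isStarved() so that FSSchedulerNode, in the same package, can poll it when pruning fulfilled preemption reservations (see cleanupPreemptionList below).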

scheduler/fair/FSPreemptionThread.java

@@ -113,11 +113,6 @@ class FSPreemptionThread extends Thread {
      List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
          .getNodesByResourceName(rr.getResourceName());
      for (FSSchedulerNode node : potentialNodes) {
-        // TODO (YARN-5829): Attempt to reserve the node for starved app.
-        if (isNodeAlreadyReserved(node, starvedApp)) {
-          continue;
-        }
-
        int maxAMContainers = bestContainers == null ?
            Integer.MAX_VALUE : bestContainers.numAMContainers;
        PreemptableContainers preemptableContainers =
@@ -134,7 +129,8 @@ class FSPreemptionThread extends Thread {

        if (bestContainers != null && bestContainers.containers.size() > 0) {
          containersToPreempt.addAll(bestContainers.containers);
-          trackPreemptionsAgainstNode(bestContainers.containers);
+          // Reserve the containers for the starved app
+          trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
        }
      }
    } // End of iteration over RRs
@@ -163,8 +159,10 @@ class FSPreemptionThread extends Thread {
        node.getRunningContainersWithAMsAtTheEnd();
    containersToCheck.removeAll(node.getContainersForPreemption());

-    // Initialize potential with unallocated resources
-    Resource potential = Resources.clone(node.getUnallocatedResource());
+    // Initialize potential with unallocated but not reserved resources
+    Resource potential = Resources.subtractFromNonNegative(
+        Resources.clone(node.getUnallocatedResource()),
+        node.getTotalReserved());

    for (RMContainer container : containersToCheck) {
      FSAppAttempt app =
@@ -182,8 +180,6 @@ class FSPreemptionThread extends Thread {
      // Check if we have already identified enough containers
      if (Resources.fitsIn(request, potential)) {
        return preemptableContainers;
-      } else {
-        // TODO (YARN-5829): Unreserve the node for the starved app.
      }
    }
    return null;
@@ -195,10 +191,11 @@ class FSPreemptionThread extends Thread {
    return nodeReservedApp != null && !nodeReservedApp.equals(app);
  }

-  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers,
+      FSAppAttempt app) {
    FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
        .getNode(containers.get(0).getNodeId());
-    node.addContainersForPreemption(containers);
+    node.addContainersForPreemption(containers, app);
  }

  private void preemptContainers(List<RMContainer> containers) {
@@ -232,10 +229,6 @@ class FSPreemptionThread extends Thread {
      LOG.info("Killing container " + container);
      scheduler.completedContainer(
          container, status, RMContainerEventType.KILL);
-
-      FSSchedulerNode containerNode = (FSSchedulerNode)
-          scheduler.getNodeTracker().getNode(container.getAllocatedNode());
-      containerNode.removeContainerForPreemption(container);
    }
  }
}
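
Note: the changed "potential" computation is easiest to follow with concrete numbers. A worked sketch using the same Resources helpers as the hunk above (the values are illustrative, not taken from the patch):

// A node with 2048 MB / 2 vcores unallocated, of which 1024 MB / 1 vcore
// is already promised (node reservation plus resources preempted for
// other apps), can offer only the difference to a new preemption request.
Resource unallocated = Resource.newInstance(2048, 2);
Resource alreadyReserved = Resource.newInstance(1024, 1);
// subtractFromNonNegative mutates its first argument and floors each
// component at zero, hence the defensive clone.
Resource potential = Resources.subtractFromNonNegative(
    Resources.clone(unallocated), alreadyReserved);
// potential is now <memory: 1024 MB, vCores: 1>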

scheduler/fair/FSSchedulerNode.java

@@ -18,18 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -38,15 +46,38 @@ import java.util.concurrent.ConcurrentSkipListSet;
public class FSSchedulerNode extends SchedulerNode {

  private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
  private FSAppAttempt reservedAppSchedulable;
-  private final Set<RMContainer> containersForPreemption =
+  // Stores list of containers still to be preempted
+  @VisibleForTesting
+  final Set<RMContainer> containersForPreemption =
      new ConcurrentSkipListSet<>();
+  // Stores amount of resources preempted and reserved for each app
+  @VisibleForTesting
+  final Map<FSAppAttempt, Resource>
+      resourcesPreemptedForApp = new LinkedHashMap<>();
+  private final Map<ApplicationAttemptId, FSAppAttempt> appIdToAppMap =
+      new HashMap<>();
+  // Sum of resourcesPreemptedForApp values, total resources that are
+  // slated for preemption
+  private Resource totalResourcesPreempted = Resource.newInstance(0, 0);

  public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
    super(node, usePortForNodeName);
  }

+  /**
+   * Total amount of reserved resources including reservations and preempted
+   * containers.
+   * @return total resources reserved
+   */
+  Resource getTotalReserved() {
+    Resource totalReserved = Resources.clone(getReservedContainer() != null
+        ? getReservedContainer().getAllocatedResource()
+        : Resource.newInstance(0, 0));
+    Resources.addTo(totalReserved, totalResourcesPreempted);
+    return totalReserved;
+  }
+
  @Override
  public synchronized void reserveResource(
      SchedulerApplicationAttempt application, SchedulerRequestKey schedulerKey,
@@ -109,17 +140,56 @@ public class FSSchedulerNode extends SchedulerNode {
    return reservedAppSchedulable;
  }

+  /**
+   * List the resources reserved for each application by preemption,
+   * in FIFO order.
+   * @return map from each application to the resources reserved for it
+   */
+  @VisibleForTesting
+  synchronized LinkedHashMap<FSAppAttempt, Resource> getPreemptionList() {
+    cleanupPreemptionList();
+    return new LinkedHashMap<>(resourcesPreemptedForApp);
+  }
+
+  /**
+   * Remove apps that have their preemption requests fulfilled.
+   */
+  private synchronized void cleanupPreemptionList() {
+    Iterator<FSAppAttempt> iterator =
+        resourcesPreemptedForApp.keySet().iterator();
+    while (iterator.hasNext()) {
+      FSAppAttempt app = iterator.next();
+      if (app.isStopped() || !app.isStarved()) {
+        // App does not need more resources
+        Resources.subtractFrom(totalResourcesPreempted,
+            resourcesPreemptedForApp.get(app));
+        appIdToAppMap.remove(app.getApplicationAttemptId());
+        iterator.remove();
+      }
+    }
+  }

  /**
   * Mark {@code containers} as being considered for preemption so they are
   * not considered again. A call to this requires a corresponding call to
-   * {@link #removeContainerForPreemption} to ensure we do not mark a
-   * container for preemption and never consider it again and avoid memory
-   * leaks.
+   * {@code releaseContainer} to ensure we do not mark a container for
+   * preemption and never consider it again and avoid memory leaks.
   *
   * @param containers containers to mark
   */
-  void addContainersForPreemption(Collection<RMContainer> containers) {
-    containersForPreemption.addAll(containers);
+  void addContainersForPreemption(Collection<RMContainer> containers,
+      FSAppAttempt app) {
+    appIdToAppMap.putIfAbsent(app.getApplicationAttemptId(), app);
+    resourcesPreemptedForApp.putIfAbsent(app, Resource.newInstance(0, 0));
+    Resource appReserved = resourcesPreemptedForApp.get(app);
+
+    for (RMContainer container : containers) {
+      containersForPreemption.add(container);
+      Resources.addTo(appReserved, container.getAllocatedResource());
+      Resources.addTo(totalResourcesPreempted,
+          container.getAllocatedResource());
+    }
  }

  /**
@@ -130,11 +200,50 @@ public class FSSchedulerNode extends SchedulerNode {
  }

  /**
-   * Remove container from the set of containers marked for preemption.
-   *
-   * @param container container to remove
+   * The Scheduler has allocated containers on this node to the given
+   * application.
+   * @param rmContainer Allocated container
+   * @param launchedOnNode True if the container has been launched
   */
-  void removeContainerForPreemption(RMContainer container) {
-    containersForPreemption.remove(container);
+  @Override
+  protected synchronized void allocateContainer(RMContainer rmContainer,
+      boolean launchedOnNode) {
+    super.allocateContainer(rmContainer, launchedOnNode);
+    Resource allocated = rmContainer.getAllocatedResource();
+    if (!Resources.isNone(allocated)) {
+      // Check for a satisfied preemption request and update bookkeeping
+      FSAppAttempt app =
+          appIdToAppMap.get(rmContainer.getApplicationAttemptId());
+      if (app != null) {
+        Resource reserved = resourcesPreemptedForApp.get(app);
+        Resource fulfilled = Resources.componentwiseMin(reserved, allocated);
+        Resources.subtractFrom(reserved, fulfilled);
+        Resources.subtractFrom(totalResourcesPreempted, fulfilled);
+        if (Resources.isNone(reserved)) {
+          // No more preempted containers
+          resourcesPreemptedForApp.remove(app);
+          appIdToAppMap.remove(rmContainer.getApplicationAttemptId());
+        }
+      }
+    } else {
+      LOG.error("Allocated empty container " + rmContainer.getContainerId());
+    }
  }

+  /**
+   * Release an allocated container on this node.
+   * It also removes the container from the preemption bookkeeping, so the
+   * freed resources can be assigned to the app they were preempted for.
+   * @param containerId ID of the container to be released.
+   * @param releasedByNode whether the release originates from a node update.
+   */
+  @Override
+  public synchronized void releaseContainer(ContainerId containerId,
+      boolean releasedByNode) {
+    RMContainer container = getContainer(containerId);
+    super.releaseContainer(containerId, releasedByNode);
+    if (container != null) {
+      containersForPreemption.remove(container);
+    }
+  }
}
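
Note: taken together, the node-side bookkeeping above follows a three-step lifecycle. A condensed sketch of the call sequence (node, containersToKill, starvedApp, and containerId are assumed to be in scope, as in the unit test below):

// 1. The preemption thread marks the containers and records the app they
//    are being freed for.
node.addContainersForPreemption(containersToKill, starvedApp);
// 2. The kill completes; releaseContainer() drops the container from
//    containersForPreemption, while resourcesPreemptedForApp keeps the
//    freed amount earmarked for the starved app.
node.releaseContainer(containerId, true);
// 3. On the next heartbeat, FairScheduler.assignPreemptedContainers(node)
//    offers the earmarked space to the starved app first; the resulting
//    allocateContainer() call subtracts the fulfilled amount.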

scheduler/fair/FairScheduler.java

@@ -71,9 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -972,6 +970,31 @@ public class FairScheduler extends
    }
  }

+  /**
+   * Assign preempted containers to the applications that have reserved
+   * resources for them.
+   * @param node Node to check
+   * @return whether any assignment occurred
+   */
+  static boolean assignPreemptedContainers(FSSchedulerNode node) {
+    boolean assignedAny = false;
+    for (Entry<FSAppAttempt, Resource> entry :
+        node.getPreemptionList().entrySet()) {
+      FSAppAttempt app = entry.getKey();
+      Resource preemptionPending = Resources.clone(entry.getValue());
+      while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
+        Resource assigned = app.assignContainer(node);
+        if (Resources.isNone(assigned)) {
+          // Failed to assign; do not try further
+          break;
+        }
+        assignedAny = true;
+        Resources.subtractFromNonNegative(preemptionPending, assigned);
+      }
+    }
+    return assignedAny;
+  }
+
  @VisibleForTesting
  void attemptScheduling(FSSchedulerNode node) {
    try {
@@ -991,11 +1014,17 @@ public class FairScheduler extends
      }

      // Assign new containers...
-      // 1. Check for reserved applications
-      // 2. Schedule if there are no reservations
+      // 1. Ensure containers are assigned to the apps that preempted
+      // 2. Check for reserved applications
+      // 3. Schedule if there are no reservations
+      boolean validReservation = false;
+
+      // Apps may wait for preempted containers. We have to satisfy these
+      // first, to avoid the case where we preempt a container from B for A,
+      // but C, which does not itself qualify for preemption, receives the
+      // freed resources instead.
+      assignPreemptedContainers(node);
+
      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-      boolean validReservation = false;
      if (reservedAppSchedulable != null) {
        validReservation = reservedAppSchedulable.assignReservedContainer(node);
      }
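
Note: the ordering is the crux of this change: resources freed for one app must not leak to another. A sketch of the scenario the comment describes, driven the same way the new unit test drives it:

// App A is starved, so the preemption thread kills one of B's containers
// on this node and earmarks the freed amount for A. Because step 1 runs
// before ordinary scheduling on every node update, app C (which did not
// qualify to trigger preemption) cannot grab the freed space first.
boolean assigned = FairScheduler.assignPreemptedContainers(node);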

scheduler/fair/TestFSSchedulerNode.java (new file)

@@ -0,0 +1,403 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test scheduler node, especially preemption reservations.
*/
public class TestFSSchedulerNode {
private final ArrayList<RMContainer> containers = new ArrayList<>();
private RMNode createNode() {
RMNode node = mock(RMNode.class);
when(node.getTotalCapability()).thenReturn(Resource.newInstance(8192, 8));
when(node.getHostName()).thenReturn("host.domain.com");
return node;
}
private void createDefaultContainer() {
createContainer(Resource.newInstance(1024, 1), null);
}
private RMContainer createContainer(
Resource request, ApplicationAttemptId appAttemptId) {
RMContainer container = mock(RMContainer.class);
Container containerInner = mock(Container.class);
ContainerId id = mock(ContainerId.class);
when(id.getContainerId()).thenReturn((long)containers.size());
when(containerInner.getResource()).
thenReturn(Resources.clone(request));
when(containerInner.getId()).thenReturn(id);
when(containerInner.getExecutionType()).
thenReturn(ExecutionType.GUARANTEED);
when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
when(container.getContainerId()).thenReturn(id);
when(container.getContainer()).thenReturn(containerInner);
when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
when(container.getAllocatedResource()).
thenReturn(Resources.clone(request));
containers.add(container);
return container;
}
private void saturateCluster(FSSchedulerNode schedulerNode) {
while (!Resources.isNone(schedulerNode.getUnallocatedResource())) {
createDefaultContainer();
schedulerNode.allocateContainer(containers.get(containers.size() - 1));
schedulerNode.containerStarted(containers.get(containers.size() - 1).
getContainerId());
}
}
private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode,
Resource request) {
FSAppAttempt starvingApp = mock(FSAppAttempt.class);
ApplicationAttemptId appAttemptId =
mock(ApplicationAttemptId.class);
when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocationOnMock)
throws Throwable {
Resource response = Resource.newInstance(0, 0);
while (!Resources.isNone(request) &&
!Resources.isNone(schedulerNode.getUnallocatedResource())) {
RMContainer container = createContainer(request, appAttemptId);
schedulerNode.allocateContainer(container);
Resources.addTo(response, container.getAllocatedResource());
Resources.subtractFrom(request,
container.getAllocatedResource());
}
return response;
}
});
when(starvingApp.isStarved()).thenAnswer(
new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock)
throws Throwable {
return !Resources.isNone(request);
}
}
);
when(starvingApp.getPendingDemand()).thenReturn(request);
return starvingApp;
}
private void finalValidation(FSSchedulerNode schedulerNode) {
assertEquals("Everything should have been released",
Resources.none(), schedulerNode.getAllocatedResource());
assertTrue("No containers should be reserved for preemption",
schedulerNode.containersForPreemption.isEmpty());
assertTrue("No resources should be reserved for preemptors",
schedulerNode.resourcesPreemptedForApp.isEmpty());
assertEquals(
"No amount of resource should be reserved for preemptees",
Resources.none(),
schedulerNode.getTotalReserved());
}
private void allocateContainers(FSSchedulerNode schedulerNode) {
FairScheduler.assignPreemptedContainers(schedulerNode);
}
/**
* Allocate and release a single container.
*/
@Test
public void testSimpleAllocation() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
createDefaultContainer();
assertEquals("Nothing should have been allocated, yet",
Resources.none(), schedulerNode.getAllocatedResource());
schedulerNode.allocateContainer(containers.get(0));
assertEquals("Container should be allocated",
containers.get(0).getContainer().getResource(),
schedulerNode.getAllocatedResource());
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
assertEquals("Everything should have been released",
Resources.none(), schedulerNode.getAllocatedResource());
    // Releasing the same container again should be a harmless no-op
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
finalValidation(schedulerNode);
}
/**
* Allocate and release three containers with launch.
*/
@Test
public void testMultipleAllocations() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
createDefaultContainer();
createDefaultContainer();
createDefaultContainer();
assertEquals("Nothing should have been allocated, yet",
Resources.none(), schedulerNode.getAllocatedResource());
schedulerNode.allocateContainer(containers.get(0));
schedulerNode.containerStarted(containers.get(0).getContainerId());
schedulerNode.allocateContainer(containers.get(1));
schedulerNode.containerStarted(containers.get(1).getContainerId());
schedulerNode.allocateContainer(containers.get(2));
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(), 3.0),
schedulerNode.getAllocatedResource());
schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
finalValidation(schedulerNode);
}
/**
   * Preempt a single container and verify the freed resources are
   * reserved for, and then assigned to, the starved app.
*/
@Test
public void testSimplePreemption() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Request preemption
FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
Resource.newInstance(1024, 1));
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp);
    assertEquals(
        "Preempted resources should be reserved for the starving app",
        containers.get(0).getAllocatedResource(),
        schedulerNode.getTotalReserved());
    // Preemption occurs: release one container
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
allocateContainers(schedulerNode);
assertEquals("Container should be allocated",
schedulerNode.getTotalResource(),
schedulerNode.getAllocatedResource());
// Release all remaining containers
for (int i = 1; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
/**
* Allocate and release three containers requested by two apps.
*/
@Test
public void testComplexPreemption() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Preempt a container
FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
Resource.newInstance(2048, 2));
FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
Resource.newInstance(1024, 1));
// Preemption thread kicks in
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp1);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(1)), starvingApp1);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(2)), starvingApp2);
// Preemption happens
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
allocateContainers(schedulerNode);
assertEquals("Container should be allocated",
schedulerNode.getTotalResource(),
schedulerNode.getAllocatedResource());
// Release all containers
for (int i = 3; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
/**
* Allocate and release three containers requested by two apps in two rounds.
*/
@Test
public void testMultiplePreemptionEvents() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Preempt a container
FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
Resource.newInstance(2048, 2));
FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
Resource.newInstance(1024, 1));
// Preemption thread kicks in
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp1);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(1)), starvingApp1);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(2)), starvingApp2);
// Preemption happens
schedulerNode.releaseContainer(containers.get(1).getContainerId(), true);
allocateContainers(schedulerNode);
schedulerNode.releaseContainer(containers.get(2).getContainerId(), true);
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
allocateContainers(schedulerNode);
assertEquals("Container should be allocated",
schedulerNode.getTotalResource(),
schedulerNode.getAllocatedResource());
// Release all containers
for (int i = 3; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
/**
* Allocate and release a single container and delete the app in between.
*/
@Test
public void testPreemptionToCompletedApp() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Preempt a container
FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
Resource.newInstance(1024, 1));
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp);
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
// Stop the application then try to satisfy the reservation
// and observe that there are still free resources not allocated to
// the deleted app
when(starvingApp.isStopped()).thenReturn(true);
allocateContainers(schedulerNode);
assertNotEquals("Container should be allocated",
schedulerNode.getTotalResource(),
schedulerNode.getAllocatedResource());
// Release all containers
for (int i = 1; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
/**
* Preempt a bigger container than the preemption request.
*/
@Test
public void testPartialReservedPreemption() {
RMNode node = createNode();
FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
// Launch containers and saturate the cluster
saturateCluster(schedulerNode);
assertEquals("Container should be allocated",
Resources.multiply(containers.get(0).getContainer().getResource(),
containers.size()),
schedulerNode.getAllocatedResource());
// Preempt a container
Resource originalStarvingAppDemand = Resource.newInstance(512, 1);
FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
originalStarvingAppDemand);
schedulerNode.addContainersForPreemption(
Collections.singletonList(containers.get(0)), starvingApp);
// Preemption occurs
schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
// Container partially reassigned
allocateContainers(schedulerNode);
assertEquals("Container should be allocated",
Resources.subtract(schedulerNode.getTotalResource(),
Resource.newInstance(512, 0)),
schedulerNode.getAllocatedResource());
// Cleanup simulating node update
schedulerNode.getPreemptionList();
// Release all containers
for (int i = 1; i < containers.size(); ++i) {
schedulerNode.releaseContainer(containers.get(i).getContainerId(), true);
}
finalValidation(schedulerNode);
}
}

scheduler/fair/TestFairSchedulerPreemption.java

@@ -294,11 +294,30 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
        8 - 2 * numStarvedAppContainers,
        greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());

+    // Verify the node is reserved for the starvingApp
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertTrue("node should be reserved for the starvingApp",
+            node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
+
    sendEnoughNodeUpdatesToAssignFully();

    // Verify the preempted containers are assigned to starvingApp
    assertEquals("Starved app is not assigned the right # of containers",
        numStarvedAppContainers, starvingApp.getLiveContainers().size());

+    // Verify the node is not reserved for the starvingApp anymore
+    for (RMNode rmNode : rmNodes) {
+      FSSchedulerNode node = (FSSchedulerNode)
+          scheduler.getNodeTracker().getNode(rmNode.getNodeID());
+      if (node.getContainersForPreemption().size() > 0) {
+        assertFalse(node.getPreemptionList().keySet().contains(starvingApp));
+      }
+    }
  }

  private void verifyNoPreemption() throws InterruptedException {