YARN-4519. Potential deadlock of CapacityScheduler between decrease container and assign containers. Contributed Meng Ding

This commit is contained in:
Jian He 2016-01-27 15:38:32 -08:00
parent 47746f65b0
commit 145add1aec
9 changed files with 318 additions and 178 deletions

View File

@ -53,9 +53,10 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -116,41 +117,23 @@ public class RMServerUtils {
}
/**
* Normalize container increase/decrease request, it will normalize and update
* ContainerResourceChangeRequest.targetResource
* Validate increase/decrease request. This function must be called under
* the queue lock to make sure that the access to container resource is
* atomic. Refer to LeafQueue.decreaseContainer() and
* CapacityScheduelr.updateIncreaseRequests()
*
*
* <pre>
* - Throw exception when any other error happens
* </pre>
*/
public static void checkAndNormalizeContainerChangeRequest(
RMContext rmContext, ContainerResourceChangeRequest request,
boolean increase) throws InvalidResourceRequestException {
public static void checkSchedContainerChangeRequest(
SchedContainerChangeRequest request, boolean increase)
throws InvalidResourceRequestException {
RMContext rmContext = request.getRmContext();
ContainerId containerId = request.getContainerId();
ResourceScheduler scheduler = rmContext.getScheduler();
RMContainer rmContainer = scheduler.getRMContainer(containerId);
ResourceCalculator rc = scheduler.getResourceCalculator();
if (null == rmContainer) {
String msg =
"Failed to get rmContainer for "
+ (increase ? "increase" : "decrease")
+ " request, with container-id=" + containerId;
throw new InvalidResourceRequestException(msg);
}
if (rmContainer.getState() != RMContainerState.RUNNING) {
String msg =
"rmContainer's state is not RUNNING, for "
+ (increase ? "increase" : "decrease")
+ " request, with container-id=" + containerId;
throw new InvalidResourceRequestException(msg);
}
Resource targetResource = Resources.normalize(rc, request.getCapability(),
scheduler.getMinimumResourceCapability(),
scheduler.getMaximumResourceCapability(),
scheduler.getMinimumResourceCapability());
RMContainer rmContainer = request.getRMContainer();
Resource targetResource = request.getTargetCapacity();
// Compare targetResource and original resource
Resource originalResource = rmContainer.getAllocatedResource();
@ -182,9 +165,9 @@ public class RMServerUtils {
}
}
RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
// Target resource of the increase request is more than NM can offer
ResourceScheduler scheduler = rmContext.getScheduler();
RMNode rmNode = request.getSchedulerNode().getRMNode();
if (!Resources.fitsIn(scheduler.getResourceCalculator(),
scheduler.getClusterResource(), targetResource,
rmNode.getTotalCapability())) {
@ -193,9 +176,6 @@ public class RMServerUtils {
+ rmNode.getTotalCapability();
throw new InvalidResourceRequestException(msg);
}
// Update normalized target resource
request.setCapability(targetResource);
}
/*
@ -254,6 +234,7 @@ public class RMServerUtils {
}
}
// Sanity check and normalize target resource
private static void validateIncreaseDecreaseRequest(RMContext rmContext,
List<ContainerResourceChangeRequest> requests, Resource maximumAllocation,
boolean increase)
@ -283,8 +264,23 @@ public class RMServerUtils {
+ request.getCapability().getVirtualCores() + ", maxVirtualCores="
+ maximumAllocation.getVirtualCores());
}
checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
ContainerId containerId = request.getContainerId();
ResourceScheduler scheduler = rmContext.getScheduler();
RMContainer rmContainer = scheduler.getRMContainer(containerId);
if (null == rmContainer) {
String msg =
"Failed to get rmContainer for "
+ (increase ? "increase" : "decrease")
+ " request, with container-id=" + containerId;
throw new InvalidResourceRequestException(msg);
}
ResourceCalculator rc = scheduler.getResourceCalculator();
Resource targetResource = Resources.normalize(rc, request.getCapability(),
scheduler.getMinimumResourceCapability(),
scheduler.getMaximumResourceCapability(),
scheduler.getMinimumResourceCapability());
// Update normalized target resource
request.setCapability(targetResource);
}
}

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -55,13 +54,13 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -74,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
@ -605,26 +605,18 @@ public abstract class AbstractYarnScheduler
}
protected void decreaseContainers(
List<SchedContainerChangeRequest> decreaseRequests,
List<ContainerResourceChangeRequest> decreaseRequests,
SchedulerApplicationAttempt attempt) {
for (SchedContainerChangeRequest request : decreaseRequests) {
if (null == decreaseRequests || decreaseRequests.isEmpty()) {
return;
}
// Pre-process decrease requests
List<SchedContainerChangeRequest> schedDecreaseRequests =
createSchedContainerChangeRequests(decreaseRequests, false);
for (SchedContainerChangeRequest request : schedDecreaseRequests) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing decrease request:" + request);
}
boolean hasIncreaseRequest =
attempt.removeIncreaseRequest(request.getNodeId(),
request.getPriority(), request.getContainerId());
if (hasIncreaseRequest) {
if (LOG.isDebugEnabled()) {
LOG.debug("While processing decrease request, found a increase request "
+ "for the same container "
+ request.getContainerId()
+ ", removed the increase request");
}
}
// handle decrease request
decreaseContainer(request, attempt);
}
@ -862,7 +854,7 @@ public abstract class AbstractYarnScheduler
}
/**
* Normalize container increase/decrease request, and return
* Sanity check increase/decrease request, and return
* SchedulerContainerResourceChangeRequest according to given
* ContainerResourceChangeRequest.
*
@ -871,37 +863,34 @@ public abstract class AbstractYarnScheduler
* - Throw exception when any other error happens
* </pre>
*/
private SchedContainerChangeRequest
checkAndNormalizeContainerChangeRequest(
ContainerResourceChangeRequest request, boolean increase)
throws YarnException {
// We have done a check in ApplicationMasterService, but RMContainer status
// / Node resource could change since AMS won't acquire lock of scheduler.
RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
increase);
private SchedContainerChangeRequest createSchedContainerChangeRequest(
ContainerResourceChangeRequest request, boolean increase)
throws YarnException {
ContainerId containerId = request.getContainerId();
RMContainer rmContainer = getRMContainer(containerId);
if (null == rmContainer) {
String msg =
"Failed to get rmContainer for "
+ (increase ? "increase" : "decrease")
+ " request, with container-id=" + containerId;
throw new InvalidResourceRequestException(msg);
}
SchedulerNode schedulerNode =
getSchedulerNode(rmContainer.getAllocatedNode());
return new SchedContainerChangeRequest(schedulerNode, rmContainer,
request.getCapability());
return new SchedContainerChangeRequest(
this.rmContext, schedulerNode, rmContainer, request.getCapability());
}
protected List<SchedContainerChangeRequest>
checkAndNormalizeContainerChangeRequests(
createSchedContainerChangeRequests(
List<ContainerResourceChangeRequest> changeRequests,
boolean increase) {
if (null == changeRequests || changeRequests.isEmpty()) {
return Collections.EMPTY_LIST;
}
List<SchedContainerChangeRequest> schedulerChangeRequests =
new ArrayList<SchedContainerChangeRequest>();
for (ContainerResourceChangeRequest r : changeRequests) {
SchedContainerChangeRequest sr = null;
try {
sr = checkAndNormalizeContainerChangeRequest(r, increase);
sr = createSchedContainerChangeRequest(r, increase);
} catch (YarnException e) {
LOG.warn("Error happens when checking increase request, Ignoring.."
+ " exception=", e);
@ -909,7 +898,6 @@ public abstract class AbstractYarnScheduler
}
schedulerChangeRequests.add(sr);
}
return schedulerChangeRequests;
}
}

View File

@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -147,6 +149,18 @@ public class AppSchedulingInfo {
boolean resourceUpdated = false;
for (SchedContainerChangeRequest r : increaseRequests) {
if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
LOG.warn("rmContainer's state is not RUNNING, for increase request with"
+ " container-id=" + r.getContainerId());
continue;
}
try {
RMServerUtils.checkSchedContainerChangeRequest(r, true);
} catch (YarnException e) {
LOG.warn("Error happens when checking increase request, Ignoring.."
+ " exception=", e);
continue;
}
NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
@ -220,7 +234,7 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("Added increase request:" + request.getContainerId()
+ " delta=" + request.getDeltaCapacity());
+ " delta=" + delta);
}
// update priorities
@ -509,24 +523,20 @@ public class AppSchedulingInfo {
NodeId nodeId = increaseRequest.getNodeId();
Priority priority = increaseRequest.getPriority();
ContainerId containerId = increaseRequest.getContainerId();
Resource deltaCapacity = increaseRequest.getDeltaCapacity();
if (LOG.isDebugEnabled()) {
LOG.debug("allocated increase request : applicationId=" + applicationId
+ " container=" + containerId + " host="
+ increaseRequest.getNodeId() + " user=" + user + " resource="
+ increaseRequest.getDeltaCapacity());
+ deltaCapacity);
}
// Set queue metrics
queue.getMetrics().allocateResources(user,
increaseRequest.getDeltaCapacity());
queue.getMetrics().allocateResources(user, deltaCapacity);
// remove the increase request from pending increase request map
removeIncreaseRequest(nodeId, priority, containerId);
// update usage
appResourceUsage.incUsed(increaseRequest.getNodePartition(),
increaseRequest.getDeltaCapacity());
appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
}
public synchronized void decreaseContainer(

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -32,18 +33,19 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*/
public class SchedContainerChangeRequest implements
Comparable<SchedContainerChangeRequest> {
RMContainer rmContainer;
Resource targetCapacity;
SchedulerNode schedulerNode;
Resource deltaCapacity;
private RMContext rmContext;
private RMContainer rmContainer;
private Resource targetCapacity;
private SchedulerNode schedulerNode;
private Resource deltaCapacity;
public SchedContainerChangeRequest(SchedulerNode schedulerNode,
public SchedContainerChangeRequest(
RMContext rmContext, SchedulerNode schedulerNode,
RMContainer rmContainer, Resource targetCapacity) {
this.rmContext = rmContext;
this.rmContainer = rmContainer;
this.targetCapacity = targetCapacity;
this.schedulerNode = schedulerNode;
deltaCapacity = Resources.subtract(targetCapacity,
rmContainer.getAllocatedResource());
}
public NodeId getNodeId() {
@ -58,11 +60,19 @@ public class SchedContainerChangeRequest implements
return this.targetCapacity;
}
public RMContext getRmContext() {
return this.rmContext;
}
/**
* Delta capacity = before - target, so if it is a decrease request, delta
* Delta capacity = target - before, so if it is a decrease request, delta
* capacity will be negative
*/
public Resource getDeltaCapacity() {
public synchronized Resource getDeltaCapacity() {
// Only calculate deltaCapacity once
if (deltaCapacity == null) {
deltaCapacity = Resources.subtract(
targetCapacity, rmContainer.getAllocatedResource());
}
return deltaCapacity;
}
@ -112,7 +122,6 @@ public class SchedContainerChangeRequest implements
@Override
public String toString() {
return "<container=" + getContainerId() + ", targetCapacity="
+ targetCapacity + ", delta=" + deltaCapacity + ", node="
+ getNodeId().toString() + ">";
+ targetCapacity + ", node=" + getNodeId().toString() + ">";
}
}

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@ -329,7 +331,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
FiCaSchedulerApp app);
FiCaSchedulerApp app) throws InvalidResourceRequestException;
/**
* Get valid Node Labels for this queue

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@ -892,9 +893,36 @@ public class CapacityScheduler extends
}
}
// It is crucial to acquire leaf queue lock first to prevent:
// 1. Race condition when calculating the delta resource in
// SchedContainerChangeRequest
// 2. Deadlock with the scheduling thread.
private LeafQueue updateIncreaseRequests(
List<ContainerResourceChangeRequest> increaseRequests,
FiCaSchedulerApp app) {
if (null == increaseRequests || increaseRequests.isEmpty()) {
return null;
}
// Pre-process increase requests
List<SchedContainerChangeRequest> schedIncreaseRequests =
createSchedContainerChangeRequests(increaseRequests, true);
LeafQueue leafQueue = (LeafQueue) app.getQueue();
synchronized(leafQueue) {
// make sure we aren't stopping/removing the application
// when the allocate comes in
if (app.isStopped()) {
return null;
}
// Process increase resource requests
if (app.updateIncreaseRequests(schedIncreaseRequests)) {
return leafQueue;
}
return null;
}
}
@Override
// Note: when AM asks to decrease container or release container, we will
// acquire scheduler lock
// Note: when AM asks to release container, we will acquire scheduler lock
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
@ -907,26 +935,23 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
// Sanity check
// Release containers
releaseContainers(release, application);
// update increase requests
LeafQueue updateDemandForQueue =
updateIncreaseRequests(increaseRequests, application);
// Decrease containers
decreaseContainers(decreaseRequests, application);
// Sanity check for new allocation requests
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), getMaximumResourceCapability());
// Pre-process increase requests
List<SchedContainerChangeRequest> normalizedIncreaseRequests =
checkAndNormalizeContainerChangeRequests(increaseRequests, true);
// Pre-process decrease requests
List<SchedContainerChangeRequest> normalizedDecreaseRequests =
checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
// Release containers
releaseContainers(release, application);
Allocation allocation;
LeafQueue updateDemandForQueue = null;
synchronized (application) {
// make sure we aren't stopping/removing the application
@ -944,7 +969,8 @@ public class CapacityScheduler extends
}
// Update application requests
if (application.updateResourceRequests(ask)) {
if (application.updateResourceRequests(ask)
&& (updateDemandForQueue == null)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
@ -954,12 +980,6 @@ public class CapacityScheduler extends
}
}
// Process increase resource requests
if (application.updateIncreaseRequests(normalizedIncreaseRequests)
&& (updateDemandForQueue == null)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
if (application.isWaitingForAMContainer()) {
// Allocate is for AM and update AM blacklist for this
application.updateAMBlacklist(
@ -968,8 +988,6 @@ public class CapacityScheduler extends
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
// Decrease containers
decreaseContainers(normalizedDecreaseRequests, application);
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
@ -1164,7 +1182,8 @@ public class CapacityScheduler extends
.getAssignmentInformation().getReserved());
}
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
@VisibleForTesting
protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
@ -1514,48 +1533,30 @@ public class CapacityScheduler extends
}
}
@Lock(CapacityScheduler.class)
@Override
protected synchronized void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
RMContainer rmContainer = decreaseRequest.getRMContainer();
// Check container status before doing decrease
if (rmContainer.getState() != RMContainerState.RUNNING) {
LOG.info("Trying to decrease a container not in RUNNING state, container="
+ rmContainer + " state=" + rmContainer.getState().name());
return;
}
// Delta capacity of this decrease request is 0, this decrease request may
// just to cancel increase request
if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Decrease target resource equals to existing resource for container:"
+ decreaseRequest.getContainerId()
+ " ignore this decrease request.");
}
return;
}
// Save resource before decrease
Resource resourceBeforeDecrease =
Resources.clone(rmContainer.getContainer().getResource());
FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
queue.decreaseContainer(clusterResource, decreaseRequest, app);
// Notify RMNode the container will be decreased
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
Arrays.asList(rmContainer.getContainer())));
LOG.info("Application attempt " + app.getApplicationAttemptId()
+ " decreased container:" + decreaseRequest.getContainerId() + " from "
+ resourceBeforeDecrease + " to "
+ decreaseRequest.getTargetCapacity());
try {
queue.decreaseContainer(clusterResource, decreaseRequest, app);
// Notify RMNode that the container can be pulled by NodeManager in the
// next heartbeat
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeDecreaseContainerEvent(
decreaseRequest.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
} catch (InvalidResourceRequestException e) {
LOG.warn("Error happens when checking decrease request, Ignoring.."
+ " exception=", e);
}
}
@Lock(Lock.NoLock.class)

View File

@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -1659,10 +1661,16 @@ public class LeafQueue extends AbstractCSQueue {
return defaultAppPriorityPerQueue;
}
/**
*
* @param clusterResource Total cluster resource
* @param decreaseRequest The decrease request
* @param app The application of interest
*/
@Override
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
FiCaSchedulerApp app) {
FiCaSchedulerApp app) throws InvalidResourceRequestException {
// If the container being decreased is reserved, we need to unreserve it
// first.
RMContainer rmContainer = decreaseRequest.getRMContainer();
@ -1670,25 +1678,62 @@ public class LeafQueue extends AbstractCSQueue {
unreserveIncreasedContainer(clusterResource, app,
(FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
}
// Delta capacity is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
boolean resourceDecreased = false;
Resource resourceBeforeDecrease;
// Grab queue lock to avoid race condition when getting container resource
synchronized (this) {
// Delta is negative when it's a decrease request
releaseResource(clusterResource, app, absDelta,
decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(),
true);
// Notify application
app.decreaseContainer(decreaseRequest);
// Notify node
decreaseRequest.getSchedulerNode()
.decreaseContainer(decreaseRequest.getContainerId(), absDelta);
// Make sure the decrease request is valid in terms of current resource
// and target resource. This must be done under the leaf queue lock.
// Throws exception if the check fails.
RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
// Save resource before decrease for debug log
resourceBeforeDecrease =
Resources.clone(rmContainer.getAllocatedResource());
// Do we have increase request for the same container? If so, remove it
boolean hasIncreaseRequest =
app.removeIncreaseRequest(decreaseRequest.getNodeId(),
decreaseRequest.getPriority(), decreaseRequest.getContainerId());
if (hasIncreaseRequest) {
if (LOG.isDebugEnabled()) {
LOG.debug("While processing decrease requests, found an increase"
+ " request for the same container "
+ decreaseRequest.getContainerId()
+ ", removed the increase request");
}
}
// Delta capacity is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
if (Resources.equals(absDelta, Resources.none())) {
// If delta capacity of this decrease request is 0, this decrease
// request serves the purpose of cancelling an existing increase request
// if any
if (LOG.isDebugEnabled()) {
LOG.debug("Decrease target resource equals to existing resource for"
+ " container:" + decreaseRequest.getContainerId()
+ " ignore this decrease request.");
}
} else {
// Release the delta resource
releaseResource(clusterResource, app, absDelta,
decreaseRequest.getNodePartition(),
decreaseRequest.getRMContainer(),
true);
// Notify application
app.decreaseContainer(decreaseRequest);
// Notify node
decreaseRequest.getSchedulerNode()
.decreaseContainer(decreaseRequest.getContainerId(), absDelta);
resourceDecreased = true;
}
}
// Notify parent
if (getParent() != null) {
if (resourceDecreased) {
// Notify parent queue outside of leaf queue lock
getParent().decreaseContainer(clusterResource, decreaseRequest, app);
LOG.info("Application attempt " + app.getApplicationAttemptId()
+ " decreased container:" + decreaseRequest.getContainerId()
+ " from " + resourceBeforeDecrease + " to "
+ decreaseRequest.getTargetCapacity());
}
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
@ -656,7 +657,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) {
SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
throws InvalidResourceRequestException {
// delta capacity is negative when it's a decrease request
Resource absDeltaCapacity =
Resources.negate(decreaseRequest.getDeltaCapacity());

View File

@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -47,8 +50,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -57,12 +64,48 @@ import org.junit.Before;
import org.junit.Test;
public class TestContainerResizing {
private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
private final int GB = 1024;
private YarnConfiguration conf;
RMNodeLabelsManager mgr;
class MyScheduler extends CapacityScheduler {
/*
* A Mock Scheduler to simulate the potential effect of deadlock between:
* 1. The AbstractYarnScheduler.decreaseContainers() call (from
* ApplicationMasterService thread)
* 2. The CapacityScheduler.allocateContainersToNode() call (from the
* scheduler thread)
*/
MyScheduler() {
super();
}
@Override
protected void decreaseContainers(
List<ContainerResourceChangeRequest> decreaseRequests,
SchedulerApplicationAttempt attempt) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.debug("Thread interrupted.");
}
super.decreaseContainers(decreaseRequests, attempt);
}
@Override
public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.debug("Thread interrupted.");
}
super.allocateContainersToNode(node);
}
}
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
@ -958,6 +1001,50 @@ public class TestContainerResizing {
rm1.close();
}
@Test (timeout = 60000)
public void testDecreaseContainerWillNotDeadlockContainerAllocation()
throws Exception {
// create and start MockRM with our MyScheduler
MockRM rm = new MockRM() {
@Override
public ResourceScheduler createScheduler() {
CapacityScheduler cs = new MyScheduler();
cs.setConf(conf);
return cs;
}
};
rm.start();
// register a node
MockNM nm = rm.registerNode("h1:1234", 20 * GB);
// submit an application -> app1
RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
// making sure resource is allocated
checkUsedResource(rm, "default", 3 * GB, null);
FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
Assert.assertEquals(3 * GB,
app.getAppAttemptResourceUsage().getUsed().getMemory());
// making sure container is launched
ContainerId containerId1 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
sentRMContainerLaunched(rm, containerId1);
// submit allocation request for a new container
am1.allocate(Collections.singletonList(ResourceRequest.newInstance(
Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)),
null);
// nm reports status update and triggers container allocation
nm.nodeHeartbeat(true);
// *In the mean time*, am1 asks to decrease its AM container resource from
// 3GB to 1GB
AllocateResponse response = am1.sendContainerResizingRequest(null,
Collections.singletonList(ContainerResourceChangeRequest
.newInstance(containerId1, Resources.createResource(GB))));
// verify that the containe resource is decreased
verifyContainerDecreased(response, containerId1, GB);
rm.close();
}
private void checkPendingResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();