diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cf42057cce1..865cd09503a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -112,6 +112,9 @@ Release 2.9.0 - UNRELEASED
YARN-4633. Fix random test failure in TestRMRestart#testRMRestartAfterPreemption
(Bibin A Chundatt via rohithsharmaks)
+ YARN-4519. Potential deadlock of CapacityScheduler between decrease container
+ and assign containers. (Meng Ding via jianhe)
+
Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index cc305931dcb..e19d55ee81d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -53,9 +53,10 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -114,43 +115,25 @@ public class RMServerUtils {
queueName, scheduler, rmContext, queueInfo);
}
}
-
+
/**
- * Normalize container increase/decrease request, it will normalize and update
- * ContainerResourceChangeRequest.targetResource
+ * Validate increase/decrease request. This function must be called under
+ * the queue lock to make sure that the access to container resource is
+ * atomic. Refer to LeafQueue.decreaseContainer() and
+ * CapacityScheduelr.updateIncreaseRequests()
+ *
*
*
* - Throw exception when any other error happens
*
*/
- public static void checkAndNormalizeContainerChangeRequest(
- RMContext rmContext, ContainerResourceChangeRequest request,
- boolean increase) throws InvalidResourceRequestException {
+ public static void checkSchedContainerChangeRequest(
+ SchedContainerChangeRequest request, boolean increase)
+ throws InvalidResourceRequestException {
+ RMContext rmContext = request.getRmContext();
ContainerId containerId = request.getContainerId();
- ResourceScheduler scheduler = rmContext.getScheduler();
- RMContainer rmContainer = scheduler.getRMContainer(containerId);
- ResourceCalculator rc = scheduler.getResourceCalculator();
-
- if (null == rmContainer) {
- String msg =
- "Failed to get rmContainer for "
- + (increase ? "increase" : "decrease")
- + " request, with container-id=" + containerId;
- throw new InvalidResourceRequestException(msg);
- }
-
- if (rmContainer.getState() != RMContainerState.RUNNING) {
- String msg =
- "rmContainer's state is not RUNNING, for "
- + (increase ? "increase" : "decrease")
- + " request, with container-id=" + containerId;
- throw new InvalidResourceRequestException(msg);
- }
-
- Resource targetResource = Resources.normalize(rc, request.getCapability(),
- scheduler.getMinimumResourceCapability(),
- scheduler.getMaximumResourceCapability(),
- scheduler.getMinimumResourceCapability());
+ RMContainer rmContainer = request.getRMContainer();
+ Resource targetResource = request.getTargetCapacity();
// Compare targetResource and original resource
Resource originalResource = rmContainer.getAllocatedResource();
@@ -181,10 +164,10 @@ public class RMServerUtils {
throw new InvalidResourceRequestException(msg);
}
}
-
- RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
-
+
// Target resource of the increase request is more than NM can offer
+ ResourceScheduler scheduler = rmContext.getScheduler();
+ RMNode rmNode = request.getSchedulerNode().getRMNode();
if (!Resources.fitsIn(scheduler.getResourceCalculator(),
scheduler.getClusterResource(), targetResource,
rmNode.getTotalCapability())) {
@@ -193,9 +176,6 @@ public class RMServerUtils {
+ rmNode.getTotalCapability();
throw new InvalidResourceRequestException(msg);
}
-
- // Update normalized target resource
- request.setCapability(targetResource);
}
/*
@@ -253,7 +233,8 @@ public class RMServerUtils {
}
}
}
-
+
+ // Sanity check and normalize target resource
private static void validateIncreaseDecreaseRequest(RMContext rmContext,
List requests, Resource maximumAllocation,
boolean increase)
@@ -283,8 +264,23 @@ public class RMServerUtils {
+ request.getCapability().getVirtualCores() + ", maxVirtualCores="
+ maximumAllocation.getVirtualCores());
}
-
- checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
+ ContainerId containerId = request.getContainerId();
+ ResourceScheduler scheduler = rmContext.getScheduler();
+ RMContainer rmContainer = scheduler.getRMContainer(containerId);
+ if (null == rmContainer) {
+ String msg =
+ "Failed to get rmContainer for "
+ + (increase ? "increase" : "decrease")
+ + " request, with container-id=" + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
+ ResourceCalculator rc = scheduler.getResourceCalculator();
+ Resource targetResource = Resources.normalize(rc, request.getCapability(),
+ scheduler.getMinimumResourceCapability(),
+ scheduler.getMaximumResourceCapability(),
+ scheduler.getMinimumResourceCapability());
+ // Update normalized target resource
+ request.setCapability(targetResource);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 41a04f23da7..27d4f91bc64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -55,13 +54,13 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -74,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
@@ -618,28 +618,20 @@ public abstract class AbstractYarnScheduler
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}
-
+
protected void decreaseContainers(
- List decreaseRequests,
+ List decreaseRequests,
SchedulerApplicationAttempt attempt) {
- for (SchedContainerChangeRequest request : decreaseRequests) {
+ if (null == decreaseRequests || decreaseRequests.isEmpty()) {
+ return;
+ }
+ // Pre-process decrease requests
+ List schedDecreaseRequests =
+ createSchedContainerChangeRequests(decreaseRequests, false);
+ for (SchedContainerChangeRequest request : schedDecreaseRequests) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing decrease request:" + request);
}
-
- boolean hasIncreaseRequest =
- attempt.removeIncreaseRequest(request.getNodeId(),
- request.getPriority(), request.getContainerId());
-
- if (hasIncreaseRequest) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("While processing decrease request, found a increase request "
- + "for the same container "
- + request.getContainerId()
- + ", removed the increase request");
- }
- }
-
// handle decrease request
decreaseContainer(request, attempt);
}
@@ -877,7 +869,7 @@ public abstract class AbstractYarnScheduler
}
/**
- * Normalize container increase/decrease request, and return
+ * Sanity check increase/decrease request, and return
* SchedulerContainerResourceChangeRequest according to given
* ContainerResourceChangeRequest.
*
@@ -886,37 +878,34 @@ public abstract class AbstractYarnScheduler
* - Throw exception when any other error happens
*
*/
- private SchedContainerChangeRequest
- checkAndNormalizeContainerChangeRequest(
- ContainerResourceChangeRequest request, boolean increase)
- throws YarnException {
- // We have done a check in ApplicationMasterService, but RMContainer status
- // / Node resource could change since AMS won't acquire lock of scheduler.
- RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
- increase);
+ private SchedContainerChangeRequest createSchedContainerChangeRequest(
+ ContainerResourceChangeRequest request, boolean increase)
+ throws YarnException {
ContainerId containerId = request.getContainerId();
RMContainer rmContainer = getRMContainer(containerId);
+ if (null == rmContainer) {
+ String msg =
+ "Failed to get rmContainer for "
+ + (increase ? "increase" : "decrease")
+ + " request, with container-id=" + containerId;
+ throw new InvalidResourceRequestException(msg);
+ }
SchedulerNode schedulerNode =
getSchedulerNode(rmContainer.getAllocatedNode());
-
- return new SchedContainerChangeRequest(schedulerNode, rmContainer,
- request.getCapability());
+ return new SchedContainerChangeRequest(
+ this.rmContext, schedulerNode, rmContainer, request.getCapability());
}
protected List
- checkAndNormalizeContainerChangeRequests(
+ createSchedContainerChangeRequests(
List changeRequests,
boolean increase) {
- if (null == changeRequests || changeRequests.isEmpty()) {
- return Collections.EMPTY_LIST;
- }
-
List schedulerChangeRequests =
new ArrayList();
for (ContainerResourceChangeRequest r : changeRequests) {
SchedContainerChangeRequest sr = null;
try {
- sr = checkAndNormalizeContainerChangeRequest(r, increase);
+ sr = createSchedContainerChangeRequest(r, increase);
} catch (YarnException e) {
LOG.warn("Error happens when checking increase request, Ignoring.."
+ " exception=", e);
@@ -924,7 +913,6 @@ public abstract class AbstractYarnScheduler
}
schedulerChangeRequests.add(sr);
}
-
return schedulerChangeRequests;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 07f3d8baae9..a61001e8bea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -148,6 +150,18 @@ public class AppSchedulingInfo {
boolean resourceUpdated = false;
for (SchedContainerChangeRequest r : increaseRequests) {
+ if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
+ LOG.warn("rmContainer's state is not RUNNING, for increase request with"
+ + " container-id=" + r.getContainerId());
+ continue;
+ }
+ try {
+ RMServerUtils.checkSchedContainerChangeRequest(r, true);
+ } catch (YarnException e) {
+ LOG.warn("Error happens when checking increase request, Ignoring.."
+ + " exception=", e);
+ continue;
+ }
NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map> requestsOnNode =
@@ -221,7 +235,7 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("Added increase request:" + request.getContainerId()
- + " delta=" + request.getDeltaCapacity());
+ + " delta=" + delta);
}
// update priorities
@@ -520,24 +534,20 @@ public class AppSchedulingInfo {
NodeId nodeId = increaseRequest.getNodeId();
Priority priority = increaseRequest.getPriority();
ContainerId containerId = increaseRequest.getContainerId();
-
+ Resource deltaCapacity = increaseRequest.getDeltaCapacity();
+
if (LOG.isDebugEnabled()) {
LOG.debug("allocated increase request : applicationId=" + applicationId
+ " container=" + containerId + " host="
+ increaseRequest.getNodeId() + " user=" + user + " resource="
- + increaseRequest.getDeltaCapacity());
+ + deltaCapacity);
}
-
// Set queue metrics
- queue.getMetrics().allocateResources(user,
- increaseRequest.getDeltaCapacity());
-
+ queue.getMetrics().allocateResources(user, deltaCapacity);
// remove the increase request from pending increase request map
removeIncreaseRequest(nodeId, priority, containerId);
-
// update usage
- appResourceUsage.incUsed(increaseRequest.getNodePartition(),
- increaseRequest.getDeltaCapacity());
+ appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
}
public synchronized void decreaseContainer(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
index ea109fddf1c..e4ab3a28d23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -32,18 +33,19 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*/
public class SchedContainerChangeRequest implements
Comparable {
- RMContainer rmContainer;
- Resource targetCapacity;
- SchedulerNode schedulerNode;
- Resource deltaCapacity;
+ private RMContext rmContext;
+ private RMContainer rmContainer;
+ private Resource targetCapacity;
+ private SchedulerNode schedulerNode;
+ private Resource deltaCapacity;
- public SchedContainerChangeRequest(SchedulerNode schedulerNode,
+ public SchedContainerChangeRequest(
+ RMContext rmContext, SchedulerNode schedulerNode,
RMContainer rmContainer, Resource targetCapacity) {
+ this.rmContext = rmContext;
this.rmContainer = rmContainer;
this.targetCapacity = targetCapacity;
this.schedulerNode = schedulerNode;
- deltaCapacity = Resources.subtract(targetCapacity,
- rmContainer.getAllocatedResource());
}
public NodeId getNodeId() {
@@ -58,11 +60,19 @@ public class SchedContainerChangeRequest implements
return this.targetCapacity;
}
+ public RMContext getRmContext() {
+ return this.rmContext;
+ }
/**
- * Delta capacity = before - target, so if it is a decrease request, delta
+ * Delta capacity = target - before, so if it is a decrease request, delta
* capacity will be negative
*/
- public Resource getDeltaCapacity() {
+ public synchronized Resource getDeltaCapacity() {
+ // Only calculate deltaCapacity once
+ if (deltaCapacity == null) {
+ deltaCapacity = Resources.subtract(
+ targetCapacity, rmContainer.getAllocatedResource());
+ }
return deltaCapacity;
}
@@ -81,7 +91,7 @@ public class SchedContainerChangeRequest implements
public SchedulerNode getSchedulerNode() {
return schedulerNode;
}
-
+
@Override
public int hashCode() {
return (getContainerId().hashCode() << 16) + targetCapacity.hashCode();
@@ -112,7 +122,6 @@ public class SchedContainerChangeRequest implements
@Override
public String toString() {
return "";
+ + targetCapacity + ", node=" + getNodeId().toString() + ">";
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 6ffba024391..daf77903fe5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -332,7 +333,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
- FiCaSchedulerApp app);
+ FiCaSchedulerApp app) throws InvalidResourceRequestException;
/**
* Get valid Node Labels for this queue
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e773384d895..dcb60fcc5ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -895,9 +896,36 @@ public class CapacityScheduler extends
}
}
+ // It is crucial to acquire leaf queue lock first to prevent:
+ // 1. Race condition when calculating the delta resource in
+ // SchedContainerChangeRequest
+ // 2. Deadlock with the scheduling thread.
+ private LeafQueue updateIncreaseRequests(
+ List increaseRequests,
+ FiCaSchedulerApp app) {
+ if (null == increaseRequests || increaseRequests.isEmpty()) {
+ return null;
+ }
+ // Pre-process increase requests
+ List schedIncreaseRequests =
+ createSchedContainerChangeRequests(increaseRequests, true);
+ LeafQueue leafQueue = (LeafQueue) app.getQueue();
+ synchronized(leafQueue) {
+ // make sure we aren't stopping/removing the application
+ // when the allocate comes in
+ if (app.isStopped()) {
+ return null;
+ }
+ // Process increase resource requests
+ if (app.updateIncreaseRequests(schedIncreaseRequests)) {
+ return leafQueue;
+ }
+ return null;
+ }
+ }
+
@Override
- // Note: when AM asks to decrease container or release container, we will
- // acquire scheduler lock
+ // Note: when AM asks to release container, we will acquire scheduler lock
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List ask, List release,
@@ -909,26 +937,23 @@ public class CapacityScheduler extends
if (application == null) {
return EMPTY_ALLOCATION;
}
-
- // Sanity check
- SchedulerUtils.normalizeRequests(
- ask, getResourceCalculator(), getClusterResource(),
- getMinimumResourceCapability(), getMaximumResourceCapability());
-
- // Pre-process increase requests
- List normalizedIncreaseRequests =
- checkAndNormalizeContainerChangeRequests(increaseRequests, true);
-
- // Pre-process decrease requests
- List normalizedDecreaseRequests =
- checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
// Release containers
releaseContainers(release, application);
- Allocation allocation;
+ // update increase requests
+ LeafQueue updateDemandForQueue =
+ updateIncreaseRequests(increaseRequests, application);
- LeafQueue updateDemandForQueue = null;
+ // Decrease containers
+ decreaseContainers(decreaseRequests, application);
+
+ // Sanity check for new allocation requests
+ SchedulerUtils.normalizeRequests(
+ ask, getResourceCalculator(), getClusterResource(),
+ getMinimumResourceCapability(), getMaximumResourceCapability());
+
+ Allocation allocation;
synchronized (application) {
@@ -947,7 +972,8 @@ public class CapacityScheduler extends
}
// Update application requests
- if (application.updateResourceRequests(ask)) {
+ if (application.updateResourceRequests(ask)
+ && (updateDemandForQueue == null)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
@@ -957,12 +983,6 @@ public class CapacityScheduler extends
}
}
- // Process increase resource requests
- if (application.updateIncreaseRequests(normalizedIncreaseRequests)
- && (updateDemandForQueue == null)) {
- updateDemandForQueue = (LeafQueue) application.getQueue();
- }
-
if (application.isWaitingForAMContainer()) {
// Allocate is for AM and update AM blacklist for this
application.updateAMBlacklist(
@@ -971,8 +991,6 @@ public class CapacityScheduler extends
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
- // Decrease containers
- decreaseContainers(normalizedDecreaseRequests, application);
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
@@ -1167,7 +1185,8 @@ public class CapacityScheduler extends
.getAssignmentInformation().getReserved());
}
- private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+ @VisibleForTesting
+ protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
@@ -1517,48 +1536,30 @@ public class CapacityScheduler extends
}
}
- @Lock(CapacityScheduler.class)
@Override
- protected synchronized void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
+ protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
RMContainer rmContainer = decreaseRequest.getRMContainer();
-
// Check container status before doing decrease
if (rmContainer.getState() != RMContainerState.RUNNING) {
LOG.info("Trying to decrease a container not in RUNNING state, container="
+ rmContainer + " state=" + rmContainer.getState().name());
return;
}
-
- // Delta capacity of this decrease request is 0, this decrease request may
- // just to cancel increase request
- if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrease target resource equals to existing resource for container:"
- + decreaseRequest.getContainerId()
- + " ignore this decrease request.");
- }
- return;
- }
-
- // Save resource before decrease
- Resource resourceBeforeDecrease =
- Resources.clone(rmContainer.getContainer().getResource());
-
FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
- queue.decreaseContainer(clusterResource, decreaseRequest, app);
-
- // Notify RMNode the container will be decreased
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
- Arrays.asList(rmContainer.getContainer())));
-
- LOG.info("Application attempt " + app.getApplicationAttemptId()
- + " decreased container:" + decreaseRequest.getContainerId() + " from "
- + resourceBeforeDecrease + " to "
- + decreaseRequest.getTargetCapacity());
+ try {
+ queue.decreaseContainer(clusterResource, decreaseRequest, app);
+ // Notify RMNode that the container can be pulled by NodeManager in the
+ // next heartbeat
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeDecreaseContainerEvent(
+ decreaseRequest.getNodeId(),
+ Collections.singletonList(rmContainer.getContainer())));
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("Error happens when checking decrease request, Ignoring.."
+ + " exception=", e);
+ }
}
@Lock(Lock.NoLock.class)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9e64b42ff9a..56e450247d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -1676,11 +1678,17 @@ public class LeafQueue extends AbstractCSQueue {
public Priority getDefaultApplicationPriority() {
return defaultAppPriorityPerQueue;
}
-
+
+ /**
+ *
+ * @param clusterResource Total cluster resource
+ * @param decreaseRequest The decrease request
+ * @param app The application of interest
+ */
@Override
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
- FiCaSchedulerApp app) {
+ FiCaSchedulerApp app) throws InvalidResourceRequestException {
// If the container being decreased is reserved, we need to unreserve it
// first.
RMContainer rmContainer = decreaseRequest.getRMContainer();
@@ -1688,25 +1696,62 @@ public class LeafQueue extends AbstractCSQueue {
unreserveIncreasedContainer(clusterResource, app,
(FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
}
-
- // Delta capacity is negative when it's a decrease request
- Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-
+ boolean resourceDecreased = false;
+ Resource resourceBeforeDecrease;
+ // Grab queue lock to avoid race condition when getting container resource
synchronized (this) {
- // Delta is negative when it's a decrease request
- releaseResource(clusterResource, app, absDelta,
- decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(),
- true);
- // Notify application
- app.decreaseContainer(decreaseRequest);
- // Notify node
- decreaseRequest.getSchedulerNode()
- .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
+ // Make sure the decrease request is valid in terms of current resource
+ // and target resource. This must be done under the leaf queue lock.
+ // Throws exception if the check fails.
+ RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
+ // Save resource before decrease for debug log
+ resourceBeforeDecrease =
+ Resources.clone(rmContainer.getAllocatedResource());
+ // Do we have increase request for the same container? If so, remove it
+ boolean hasIncreaseRequest =
+ app.removeIncreaseRequest(decreaseRequest.getNodeId(),
+ decreaseRequest.getPriority(), decreaseRequest.getContainerId());
+ if (hasIncreaseRequest) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("While processing decrease requests, found an increase"
+ + " request for the same container "
+ + decreaseRequest.getContainerId()
+ + ", removed the increase request");
+ }
+ }
+ // Delta capacity is negative when it's a decrease request
+ Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
+ if (Resources.equals(absDelta, Resources.none())) {
+ // If delta capacity of this decrease request is 0, this decrease
+ // request serves the purpose of cancelling an existing increase request
+ // if any
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Decrease target resource equals to existing resource for"
+ + " container:" + decreaseRequest.getContainerId()
+ + " ignore this decrease request.");
+ }
+ } else {
+ // Release the delta resource
+ releaseResource(clusterResource, app, absDelta,
+ decreaseRequest.getNodePartition(),
+ decreaseRequest.getRMContainer(),
+ true);
+ // Notify application
+ app.decreaseContainer(decreaseRequest);
+ // Notify node
+ decreaseRequest.getSchedulerNode()
+ .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
+ resourceDecreased = true;
+ }
}
- // Notify parent
- if (getParent() != null) {
+ if (resourceDecreased) {
+ // Notify parent queue outside of leaf queue lock
getParent().decreaseContainer(clusterResource, decreaseRequest, app);
+ LOG.info("Application attempt " + app.getApplicationAttemptId()
+ + " decreased container:" + decreaseRequest.getContainerId()
+ + " from " + resourceBeforeDecrease + " to "
+ + decreaseRequest.getTargetCapacity());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index badab723af0..a7d8796eb12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
@@ -656,7 +657,8 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public void decreaseContainer(Resource clusterResource,
- SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) {
+ SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
+ throws InvalidResourceRequestException {
// delta capacity is negative when it's a decrease request
Resource absDeltaCapacity =
Resources.negate(decreaseRequest.getDeltaCapacity());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 672af645577..c08af9d8430 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,8 +50,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+ .FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -57,12 +64,48 @@ import org.junit.Before;
import org.junit.Test;
public class TestContainerResizing {
+ private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
private final int GB = 1024;
private YarnConfiguration conf;
RMNodeLabelsManager mgr;
+ class MyScheduler extends CapacityScheduler {
+ /*
+ * A Mock Scheduler to simulate the potential effect of deadlock between:
+ * 1. The AbstractYarnScheduler.decreaseContainers() call (from
+ * ApplicationMasterService thread)
+ * 2. The CapacityScheduler.allocateContainersToNode() call (from the
+ * scheduler thread)
+ */
+ MyScheduler() {
+ super();
+ }
+
+ @Override
+ protected void decreaseContainers(
+ List decreaseRequests,
+ SchedulerApplicationAttempt attempt) {
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ LOG.debug("Thread interrupted.");
+ }
+ super.decreaseContainers(decreaseRequests, attempt);
+ }
+
+ @Override
+ public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+ try {
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ LOG.debug("Thread interrupted.");
+ }
+ super.allocateContainersToNode(node);
+ }
+ }
+
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
@@ -958,6 +1001,50 @@ public class TestContainerResizing {
rm1.close();
}
+ @Test (timeout = 60000)
+ public void testDecreaseContainerWillNotDeadlockContainerAllocation()
+ throws Exception {
+ // create and start MockRM with our MyScheduler
+ MockRM rm = new MockRM() {
+ @Override
+ public ResourceScheduler createScheduler() {
+ CapacityScheduler cs = new MyScheduler();
+ cs.setConf(conf);
+ return cs;
+ }
+ };
+ rm.start();
+ // register a node
+ MockNM nm = rm.registerNode("h1:1234", 20 * GB);
+ // submit an application -> app1
+ RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+ // making sure resource is allocated
+ checkUsedResource(rm, "default", 3 * GB, null);
+ FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+ Assert.assertEquals(3 * GB,
+ app.getAppAttemptResourceUsage().getUsed().getMemory());
+ // making sure container is launched
+ ContainerId containerId1 =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ sentRMContainerLaunched(rm, containerId1);
+ // submit allocation request for a new container
+ am1.allocate(Collections.singletonList(ResourceRequest.newInstance(
+ Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)),
+ null);
+ // nm reports status update and triggers container allocation
+ nm.nodeHeartbeat(true);
+ // *In the mean time*, am1 asks to decrease its AM container resource from
+ // 3GB to 1GB
+ AllocateResponse response = am1.sendContainerResizingRequest(null,
+ Collections.singletonList(ContainerResourceChangeRequest
+ .newInstance(containerId1, Resources.createResource(GB))));
+ // verify that the containe resource is decreased
+ verifyContainerDecreased(response, containerId1, GB);
+
+ rm.close();
+ }
+
private void checkPendingResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();