YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)

(cherry picked from commit eac6b4c35c)
This commit is contained in:
Wangda Tan 2017-02-28 10:35:50 -08:00 committed by Arun Suresh
parent 408d23477f
commit 5756256280
29 changed files with 499 additions and 1750 deletions

View File

@ -970,13 +970,5 @@ public class ResourceSchedulerWrapper
return Priority.newInstance(0); return Priority.newInstance(0);
} }
@Override
protected void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
// TODO Auto-generated method stub
}
} }

View File

@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements
if (priorityCompare != 0) { if (priorityCompare != 0) {
return priorityCompare; return priorityCompare;
} }
return Long.compare(allocationRequestId, o.getAllocationRequestId()); int allocReqCompare = Long.compare(
allocationRequestId, o.getAllocationRequestId());
if (allocReqCompare != 0) {
return allocReqCompare;
}
if (this.containerToUpdate != null && o.containerToUpdate != null) {
return (this.containerToUpdate.compareTo(o.containerToUpdate));
}
return 0;
} }
@Override @Override

View File

@ -156,26 +156,16 @@ public class RMServerUtils {
if (msg == null) { if (msg == null) {
if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) && if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
(updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) { (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
Resource original = rmContainer.getContainer().getResource(); if (validateIncreaseDecreaseRequest(
Resource target = updateReq.getCapability(); rmContext, updateReq, maximumAllocation)) {
if (Resources.fitsIn(target, original)) { if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
// This is a decrease request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, false)) {
updateRequests.getDecreaseRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
}
} else {
// This is an increase request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, true)) {
updateRequests.getIncreaseRequests().add(updateReq); updateRequests.getIncreaseRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else { } else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; updateRequests.getDecreaseRequests().add(updateReq);
} }
outstandingUpdate.add(updateReq.getContainerId());
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
} }
} else { } else {
ExecutionType original = rmContainer.getExecutionType(); ExecutionType original = rmContainer.getExecutionType();
@ -329,8 +319,7 @@ public class RMServerUtils {
// Sanity check and normalize target resource // Sanity check and normalize target resource
private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
UpdateContainerRequest request, Resource maximumAllocation, UpdateContainerRequest request, Resource maximumAllocation) {
boolean increase) {
if (request.getCapability().getMemorySize() < 0 if (request.getCapability().getMemorySize() < 0
|| request.getCapability().getMemorySize() > maximumAllocation || request.getCapability().getMemorySize() > maximumAllocation
.getMemorySize()) { .getMemorySize()) {

View File

@ -281,7 +281,8 @@ public class QueuePriorityContainerCandidateSelector
private boolean preChecksForMovingReservedContainerToNode( private boolean preChecksForMovingReservedContainerToNode(
RMContainer reservedContainer, FiCaSchedulerNode newNode) { RMContainer reservedContainer, FiCaSchedulerNode newNode) {
// Don't do this if it has hard-locality preferences // Don't do this if it has hard-locality preferences
if (reservedContainer.hasIncreaseReservation()) { if (reservedContainer.getReservedSchedulerKey().getContainerToUpdate()
!= null) {
// This means a container update request (like increase / promote) // This means a container update request (like increase / promote)
return false; return false;
} }

View File

@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
String getNodeHttpAddress(); String getNodeHttpAddress();
String getNodeLabelExpression(); String getNodeLabelExpression();
boolean hasIncreaseReservation();
void cancelIncreaseReservation();
String getQueueName(); String getQueueName();

View File

@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
public class RMContainerChangeResourceEvent extends RMContainerEvent {
final Resource targetResource;
final boolean increase;
public RMContainerChangeResourceEvent(ContainerId containerId,
Resource targetResource, boolean increase) {
super(containerId, RMContainerEventType.CHANGE_RESOURCE);
this.targetResource = targetResource;
this.increase = increase;
}
public Resource getTargetResource() {
return targetResource;
}
public boolean isIncrease() {
return increase;
}
}

View File

@ -130,8 +130,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
RMContainerEventType.ACQUIRED) RMContainerEventType.ACQUIRED)
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.RESERVED, new ContainerReservedTransition()) RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, RMContainerEventType.ACQUIRE_UPDATED_CONTAINER,
new ContainerAcquiredWhileRunningTransition()) new ContainerAcquiredWhileRunningTransition())
@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private boolean isAMContainer; private boolean isAMContainer;
private List<ResourceRequest> resourceRequests; private List<ResourceRequest> resourceRequests;
private volatile boolean hasIncreaseReservation = false;
// Only used for container resource increase and decrease. This is the // Only used for container resource increase and decrease. This is the
// resource to rollback to should container resource increase token expires. // resource to rollback to should container resource increase token expires.
private Resource lastConfirmedResource; private Resource lastConfirmedResource;
@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
if (c != null) { if (c != null) {
c.setNodeId(container.reservedNode); c.setNodeId(container.reservedNode);
} }
if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
.contains(container.getState())) {
// When container's state != NEW/RESERVED, it is an increase reservation
container.hasIncreaseReservation = true;
}
} }
} }
@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
} }
} }
} }
private static final class ChangeResourceTransition extends BaseTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
Resource targetResource = changeEvent.getTargetResource();
Resource lastConfirmedResource = container.lastConfirmedResource;
if (!changeEvent.isIncrease()) {
// Only unregister from the containerAllocationExpirer when target
// resource is less than or equal to the last confirmed resource.
if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
container.lastConfirmedResource = targetResource;
container.containerAllocationExpirer.unregister(
new AllocationExpirationInfo(event.getContainerId()));
}
}
container.container.setResource(targetResource);
// We reach here means we either allocated increase reservation OR
// decreased container, reservation will be cancelled anyway.
container.hasIncreaseReservation = false;
}
}
private static class FinishedTransition extends BaseTransition { private static class FinishedTransition extends BaseTransition {
@ -858,16 +822,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
return -1; return -1;
} }
@Override
public boolean hasIncreaseReservation() {
return hasIncreaseReservation;
}
@Override
public void cancelIncreaseReservation() {
hasIncreaseReservation = false;
}
public void setQueueName(String queueName) { public void setQueueName(String queueName) {
this.queueName = queueName; this.queueName = queueName;
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
completedContainerInternal(rmContainer, containerStatus, event); completedContainerInternal(rmContainer, containerStatus, event);
completeOustandingUpdatesWhichAreReserved(
rmContainer, containerStatus, event);
} else { } else {
ContainerId containerId = rmContainer.getContainerId(); ContainerId containerId = rmContainer.getContainerId();
// Inform the container // Inform the container
@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler
recoverResourceRequestForContainer(rmContainer); recoverResourceRequestForContainer(rmContainer);
} }
// Optimization:
// Check if there are in-flight container updates and complete the
// associated temp containers. These are removed when the app completes,
// but removing them when the actual container completes would allow the
// scheduler to reallocate those resources sooner.
private void completeOustandingUpdatesWhichAreReserved(
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event) {
N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
if (schedulerNode != null &&
schedulerNode.getReservedContainer() != null) {
RMContainer resContainer = schedulerNode.getReservedContainer();
if (resContainer.getReservedSchedulerKey() != null) {
ContainerId containerToUpdate = resContainer
.getReservedSchedulerKey().getContainerToUpdate();
if (containerToUpdate != null &&
containerToUpdate.equals(containerStatus.getContainerId())) {
completedContainerInternal(resContainer,
ContainerStatus.newInstance(resContainer.getContainerId(),
containerStatus.getState(), containerStatus
.getDiagnostics(),
containerStatus.getExitStatus()), event);
}
}
}
}
// clean up a completed container // clean up a completed container
protected abstract void completedContainerInternal(RMContainer rmContainer, protected abstract void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event); ContainerStatus containerStatus, RMContainerEventType event);
@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler
} }
} }
protected void decreaseContainers(
List<UpdateContainerRequest> decreaseRequests,
SchedulerApplicationAttempt attempt) {
if (null == decreaseRequests || decreaseRequests.isEmpty()) {
return;
}
// Pre-process decrease requests
List<SchedContainerChangeRequest> schedDecreaseRequests =
createSchedContainerChangeRequests(decreaseRequests, false);
for (SchedContainerChangeRequest request : schedDecreaseRequests) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing decrease request:" + request);
}
// handle decrease request
decreaseContainer(request, attempt);
}
}
protected abstract void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt);
@Override @Override
public N getSchedulerNode(NodeId nodeId) { public N getSchedulerNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId); return nodeTracker.getNode(nodeId);
@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler
} }
} }
protected void handleExecutionTypeUpdates( protected void handleContainerUpdates(
SchedulerApplicationAttempt appAttempt, SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
List<UpdateContainerRequest> promotionRequests, List<UpdateContainerRequest> promotionRequests =
List<UpdateContainerRequest> demotionRequests) { updates.getPromotionRequests();
if (promotionRequests != null && !promotionRequests.isEmpty()) { if (promotionRequests != null && !promotionRequests.isEmpty()) {
LOG.info("Promotion Update requests : " + promotionRequests); LOG.info("Promotion Update requests : " + promotionRequests);
handlePromotionRequests(appAttempt, promotionRequests); // Promotion is technically an increase request from
// 0 resources to target resources.
handleIncreaseRequests(appAttempt, promotionRequests);
} }
List<UpdateContainerRequest> increaseRequests =
updates.getIncreaseRequests();
if (increaseRequests != null && !increaseRequests.isEmpty()) {
LOG.info("Resource increase requests : " + increaseRequests);
handleIncreaseRequests(appAttempt, increaseRequests);
}
List<UpdateContainerRequest> demotionRequests =
updates.getDemotionRequests();
if (demotionRequests != null && !demotionRequests.isEmpty()) { if (demotionRequests != null && !demotionRequests.isEmpty()) {
LOG.info("Demotion Update requests : " + demotionRequests); LOG.info("Demotion Update requests : " + demotionRequests);
handleDemotionRequests(appAttempt, demotionRequests); // Demotion is technically a decrease request from initial
// to 0 resources
handleDecreaseRequests(appAttempt, demotionRequests);
}
List<UpdateContainerRequest> decreaseRequests =
updates.getDecreaseRequests();
if (decreaseRequests != null && !decreaseRequests.isEmpty()) {
LOG.info("Resource decrease requests : " + decreaseRequests);
handleDecreaseRequests(appAttempt, decreaseRequests);
} }
} }
private void handlePromotionRequests( private void handleIncreaseRequests(
SchedulerApplicationAttempt applicationAttempt, SchedulerApplicationAttempt applicationAttempt,
List<UpdateContainerRequest> updateContainerRequests) { List<UpdateContainerRequest> updateContainerRequests) {
for (UpdateContainerRequest uReq : updateContainerRequests) { for (UpdateContainerRequest uReq : updateContainerRequests) {
@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler
} }
} }
private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt, private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
List<UpdateContainerRequest> demotionRequests) { List<UpdateContainerRequest> demotionRequests) {
OpportunisticContainerContext oppCntxt = OpportunisticContainerContext oppCntxt =
appAttempt.getOpportunisticContainerContext(); appAttempt.getOpportunisticContainerContext();
@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer = RMContainer rmContainer =
rmContext.getScheduler().getRMContainer(uReq.getContainerId()); rmContext.getScheduler().getRMContainer(uReq.getContainerId());
if (rmContainer != null) { if (rmContainer != null) {
if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases( SchedulerNode schedulerNode = rmContext.getScheduler()
rmContainer.getContainer())) { .getSchedulerNode(rmContainer.getContainer().getNodeId());
RMContainer demotedRMContainer = if (appAttempt.getUpdateContext()
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer); .checkAndAddToOutstandingDecreases(uReq, schedulerNode,
appAttempt.addToNewlyDemotedContainers( rmContainer.getContainer())) {
uReq.getContainerId(), demotedRMContainer); if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE ==
uReq.getContainerUpdateType()) {
RMContainer demotedRMContainer =
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
appAttempt.addToNewlyDemotedContainers(
uReq.getContainerId(), demotedRMContainer);
} else {
RMContainer demotedRMContainer = createDecreasedRMContainer(
appAttempt, uReq, rmContainer);
appAttempt.addToNewlyDecreasedContainers(
uReq.getContainerId(), demotedRMContainer);
}
} else { } else {
appAttempt.addToUpdateContainerErrors( appAttempt.addToUpdateContainerErrors(
UpdateContainerError.newInstance( UpdateContainerError.newInstance(
RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
} }
} else { } else {
LOG.warn("Cannot demote non-existent (or completed) Container [" LOG.warn("Cannot demote/decrease non-existent (or completed) " +
+ uReq.getContainerId() + "]"); "Container [" + uReq.getContainerId() + "]");
} }
} }
} }
private RMContainer createDecreasedRMContainer(
SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq,
RMContainer rmContainer) {
SchedulerRequestKey sk =
SchedulerRequestKey.extractFrom(rmContainer.getContainer());
Container decreasedContainer = BuilderUtils.newContainer(
ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
appAttempt.getNewContainerId()),
rmContainer.getContainer().getNodeId(),
rmContainer.getContainer().getNodeHttpAddress(),
Resources.none(),
sk.getPriority(), null, rmContainer.getExecutionType(),
sk.getAllocationRequestId());
decreasedContainer.setVersion(rmContainer.getContainer().getVersion());
RMContainer newRmContainer = new RMContainerImpl(decreasedContainer,
sk, appAttempt.getApplicationAttemptId(),
decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext,
rmContainer.isRemotelyAllocated());
appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
decreasedContainer.getNodeId()).allocateContainer(newRmContainer);
return newRmContainer;
}
private RMContainer createDemotedRMContainer( private RMContainer createDemotedRMContainer(
SchedulerApplicationAttempt appAttempt, SchedulerApplicationAttempt appAttempt,
OpportunisticContainerContext oppCntxt, OpportunisticContainerContext oppCntxt,
@ -1163,8 +1226,41 @@ public abstract class AbstractYarnScheduler
rmContext, demotedContainer, false); rmContext, demotedContainer, false);
} }
@Override @Override
public List<NodeId> getNodeIds(String resourceName) { public List<NodeId> getNodeIds(String resourceName) {
return nodeTracker.getNodeIdsByResourceName(resourceName); return nodeTracker.getNodeIdsByResourceName(resourceName);
} }
/**
* Rollback container update after expiry.
* @param containerId ContainerId.
*/
protected void rollbackContainerUpdate(
ContainerId containerId) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
LOG.info("Cannot rollback resource for container " + containerId
+ ". The container does not exist.");
return;
}
T app = getCurrentAttemptForContainer(containerId);
if (getCurrentAttemptForContainer(containerId) == null) {
LOG.info("Cannot rollback resource for container " + containerId
+ ". The application that the container "
+ "belongs to does not exist.");
return;
}
if (Resources.fitsIn(rmContainer.getLastConfirmedResource(),
rmContainer.getContainer().getResource())) {
LOG.info("Roll back resource for container " + containerId);
handleDecreaseRequests(app, Arrays.asList(
UpdateContainerRequest.newInstance(
rmContainer.getContainer().getVersion(),
rmContainer.getContainerId(),
ContainerUpdateType.DECREASE_RESOURCE,
rmContainer.getLastConfirmedResource(), null)));
}
}
} }

View File

@ -90,9 +90,7 @@ public class AppSchedulingInfo {
schedulerKeys = new ConcurrentSkipListMap<>(); schedulerKeys = new ConcurrentSkipListMap<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>> final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>(); schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock; private final ReentrantReadWriteLock.WriteLock writeLock;
@ -158,137 +156,6 @@ public class AppSchedulingInfo {
LOG.info("Application " + applicationId + " requests cleared"); LOG.info("Application " + applicationId + " requests cleared");
} }
public boolean hasIncreaseRequest(NodeId nodeId) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? false : requestsOnNode.size() > 0;
} finally {
this.readLock.unlock();
}
}
public Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
return requestsOnNode == null ? null : requestsOnNode.get(
schedulerKey);
} finally {
this.readLock.unlock();
}
}
/**
* return true if any of the existing increase requests are updated,
* false if none of them are updated
*/
public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false;
try {
this.writeLock.lock();
for (SchedContainerChangeRequest r : increaseRequests) {
if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
LOG.warn("rmContainer's state is not RUNNING, for increase request"
+ " with container-id=" + r.getContainerId());
continue;
}
try {
RMServerUtils.checkSchedContainerChangeRequest(r, true);
} catch (YarnException e) {
LOG.warn("Error happens when checking increase request, Ignoring.."
+ " exception=", e);
continue;
}
NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
requestsOnNode = new TreeMap<>();
containerIncreaseRequestMap.put(nodeId, requestsOnNode);
}
SchedContainerChangeRequest prevChangeRequest =
getIncreaseRequest(nodeId,
r.getRMContainer().getAllocatedSchedulerKey(),
r.getContainerId());
if (null != prevChangeRequest) {
if (Resources.equals(prevChangeRequest.getTargetCapacity(),
r.getTargetCapacity())) {
// increase request hasn't changed
continue;
}
// remove the old one, as we will use the new one going forward
removeIncreaseRequest(nodeId,
prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
prevChangeRequest.getContainerId());
}
if (Resources.equals(r.getTargetCapacity(),
r.getRMContainer().getAllocatedResource())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to increase container " + r.getContainerId()
+ ", target capacity = previous capacity = " + prevChangeRequest
+ ". Will ignore this increase request.");
}
continue;
}
// add the new one
resourceUpdated = true;
insertIncreaseRequest(r);
}
return resourceUpdated;
} finally {
this.writeLock.unlock();
}
}
/**
* Insert increase request, adding any missing items in the data-structure
* hierarchy.
*/
private void insertIncreaseRequest(SchedContainerChangeRequest request) {
NodeId nodeId = request.getNodeId();
SchedulerRequestKey schedulerKey =
request.getRMContainer().getAllocatedSchedulerKey();
ContainerId containerId = request.getContainerId();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
requestsOnNode = new HashMap<>();
containerIncreaseRequestMap.put(nodeId, requestsOnNode);
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey);
if (null == requestsOnNodeWithPriority) {
requestsOnNodeWithPriority = new TreeMap<>();
requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
incrementSchedulerKeyReference(schedulerKey);
}
requestsOnNodeWithPriority.put(containerId, request);
// update resources
String partition = request.getRMContainer().getNodeLabelExpression();
Resource delta = request.getDeltaCapacity();
appResourceUsage.incPending(partition, delta);
queue.incPendingResource(partition, delta);
if (LOG.isDebugEnabled()) {
LOG.debug("Added increase request:" + request.getContainerId()
+ " delta=" + delta);
}
}
private void incrementSchedulerKeyReference( private void incrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
@ -312,73 +179,6 @@ public class AppSchedulingInfo {
} }
} }
public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
this.writeLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
return false;
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey);
if (null == requestsOnNodeWithPriority) {
return false;
}
SchedContainerChangeRequest request =
requestsOnNodeWithPriority.remove(containerId);
// remove hierarchies if it becomes empty
if (requestsOnNodeWithPriority.isEmpty()) {
requestsOnNode.remove(schedulerKey);
decrementSchedulerKeyReference(schedulerKey);
}
if (requestsOnNode.isEmpty()) {
containerIncreaseRequestMap.remove(nodeId);
}
if (request == null) {
return false;
}
// update queue's pending resource if request exists
String partition = request.getRMContainer().getNodeLabelExpression();
Resource delta = request.getDeltaCapacity();
appResourceUsage.decPending(partition, delta);
queue.decPendingResource(partition, delta);
if (LOG.isDebugEnabled()) {
LOG.debug("remove increase request:" + request);
}
return true;
} finally {
this.writeLock.unlock();
}
}
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
this.readLock.lock();
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
requestsOnNode = containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) {
return null;
}
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(schedulerKey);
return requestsOnNodeWithPriority == null ? null
: requestsOnNodeWithPriority.get(containerId);
} finally {
this.readLock.unlock();
}
}
public ContainerUpdateContext getUpdateContext() { public ContainerUpdateContext getUpdateContext() {
return updateContext; return updateContext;
} }
@ -515,21 +315,6 @@ public class AppSchedulingInfo {
appResourceUsage.decPending(partition, toDecrease); appResourceUsage.decPending(partition, toDecrease);
} }
private boolean hasRequestLabelChanged(ResourceRequest requestOne,
ResourceRequest requestTwo) {
String requestOneLabelExp = requestOne.getNodeLabelExpression();
String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
// First request label expression can be null and second request
// is not null then we have to consider it as changed.
if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
return true;
}
// If the label is not matching between both request when
// requestOneLabelExp is not null.
return ((null != requestOneLabelExp) && !(requestOneLabelExp
.equals(requestTwoLabelExp)));
}
/** /**
* The ApplicationMaster is updating the placesBlacklistedByApp used for * The ApplicationMaster is updating the placesBlacklistedByApp used for
* containers other than AMs. * containers other than AMs.
@ -602,22 +387,6 @@ public class AppSchedulingInfo {
return ret; return ret;
} }
public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
try {
readLock.lock();
for (SchedulerRequestKey key : schedulerKeys.keySet()) {
SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
if (null != ps) {
return ps;
}
}
return null;
} finally {
readLock.unlock();
}
}
public PendingAsk getNextPendingAsk() { public PendingAsk getNextPendingAsk() {
try { try {
readLock.lock(); readLock.lock();
@ -687,56 +456,6 @@ public class AppSchedulingInfo {
} }
} }
public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId();
SchedulerRequestKey schedulerKey =
increaseRequest.getRMContainer().getAllocatedSchedulerKey();
ContainerId containerId = increaseRequest.getContainerId();
Resource deltaCapacity = increaseRequest.getDeltaCapacity();
if (LOG.isDebugEnabled()) {
LOG.debug("allocated increase request : applicationId=" + applicationId
+ " container=" + containerId + " host="
+ increaseRequest.getNodeId() + " user=" + user + " resource="
+ deltaCapacity);
}
try {
this.writeLock.lock();
// Set queue metrics
queue.getMetrics().allocateResources(user, deltaCapacity);
// remove the increase request from pending increase request map
removeIncreaseRequest(nodeId, schedulerKey, containerId);
// update usage
appResourceUsage.incUsed(increaseRequest.getNodePartition(),
deltaCapacity);
} finally {
this.writeLock.unlock();
}
}
public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
// Delta is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
if (LOG.isDebugEnabled()) {
LOG.debug("Decrease container : applicationId=" + applicationId
+ " container=" + decreaseRequest.getContainerId() + " host="
+ decreaseRequest.getNodeId() + " user=" + user + " resource="
+ absDelta);
}
try {
this.writeLock.lock();
// Set queue metrics
queue.getMetrics().releaseResources(user, absDelta);
// update usage
appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
} finally {
this.writeLock.unlock();
}
}
public List<ResourceRequest> allocate(NodeType type, public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey, SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) { Container containerAllocated) {

View File

@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -55,43 +60,37 @@ public class ContainerUpdateContext {
private final Map<SchedulerRequestKey, Map<Resource, private final Map<SchedulerRequestKey, Map<Resource,
Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>(); Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
private final Set<ContainerId> outstandingDecreases = new HashSet<>(); private final Map<ContainerId, Resource> outstandingDecreases =
new HashMap<>();
private final AppSchedulingInfo appSchedulingInfo; private final AppSchedulingInfo appSchedulingInfo;
ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) { ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
this.appSchedulingInfo = appSchedulingInfo; this.appSchedulingInfo = appSchedulingInfo;
} }
private synchronized boolean isBeingIncreased(Container container) {
Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
outstandingIncreases.get(
new SchedulerRequestKey(container.getPriority(),
container.getAllocationRequestId(), container.getId()));
if (resourceMap != null) {
Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource());
if (locationMap != null) {
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds != null && !containerIds.isEmpty()) {
return containerIds.contains(container.getId());
}
}
}
return false;
}
/** /**
* Add the container to outstanding decreases. * Add the container to outstanding decreases.
* @param updateReq UpdateContainerRequest.
* @param schedulerNode SchedulerNode.
* @param container Container. * @param container Container.
* @return true if updated to outstanding decreases was successful. * @return If it was possible to decrease the container.
*/ */
public synchronized boolean checkAndAddToOutstandingDecreases( public synchronized boolean checkAndAddToOutstandingDecreases(
UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
Container container) { Container container) {
if (isBeingIncreased(container) if (outstandingDecreases.containsKey(container.getId())) {
|| outstandingDecreases.contains(container.getId())) {
return false; return false;
} }
outstandingDecreases.add(container.getId()); if (ContainerUpdateType.DECREASE_RESOURCE ==
updateReq.getContainerUpdateType()) {
SchedulerRequestKey updateKey = new SchedulerRequestKey
(container.getPriority(),
container.getAllocationRequestId(), container.getId());
cancelPreviousRequest(schedulerNode, updateKey);
outstandingDecreases.put(container.getId(), updateReq.getCapability());
} else {
outstandingDecreases.put(container.getId(), container.getResource());
}
return true; return true;
} }
@ -114,35 +113,63 @@ public class ContainerUpdateContext {
if (resourceMap == null) { if (resourceMap == null) {
resourceMap = new HashMap<>(); resourceMap = new HashMap<>();
outstandingIncreases.put(schedulerKey, resourceMap); outstandingIncreases.put(schedulerKey, resourceMap);
} else {
// Updating Resource for and existing increase container
if (ContainerUpdateType.INCREASE_RESOURCE ==
updateRequest.getContainerUpdateType()) {
cancelPreviousRequest(schedulerNode, schedulerKey);
} else {
return false;
}
} }
Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
Map<NodeId, Set<ContainerId>> locationMap = Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource()); resourceMap.get(resToIncrease);
if (locationMap == null) { if (locationMap == null) {
locationMap = new HashMap<>(); locationMap = new HashMap<>();
resourceMap.put(container.getResource(), locationMap); resourceMap.put(resToIncrease, locationMap);
} }
Set<ContainerId> containerIds = locationMap.get(container.getNodeId()); Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds == null) { if (containerIds == null) {
containerIds = new HashSet<>(); containerIds = new HashSet<>();
locationMap.put(container.getNodeId(), containerIds); locationMap.put(container.getNodeId(), containerIds);
} }
if (containerIds.contains(container.getId()) if (outstandingDecreases.containsKey(container.getId())) {
|| outstandingDecreases.contains(container.getId())) {
return false; return false;
} }
containerIds.add(container.getId());
Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs = containerIds.add(container.getId());
new HashMap<>(); if (!Resources.isNone(resToIncrease)) {
Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer); Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
Map<String, ResourceRequest> resMap = new HashMap<>();
createResourceRequests(rmContainer, schedulerNode, Map<String, ResourceRequest> resMap =
schedulerKey, resToIncrease); createResourceRequests(rmContainer, schedulerNode,
updateResReqs.put(schedulerKey, resMap); schedulerKey, resToIncrease);
appSchedulingInfo.addToPlacementSets(false, updateResReqs); updateResReqs.put(schedulerKey, resMap);
appSchedulingInfo.addToPlacementSets(false, updateResReqs);
}
return true; return true;
} }
private void cancelPreviousRequest(SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey) {
SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
if (schedulingPlacementSet != null) {
Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
.getResourceRequests();
ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
// Decrement the pending using a dummy RR with
// resource = prev update req capability
if (prevReq != null) {
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
schedulerKey, Container.newInstance(UNDEFINED,
schedulerNode.getNodeID(), "host:port",
prevReq.getCapability(), schedulerKey.getPriority(), null));
}
}
}
private Map<String, ResourceRequest> createResourceRequests( private Map<String, ResourceRequest> createResourceRequests(
RMContainer rmContainer, SchedulerNode schedulerNode, RMContainer rmContainer, SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey, Resource resToIncrease) { SchedulerRequestKey schedulerKey, Resource resToIncrease) {
@ -168,10 +195,16 @@ public class ContainerUpdateContext {
ContainerUpdateType.PROMOTE_EXECUTION_TYPE) { ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
return rmContainer.getContainer().getResource(); return rmContainer.getContainer().getResource();
} }
// TODO: Fix this for container increase.. if (updateReq.getContainerUpdateType() ==
// This has to equal the Resources in excess of fitsIn() ContainerUpdateType.INCREASE_RESOURCE) {
// for container increase and is equal to the container total // This has to equal the Resources in excess of fitsIn()
// resource for Promotion. // for container increase and is equal to the container total
// resource for Promotion.
Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
rmContainer.getContainer().getResource());
return Resources.add(maxCap,
Resources.negate(rmContainer.getContainer().getResource()));
}
return null; return null;
} }
@ -262,4 +295,80 @@ public class ContainerUpdateContext {
} }
return retVal; return retVal;
} }
/**
* Swaps the existing RMContainer's and the temp RMContainers internal
* container references after adjusting the resources in each.
* @param tempRMContainer Temp RMContainer.
* @param existingRMContainer Existing RMContainer.
* @param updateType Update Type.
* @return Existing RMContainer after swapping the container references.
*/
public RMContainer swapContainer(RMContainer tempRMContainer,
RMContainer existingRMContainer, ContainerUpdateType updateType) {
ContainerId matchedContainerId = existingRMContainer.getContainerId();
// Swap updated container with the existing container
Container tempContainer = tempRMContainer.getContainer();
Resource updatedResource = createUpdatedResource(
tempContainer, existingRMContainer.getContainer(), updateType);
Resource resourceToRelease = createResourceToRelease(
existingRMContainer.getContainer(), updateType);
Container newContainer = Container.newInstance(matchedContainerId,
existingRMContainer.getContainer().getNodeId(),
existingRMContainer.getContainer().getNodeHttpAddress(),
updatedResource,
existingRMContainer.getContainer().getPriority(), null,
tempContainer.getExecutionType());
newContainer.setAllocationRequestId(
existingRMContainer.getContainer().getAllocationRequestId());
newContainer.setVersion(existingRMContainer.getContainer().getVersion());
tempRMContainer.getContainer().setResource(resourceToRelease);
tempRMContainer.getContainer().setExecutionType(
existingRMContainer.getContainer().getExecutionType());
((RMContainerImpl)existingRMContainer).setContainer(newContainer);
return existingRMContainer;
}
/**
* Returns the resource that the container will finally be assigned with
* at the end of the update operation.
* @param tempContainer Temporary Container created for the operation.
* @param existingContainer Existing Container.
* @param updateType Update Type.
* @return Final Resource.
*/
private Resource createUpdatedResource(Container tempContainer,
Container existingContainer, ContainerUpdateType updateType) {
if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
return Resources.add(existingContainer.getResource(),
tempContainer.getResource());
} else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
return outstandingDecreases.get(existingContainer.getId());
} else {
return existingContainer.getResource();
}
}
/**
* Returns the resources that need to be released at the end of the update
* operation.
* @param existingContainer Existing Container.
* @param updateType Updated type.
* @return Resources to be released.
*/
private Resource createResourceToRelease(Container existingContainer,
ContainerUpdateType updateType) {
if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
return Resources.none();
} else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){
return Resources.add(existingContainer.getResource(),
Resources.negate(
outstandingDecreases.get(existingContainer.getId())));
} else {
return existingContainer.getResource();
}
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>(); protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>(); protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>(); protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>(); protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>(); protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
protected Set<NMToken> updatedNMTokens = new HashSet<>(); protected Set<NMToken> updatedNMTokens = new HashSet<>();
@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent( rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(), rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType)); ContainerUpdateType.INCREASE_RESOURCE == updateType));
if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
}
} }
return container; return container;
} }
@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
} }
} }
public void addToNewlyDemotedContainers(ContainerId containerId, public synchronized void addToNewlyDemotedContainers(ContainerId containerId,
RMContainer rmContainer) { RMContainer rmContainer) {
newlyDemotedContainers.put(containerId, rmContainer); newlyDemotedContainers.put(containerId, rmContainer);
} }
public synchronized void addToNewlyDecreasedContainers(
ContainerId containerId, RMContainer rmContainer) {
newlyDecreasedContainers.put(containerId, rmContainer);
}
protected synchronized void addToUpdateContainerErrors( protected synchronized void addToUpdateContainerErrors(
UpdateContainerError error) { UpdateContainerError error) {
updateContainerErrors.add(error); updateContainerErrors.add(error);
@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected synchronized void addToNewlyAllocatedContainers( protected synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) { SchedulerNode node, RMContainer rmContainer) {
if (oppContainerContext == null) {
newlyAllocatedContainers.add(rmContainer);
return;
}
ContainerId matchedContainerId = ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq( getUpdateContext().matchContainerToOutstandingIncreaseReq(
node, rmContainer.getAllocatedSchedulerKey(), rmContainer); node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// occurs when using MiniYARNCluster to test). // occurs when using MiniYARNCluster to test).
tempContainerToKill.add(rmContainer); tempContainerToKill.add(rmContainer);
} else { } else {
newlyPromotedContainers.put(matchedContainerId, rmContainer); RMContainer existingContainer = getRMContainer(matchedContainerId);
// If this container was already GUARANTEED, then it is an
// increase, else its a promotion
if (existingContainer == null ||
EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED,
RMContainerState.EXPIRED, RMContainerState.RELEASED).contains(
existingContainer.getState())) {
tempContainerToKill.add(rmContainer);
} else {
if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) {
newlyIncreasedContainers.put(matchedContainerId, rmContainer);
} else {
newlyPromotedContainers.put(matchedContainerId, rmContainer);
}
}
} }
} else { } else {
newlyAllocatedContainers.add(rmContainer); newlyAllocatedContainers.add(rmContainer);
@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
} }
public List<Container> pullNewlyPromotedContainers() { public List<Container> pullNewlyPromotedContainers() {
return pullContainersWithUpdatedExecType(newlyPromotedContainers, return pullNewlyUpdatedContainers(newlyPromotedContainers,
ContainerUpdateType.PROMOTE_EXECUTION_TYPE); ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
} }
public List<Container> pullNewlyDemotedContainers() { public List<Container> pullNewlyDemotedContainers() {
return pullContainersWithUpdatedExecType(newlyDemotedContainers, return pullNewlyUpdatedContainers(newlyDemotedContainers,
ContainerUpdateType.DEMOTE_EXECUTION_TYPE); ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
} }
public List<Container> pullNewlyIncreasedContainers() {
return pullNewlyUpdatedContainers(newlyIncreasedContainers,
ContainerUpdateType.INCREASE_RESOURCE);
}
public List<Container> pullNewlyDecreasedContainers() {
return pullNewlyUpdatedContainers(newlyDecreasedContainers,
ContainerUpdateType.DECREASE_RESOURCE);
}
public List<UpdateContainerError> pullUpdateContainerErrors() { public List<UpdateContainerError> pullUpdateContainerErrors() {
List<UpdateContainerError> errors = List<UpdateContainerError> errors =
new ArrayList<>(updateContainerErrors); new ArrayList<>(updateContainerErrors);
@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* GUARANTEED to OPPORTUNISTIC. * GUARANTEED to OPPORTUNISTIC.
* @return Newly Promoted and Demoted containers * @return Newly Promoted and Demoted containers
*/ */
private List<Container> pullContainersWithUpdatedExecType( private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> newlyUpdatedContainers, Map<ContainerId, RMContainer> newlyUpdatedContainers,
ContainerUpdateType updateTpe) { ContainerUpdateType updateTpe) {
List<Container> updatedContainers = new ArrayList<>(); List<Container> updatedContainers = new ArrayList<>();
if (oppContainerContext == null) { if (oppContainerContext == null &&
(ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe
|| ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
return updatedContainers; return updatedContainers;
} }
try { try {
@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
while (i.hasNext()) { while (i.hasNext()) {
Map.Entry<ContainerId, RMContainer> entry = i.next(); Map.Entry<ContainerId, RMContainer> entry = i.next();
ContainerId matchedContainerId = entry.getKey(); ContainerId matchedContainerId = entry.getKey();
RMContainer rmContainer = entry.getValue(); RMContainer tempRMContainer = entry.getValue();
// swap containers RMContainer existingRMContainer =
RMContainer existingRMContainer = swapContainer( getRMContainer(matchedContainerId);
rmContainer, matchedContainerId); if (existingRMContainer != null) {
getUpdateContext().removeFromOutstandingUpdate( // swap containers
rmContainer.getAllocatedSchedulerKey(), existingRMContainer = getUpdateContext().swapContainer(
existingRMContainer.getContainer()); tempRMContainer, existingRMContainer, updateTpe);
Container updatedContainer = updateContainerAndNMToken( getUpdateContext().removeFromOutstandingUpdate(
existingRMContainer, updateTpe); tempRMContainer.getAllocatedSchedulerKey(),
updatedContainers.add(updatedContainer); existingRMContainer.getContainer());
Container updatedContainer = updateContainerAndNMToken(
tempContainerToKill.add(rmContainer); existingRMContainer, updateTpe);
updatedContainers.add(updatedContainer);
}
tempContainerToKill.add(tempRMContainer);
i.remove(); i.remove();
} }
// Release all temporary containers // Release all temporary containers
@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
} }
} }
private RMContainer swapContainer(RMContainer rmContainer, ContainerId
matchedContainerId) {
RMContainer existingRMContainer =
getRMContainer(matchedContainerId);
if (existingRMContainer != null) {
// Swap updated container with the existing container
Container updatedContainer = rmContainer.getContainer();
Container newContainer = Container.newInstance(matchedContainerId,
existingRMContainer.getContainer().getNodeId(),
existingRMContainer.getContainer().getNodeHttpAddress(),
updatedContainer.getResource(),
existingRMContainer.getContainer().getPriority(), null,
updatedContainer.getExecutionType());
newContainer.setAllocationRequestId(
existingRMContainer.getContainer().getAllocationRequestId());
newContainer.setVersion(existingRMContainer.getContainer().getVersion());
rmContainer.getContainer().setResource(
existingRMContainer.getContainer().getResource());
rmContainer.getContainer().setExecutionType(
existingRMContainer.getContainer().getExecutionType());
((RMContainerImpl)existingRMContainer).setContainer(newContainer);
}
return existingRMContainer;
}
private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
try {
writeLock.lock();
List <Container> returnContainerList = new ArrayList <Container>(
updatedContainerMap.size());
Iterator<Entry<ContainerId, RMContainer>> i =
updatedContainerMap.entrySet().iterator();
while (i.hasNext()) {
RMContainer rmContainer = i.next().getValue();
Container updatedContainer = updateContainerAndNMToken(rmContainer,
increase ? ContainerUpdateType.INCREASE_RESOURCE :
ContainerUpdateType.DECREASE_RESOURCE);
if (updatedContainer != null) {
returnContainerList.add(updatedContainer);
i.remove();
}
}
return returnContainerList;
} finally {
writeLock.unlock();
}
}
public List<Container> pullNewlyIncreasedContainers() {
return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
}
public List<Container> pullNewlyDecreasedContainers() {
return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
}
public List<NMToken> pullUpdatedNMTokens() { public List<NMToken> pullUpdatedNMTokens() {
try { try {
writeLock.lock(); writeLock.lock();
@ -1255,68 +1231,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public ResourceUsage getSchedulingResourceUsage() { public ResourceUsage getSchedulingResourceUsage() {
return attemptResourceUsage; return attemptResourceUsage;
} }
public boolean removeIncreaseRequest(NodeId nodeId,
SchedulerRequestKey schedulerKey, ContainerId containerId) {
try {
writeLock.lock();
return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
containerId);
} finally {
writeLock.unlock();
}
}
public boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) {
try {
writeLock.lock();
return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
} finally {
writeLock.unlock();
}
}
private void changeContainerResource(
SchedContainerChangeRequest changeRequest, boolean increase) {
try {
writeLock.lock();
if (increase) {
appSchedulingInfo.increaseContainer(changeRequest);
} else{
appSchedulingInfo.decreaseContainer(changeRequest);
}
RMContainer changedRMContainer = changeRequest.getRMContainer();
changedRMContainer.handle(
new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
changeRequest.getTargetCapacity(), increase));
// remove pending and not pulled by AM newly-increased or
// decreased-containers and add the new one
if (increase) {
newlyDecreasedContainers.remove(changeRequest.getContainerId());
newlyIncreasedContainers.put(changeRequest.getContainerId(),
changedRMContainer);
} else{
newlyIncreasedContainers.remove(changeRequest.getContainerId());
newlyDecreasedContainers.put(changeRequest.getContainerId(),
changedRMContainer);
}
} finally {
writeLock.unlock();
}
}
public void decreaseContainer(
SchedContainerChangeRequest decreaseRequest) {
changeContainerResource(decreaseRequest, false);
}
public void increaseContainer(
SchedContainerChangeRequest increaseRequest) {
changeContainerResource(increaseRequest, true);
}
public void setAppAMNodePartitionName(String partitionName) { public void setAppAMNodePartitionName(String partitionName) {
this.appAMNodePartitionName = partitionName; this.appAMNodePartitionName = partitionName;

View File

@ -180,49 +180,6 @@ public abstract class SchedulerNode {
} }
} }
/**
* Change the resources allocated for a container.
* @param containerId Identifier of the container to change.
* @param deltaResource Change in the resource allocation.
* @param increase True if the change is an increase of allocation.
*/
protected synchronized void changeContainerResource(ContainerId containerId,
Resource deltaResource, boolean increase) {
if (increase) {
deductUnallocatedResource(deltaResource);
} else {
addUnallocatedResource(deltaResource);
}
if (LOG.isDebugEnabled()) {
LOG.debug((increase ? "Increased" : "Decreased") + " container "
+ containerId + " of capacity " + deltaResource + " on host "
+ rmNode.getNodeAddress() + ", which has " + numContainers
+ " containers, " + getAllocatedResource() + " used and "
+ getUnallocatedResource() + " available after allocation");
}
}
/**
* Increase the resources allocated to a container.
* @param containerId Identifier of the container to change.
* @param deltaResource Increase of resource allocation.
*/
public synchronized void increaseContainer(ContainerId containerId,
Resource deltaResource) {
changeContainerResource(containerId, deltaResource, true);
}
/**
* Decrease the resources allocated to a container.
* @param containerId Identifier of the container to change.
* @param deltaResource Decrease of resource allocation.
*/
public synchronized void decreaseContainer(ContainerId containerId,
Resource deltaResource) {
changeContainerResource(containerId, deltaResource, false);
}
/** /**
* Get unallocated resources on the node. * Get unallocated resources on the node.
* @return Unallocated resources on the node * @return Unallocated resources on the node
@ -281,7 +238,6 @@ public abstract class SchedulerNode {
if (info == null) { if (info == null) {
return; return;
} }
if (!releasedByNode && info.launchedOnNode) { if (!releasedByNode && info.launchedOnNode) {
// wait until node reports container has completed // wait until node reports container has completed
return; return;

View File

@ -443,14 +443,13 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
void allocateResource(Resource clusterResource, void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) { Resource resource, String nodePartition) {
try { try {
writeLock.lock(); writeLock.lock();
queueUsage.incUsed(nodePartition, resource); queueUsage.incUsed(nodePartition, resource);
if (!changeContainerResource) { ++numContainers;
++numContainers;
}
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, nodePartition); this, labelManager, nodePartition);
} finally { } finally {
@ -459,7 +458,7 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
protected void releaseResource(Resource clusterResource, protected void releaseResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) { Resource resource, String nodePartition) {
try { try {
writeLock.lock(); writeLock.lock();
queueUsage.decUsed(nodePartition, resource); queueUsage.decUsed(nodePartition, resource);
@ -467,9 +466,7 @@ public abstract class AbstractCSQueue implements CSQueue {
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, nodePartition); this, labelManager, nodePartition);
if (!changeContainerResource) { --numContainers;
--numContainers;
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -216,14 +216,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
RMContainerEventType event, CSQueue childQueue, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues); boolean sortQueues);
/**
* We have a reserved increased container in the queue, we need to unreserve
* it. Since we just want to cancel the reserved increase request instead of
* stop the container, we shouldn't call completedContainer for such purpose.
*/
public void unreserveIncreasedContainer(Resource clusterResource,
FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
/** /**
* Get the number of applications in the queue. * Get the number of applications in the queue.
* @return number of applications * @return number of applications
@ -319,13 +311,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* new resource asked * new resource asked
*/ */
public void decPendingResource(String nodeLabel, Resource resourceToDec); public void decPendingResource(String nodeLabel, Resource resourceToDec);
/**
* Decrease container resource in the queue
*/
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
FiCaSchedulerApp app) throws InvalidResourceRequestException;
/** /**
* Get valid Node Labels for this queue * Get valid Node Labels for this queue

View File

@ -21,6 +21,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -46,9 +63,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@ -71,12 +86,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
@ -85,11 +98,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidExcep
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@ -129,24 +140,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -874,43 +867,6 @@ public class CapacityScheduler extends
} }
} }
private LeafQueue updateIncreaseRequests(
List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
// When application has some pending to-be-removed resource requests,
app.removedToBeRemovedIncreaseRequests();
if (null == increaseRequests || increaseRequests.isEmpty()) {
return null;
}
// Pre-process increase requests
List<SchedContainerChangeRequest> schedIncreaseRequests =
createSchedContainerChangeRequests(increaseRequests, true);
LeafQueue leafQueue = (LeafQueue) app.getQueue();
try {
/*
* Acquire application's lock here to make sure application won't
* finish when updateIncreaseRequest is called.
*/
app.getWriteLock().lock();
// make sure we aren't stopping/removing the application
// when the allocate comes in
if (app.isStopped()) {
return null;
}
// Process increase resource requests
if (app.updateIncreaseRequests(schedIncreaseRequests)) {
return leafQueue;
}
} finally {
app.getWriteLock().unlock();
}
return null;
}
@Override @Override
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId, public Allocation allocate(ApplicationAttemptId applicationAttemptId,
@ -922,21 +878,13 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION; return EMPTY_ALLOCATION;
} }
// Handle promotions and demotions // Handle all container updates
handleExecutionTypeUpdates( handleContainerUpdates(application, updateRequests);
application, updateRequests.getPromotionRequests(),
updateRequests.getDemotionRequests());
// Release containers // Release containers
releaseContainers(release, application); releaseContainers(release, application);
// update increase requests LeafQueue updateDemandForQueue = null;
LeafQueue updateDemandForQueue =
updateIncreaseRequests(updateRequests.getIncreaseRequests(),
application);
// Decrease containers
decreaseContainers(updateRequests.getDecreaseRequests(), application);
// Sanity check for new allocation requests // Sanity check for new allocation requests
normalizeRequests(ask); normalizeRequests(ask);
@ -961,8 +909,7 @@ public class CapacityScheduler extends
} }
// Update application requests // Update application requests
if (application.updateResourceRequests(ask) && (updateDemandForQueue if (application.updateResourceRequests(ask)) {
== null)) {
updateDemandForQueue = (LeafQueue) application.getQueue(); updateDemandForQueue = (LeafQueue) application.getQueue();
} }
@ -1468,7 +1415,7 @@ public class CapacityScheduler extends
(ContainerExpiredSchedulerEvent) event; (ContainerExpiredSchedulerEvent) event;
ContainerId containerId = containerExpiredEvent.getContainerId(); ContainerId containerId = containerExpiredEvent.getContainerId();
if (containerExpiredEvent.isIncrease()) { if (containerExpiredEvent.isIncrease()) {
rollbackContainerResource(containerId); rollbackContainerUpdate(containerId);
} else { } else {
completedContainer(getRMContainer(containerId), completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
@ -1620,31 +1567,6 @@ public class CapacityScheduler extends
} }
} }
private void rollbackContainerResource(
ContainerId containerId) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
LOG.info("Cannot rollback resource for container " + containerId
+ ". The container does not exist.");
return;
}
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Cannot rollback resource for container " + containerId
+ ". The application that the container "
+ "belongs to does not exist.");
return;
}
LOG.info("Roll back resource for container " + containerId);
SchedulerNode schedulerNode = getSchedulerNode(
rmContainer.getAllocatedNode());
SchedContainerChangeRequest decreaseRequest =
new SchedContainerChangeRequest(this.rmContext, schedulerNode,
rmContainer, rmContainer.getLastConfirmedResource());
decreaseContainer(decreaseRequest, application);
}
@Override @Override
protected void completedContainerInternal( protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus, RMContainer rmContainer, ContainerStatus containerStatus,
@ -1678,32 +1600,6 @@ public class CapacityScheduler extends
rmContainer, containerStatus, event, null, true); rmContainer, containerStatus, event, null, true);
} }
@Override
protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
RMContainer rmContainer = decreaseRequest.getRMContainer();
// Check container status before doing decrease
if (rmContainer.getState() != RMContainerState.RUNNING) {
LOG.info(
"Trying to decrease a container not in RUNNING state, container="
+ rmContainer + " state=" + rmContainer.getState().name());
return;
}
FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
LeafQueue queue = (LeafQueue) attempt.getQueue();
try {
queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
// Notify RMNode that the container can be pulled by NodeManager in the
// next heartbeat
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
} catch (InvalidResourceRequestException e) {
LOG.warn("Error happens when checking decrease request, Ignoring.."
+ " exception=", e);
}
}
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
@VisibleForTesting @VisibleForTesting
@Override @Override
@ -2407,8 +2303,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, true), getSchedulerContainer(rmContainer, true),
getSchedulerContainersToRelease(csAssignment), getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
false), csAssignment.isIncreasedAllocation(), false), csAssignment.getType(),
csAssignment.getType(), csAssignment.getRequestLocalityType(), csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() : csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
@ -2424,8 +2320,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, false), getSchedulerContainer(rmContainer, false),
getSchedulerContainersToRelease(csAssignment), getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(), getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
false), csAssignment.isIncreasedAllocation(), false), csAssignment.getType(),
csAssignment.getType(), csAssignment.getRequestLocalityType(), csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ? csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() : csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,

View File

@ -1184,12 +1184,7 @@ public class LeafQueue extends AbstractCSQueue {
if (targetLeafQueue == this) { if (targetLeafQueue == this) {
// When trying to preempt containers from the same queue // When trying to preempt containers from the same queue
if (rmContainer.hasIncreaseReservation()) { if (rmContainer.getState() == RMContainerState.RESERVED) {
// Increased container reservation
unreserveIncreasedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getSchedulerNode(), rmContainer);
} else if (rmContainer.getState() == RMContainerState.RESERVED) {
// For other reserved containers // For other reserved containers
// This is a reservation exchange, complete previous reserved container // This is a reservation exchange, complete previous reserved container
completedContainer(clusterResource, completedContainer(clusterResource,
@ -1262,8 +1257,7 @@ public class LeafQueue extends AbstractCSQueue {
schedulerContainer.getSchedulerApplicationAttempt(), schedulerContainer.getSchedulerApplicationAttempt(),
allocation.getAllocatedOrReservedResource(), allocation.getAllocatedOrReservedResource(),
schedulerContainer.getNodePartition(), schedulerContainer.getNodePartition(),
schedulerContainer.getRmContainer(), schedulerContainer.getRmContainer());
allocation.isIncreasedAllocation());
orderingPolicy.containerAllocated( orderingPolicy.containerAllocated(
schedulerContainer.getSchedulerApplicationAttempt(), schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getRmContainer()); schedulerContainer.getRmContainer());
@ -1584,40 +1578,6 @@ public class LeafQueue extends AbstractCSQueue {
readLock.unlock(); readLock.unlock();
} }
} }
@Override
public void unreserveIncreasedContainer(Resource clusterResource,
FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
boolean removed = false;
Priority priority = null;
try {
writeLock.lock();
if (rmContainer.getContainer() != null) {
priority = rmContainer.getContainer().getPriority();
}
if (null != priority) {
removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node,
rmContainer);
}
if (removed) {
// Inform the ordering policy
orderingPolicy.containerReleased(app, rmContainer);
releaseResource(clusterResource, app, rmContainer.getReservedResource(),
node.getPartition(), rmContainer, true);
}
} finally {
writeLock.unlock();
}
if (removed) {
getParent().unreserveIncreasedContainer(clusterResource, app, node,
rmContainer);
}
}
private void updateSchedulerHealthForCompletedContainer( private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) { RMContainer rmContainer, ContainerStatus containerStatus) {
@ -1694,16 +1654,6 @@ public class LeafQueue extends AbstractCSQueue {
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus); updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
if (application != null) { if (application != null) {
// unreserve container increase request if it previously reserved.
if (rmContainer.hasIncreaseReservation()) {
unreserveIncreasedContainer(clusterResource, application, node,
rmContainer);
}
// Remove container increase request if it exists
application.removeIncreaseRequest(node.getNodeID(),
rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId());
boolean removed = false; boolean removed = false;
// Careful! Locking order is important! // Careful! Locking order is important!
@ -1732,7 +1682,7 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.containerReleased(application, rmContainer); orderingPolicy.containerReleased(application, rmContainer);
releaseResource(clusterResource, application, container.getResource(), releaseResource(clusterResource, application, container.getResource(),
node.getPartition(), rmContainer, false); node.getPartition(), rmContainer);
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -1753,12 +1703,10 @@ public class LeafQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource, void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource, SchedulerApplicationAttempt application, Resource resource,
String nodePartition, RMContainer rmContainer, String nodePartition, RMContainer rmContainer) {
boolean isIncreasedAllocation) {
try { try {
writeLock.lock(); writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition, super.allocateResource(clusterResource, resource, nodePartition);
isIncreasedAllocation);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource); clusterResource);
@ -1807,11 +1755,10 @@ public class LeafQueue extends AbstractCSQueue {
void releaseResource(Resource clusterResource, void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition, FiCaSchedulerApp application, Resource resource, String nodePartition,
RMContainer rmContainer, boolean isChangeResource) { RMContainer rmContainer) {
try { try {
writeLock.lock(); writeLock.lock();
super.releaseResource(clusterResource, resource, nodePartition, super.releaseResource(clusterResource, resource, nodePartition);
isChangeResource);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource); clusterResource);
@ -2139,7 +2086,7 @@ public class LeafQueue extends AbstractCSQueue {
rmContainer.getContainer().getNodeId()); rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, allocateResource(clusterResource, attempt,
rmContainer.getContainer().getResource(), node.getPartition(), rmContainer.getContainer().getResource(), node.getPartition(),
rmContainer, false); rmContainer);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -2274,7 +2221,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node = FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId()); scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer() allocateResource(clusterResource, application, rmContainer.getContainer()
.getResource(), node.getPartition(), rmContainer, false); .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer() LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource() + " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@ -2293,7 +2240,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node = FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId()); scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer() releaseResource(clusterResource, application, rmContainer.getContainer()
.getResource(), node.getPartition(), rmContainer, false); .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer() LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource() + " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@ -2362,85 +2309,6 @@ public class LeafQueue extends AbstractCSQueue {
return defaultAppPriorityPerQueue; return defaultAppPriorityPerQueue;
} }
/**
*
* @param clusterResource Total cluster resource
* @param decreaseRequest The decrease request
* @param app The application of interest
*/
@Override
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest,
FiCaSchedulerApp app) throws InvalidResourceRequestException {
// If the container being decreased is reserved, we need to unreserve it
// first.
RMContainer rmContainer = decreaseRequest.getRMContainer();
if (rmContainer.hasIncreaseReservation()) {
unreserveIncreasedContainer(clusterResource, app,
(FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
}
boolean resourceDecreased = false;
Resource resourceBeforeDecrease;
// Grab queue lock to avoid race condition when getting container resource
try {
writeLock.lock();
// Make sure the decrease request is valid in terms of current resource
// and target resource. This must be done under the leaf queue lock.
// Throws exception if the check fails.
RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
// Save resource before decrease for debug log
resourceBeforeDecrease = Resources.clone(
rmContainer.getAllocatedResource());
// Do we have increase request for the same container? If so, remove it
boolean hasIncreaseRequest = app.removeIncreaseRequest(
decreaseRequest.getNodeId(),
decreaseRequest.getRMContainer().getAllocatedSchedulerKey(),
decreaseRequest.getContainerId());
if (hasIncreaseRequest) {
if (LOG.isDebugEnabled()) {
LOG.debug("While processing decrease requests, found an increase"
+ " request for the same container " + decreaseRequest
.getContainerId() + ", removed the increase request");
}
}
// Delta capacity is negative when it's a decrease request
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
if (Resources.equals(absDelta, Resources.none())) {
// If delta capacity of this decrease request is 0, this decrease
// request serves the purpose of cancelling an existing increase request
// if any
if (LOG.isDebugEnabled()) {
LOG.debug("Decrease target resource equals to existing resource for"
+ " container:" + decreaseRequest.getContainerId()
+ " ignore this decrease request.");
}
} else{
// Release the delta resource
releaseResource(clusterResource, app, absDelta,
decreaseRequest.getNodePartition(),
decreaseRequest.getRMContainer(), true);
// Notify application
app.decreaseContainer(decreaseRequest);
// Notify node
decreaseRequest.getSchedulerNode().decreaseContainer(
decreaseRequest.getContainerId(), absDelta);
resourceDecreased = true;
}
} finally {
writeLock.unlock();
}
if (resourceDecreased) {
// Notify parent queue outside of leaf queue lock
getParent().decreaseContainer(clusterResource, decreaseRequest, app);
LOG.info("Application attempt " + app.getApplicationAttemptId()
+ " decreased container:" + decreaseRequest.getContainerId()
+ " from " + resourceBeforeDecrease + " to "
+ decreaseRequest.getTargetCapacity());
}
}
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app, public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) { Priority newAppPriority) {
try { try {

View File

@ -788,12 +788,11 @@ public class ParentQueue extends AbstractCSQueue {
} }
private void internalReleaseResource(Resource clusterResource, private void internalReleaseResource(Resource clusterResource,
FiCaSchedulerNode node, Resource releasedResource, FiCaSchedulerNode node, Resource releasedResource) {
boolean changeResource) {
try { try {
writeLock.lock(); writeLock.lock();
super.releaseResource(clusterResource, releasedResource, super.releaseResource(clusterResource, releasedResource,
node.getPartition(), changeResource); node.getPartition());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(
@ -804,38 +803,6 @@ public class ParentQueue extends AbstractCSQueue {
writeLock.unlock(); writeLock.unlock();
} }
} }
@Override
public void decreaseContainer(Resource clusterResource,
SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
throws InvalidResourceRequestException {
// delta capacity is negative when it's a decrease request
Resource absDeltaCapacity =
Resources.negate(decreaseRequest.getDeltaCapacity());
internalReleaseResource(clusterResource,
csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false);
// Inform the parent
if (parent != null) {
parent.decreaseContainer(clusterResource, decreaseRequest, app);
}
}
@Override
public void unreserveIncreasedContainer(Resource clusterResource,
FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
if (app != null) {
internalReleaseResource(clusterResource, node,
rmContainer.getReservedResource(), false);
// Inform the parent
if (parent != null) {
parent.unreserveIncreasedContainer(clusterResource, app, node,
rmContainer);
}
}
}
@Override @Override
public void completedContainer(Resource clusterResource, public void completedContainer(Resource clusterResource,
@ -845,7 +812,7 @@ public class ParentQueue extends AbstractCSQueue {
boolean sortQueues) { boolean sortQueues) {
if (application != null) { if (application != null) {
internalReleaseResource(clusterResource, node, internalReleaseResource(clusterResource, node,
rmContainer.getContainer().getResource(), false); rmContainer.getContainer().getResource());
// Inform the parent // Inform the parent
if (parent != null) { if (parent != null) {
@ -901,7 +868,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node = scheduler.getNode( FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId()); rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, allocateResource(clusterResource,
rmContainer.getContainer().getResource(), node.getPartition(), false); rmContainer.getContainer().getResource(), node.getPartition());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -938,7 +905,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node = FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId()); scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, rmContainer.getContainer() allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), node.getPartition(), false); .getResource(), node.getPartition());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@ -958,7 +925,7 @@ public class ParentQueue extends AbstractCSQueue {
scheduler.getNode(rmContainer.getContainer().getNodeId()); scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource, super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(), rmContainer.getContainer().getResource(),
node.getPartition(), false); node.getPartition());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@ -975,11 +942,10 @@ public class ParentQueue extends AbstractCSQueue {
} }
void allocateResource(Resource clusterResource, void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) { Resource resource, String nodePartition) {
try { try {
writeLock.lock(); writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition, super.allocateResource(clusterResource, resource, nodePartition);
changeContainerResource);
/** /**
* check if we need to kill (killable) containers if maximum resource violated. * check if we need to kill (killable) containers if maximum resource violated.
@ -1069,8 +1035,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book-keeping // Book-keeping
// Note: Update headroom to account for current allocation too... // Note: Update headroom to account for current allocation too...
allocateResource(cluster, allocation.getAllocatedOrReservedResource(), allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
schedulerContainer.getNodePartition(), schedulerContainer.getNodePartition());
allocation.isIncreasedAllocation());
LOG.info("assignedContainer" + " queue=" + getQueueName() LOG.info("assignedContainer" + " queue=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerAllocator extends AbstractContainerAllocator { public class ContainerAllocator extends AbstractContainerAllocator {
private AbstractContainerAllocator increaseContainerAllocator;
private AbstractContainerAllocator regularContainerAllocator; private AbstractContainerAllocator regularContainerAllocator;
public ContainerAllocator(FiCaSchedulerApp application, public ContainerAllocator(FiCaSchedulerApp application,
@ -45,8 +44,6 @@ public class ContainerAllocator extends AbstractContainerAllocator {
RMContext rmContext, ActivitiesManager activitiesManager) { RMContext rmContext, ActivitiesManager activitiesManager) {
super(application, rc, rmContext); super(application, rc, rmContext);
increaseContainerAllocator =
new IncreaseContainerAllocator(application, rc, rmContext);
regularContainerAllocator = new RegularContainerAllocator(application, rc, regularContainerAllocator = new RegularContainerAllocator(application, rc,
rmContext, activitiesManager); rmContext, activitiesManager);
} }
@ -55,32 +52,8 @@ public class ContainerAllocator extends AbstractContainerAllocator {
public CSAssignment assignContainers(Resource clusterResource, public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, RMContainer reservedContainer) { ResourceLimits resourceLimits, RMContainer reservedContainer) {
if (reservedContainer != null) { return regularContainerAllocator.assignContainers(clusterResource,
if (reservedContainer.getState() == RMContainerState.RESERVED) { ps, schedulingMode, resourceLimits, reservedContainer);
// It's a regular container
return regularContainerAllocator.assignContainers(clusterResource,
ps, schedulingMode, resourceLimits, reservedContainer);
} else {
// It's a increase container
return increaseContainerAllocator.assignContainers(clusterResource,
ps, schedulingMode, resourceLimits, reservedContainer);
}
} else {
/*
* Try to allocate increase container first, and if we failed to allocate
* anything, we will try to allocate regular container
*/
CSAssignment assign =
increaseContainerAllocator.assignContainers(clusterResource, ps,
schedulingMode, resourceLimits, null);
if (Resources.greaterThan(rc, clusterResource, assign.getResource(),
Resources.none())) {
return assign;
}
return regularContainerAllocator.assignContainers(clusterResource, ps,
schedulingMode, resourceLimits, null);
}
} }
} }

View File

@ -1,337 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
public class IncreaseContainerAllocator extends AbstractContainerAllocator {
private static final Log LOG =
LogFactory.getLog(IncreaseContainerAllocator.class);
public IncreaseContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
super(application, rc, rmContext);
}
/**
* Quick check if we can allocate anything here:
* We will not continue if:
* - Headroom doesn't support allocate minimumAllocation
* -
*/
private boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required) {
return Resources.greaterThanOrEqual(rc, clusterResource,
currentResourceLimits.getHeadroom(), required);
}
private CSAssignment createReservedIncreasedCSAssignment(
SchedContainerChangeRequest request) {
CSAssignment assignment =
new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
application, CSAssignment.SkippedType.NONE, false);
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
request.getDeltaCapacity());
assignment.getAssignmentInformation().incrReservations();
assignment.getAssignmentInformation().addReservationDetails(
request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
assignment.setIncreasedAllocation(true);
LOG.info("Reserved increase container request:" + request.toString());
return assignment;
}
private CSAssignment createSuccessfullyIncreasedCSAssignment(
SchedContainerChangeRequest request, boolean fromReservation) {
CSAssignment assignment =
new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
application, CSAssignment.SkippedType.NONE, fromReservation);
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
request.getDeltaCapacity());
assignment.getAssignmentInformation().incrAllocations();
assignment.getAssignmentInformation().addAllocationDetails(
request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
assignment.setIncreasedAllocation(true);
if (fromReservation) {
assignment.setFulfilledReservedContainer(request.getRMContainer());
}
// notify application
application
.getCSLeafQueue()
.getOrderingPolicy()
.containerAllocated(application,
application.getRMContainer(request.getContainerId()));
LOG.info("Approved increase container request:" + request.toString()
+ " fromReservation=" + fromReservation);
return assignment;
}
private CSAssignment allocateIncreaseRequestFromReservedContainer(
SchedulerNode node, Resource cluster,
SchedContainerChangeRequest increaseRequest) {
if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
node.getUnallocatedResource())) {
return createSuccessfullyIncreasedCSAssignment(increaseRequest, true);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to allocate reserved increase request:"
+ increaseRequest.toString()
+ ". There's no enough available resource");
}
// We still cannot allocate this container, will wait for next turn
return CSAssignment.SKIP_ASSIGNMENT;
}
}
private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node,
Resource cluster, SchedContainerChangeRequest increaseRequest) {
if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
node.getUnallocatedResource())) {
return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
} else{
// We cannot allocate this container, but since queue capacity /
// user-limit matches, we can reserve this container on this node.
return createReservedIncreasedCSAssignment(increaseRequest);
}
}
@Override
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, RMContainer reservedContainer) {
AppSchedulingInfo sinfo = application.getAppSchedulingInfo();
FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
if (null == node) {
// This is global scheduling enabled
// FIXME, support container increase when global scheduling enabled
return CSAssignment.SKIP_ASSIGNMENT;
}
NodeId nodeId = node.getNodeID();
if (reservedContainer == null) {
// Do we have increase request on this node?
if (!sinfo.hasIncreaseRequest(nodeId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip allocating increase request since we don't have any"
+ " increase request on this node=" + node.getNodeID());
}
return CSAssignment.SKIP_ASSIGNMENT;
}
// Check if we need to unreserve something, note that we don't support
// continuousReservationLooking now. TODO, need think more about how to
// support it.
boolean shouldUnreserve =
Resources.greaterThan(rc, clusterResource,
resourceLimits.getAmountNeededUnreserve(), Resources.none());
// Check if we can allocate minimum resource according to headroom
boolean cannotAllocateAnything =
!checkHeadroom(clusterResource, resourceLimits, rmContext
.getScheduler().getMinimumResourceCapability());
// Skip the app if we failed either of above check
if (cannotAllocateAnything || shouldUnreserve) {
if (LOG.isDebugEnabled()) {
if (shouldUnreserve) {
LOG.debug("Cannot continue since we have to unreserve some resource"
+ ", now increase container allocation doesn't "
+ "support continuous reservation looking..");
}
if (cannotAllocateAnything) {
LOG.debug("We cannot allocate anything because of low headroom, "
+ "headroom=" + resourceLimits.getHeadroom());
}
}
return CSAssignment.SKIP_ASSIGNMENT;
}
CSAssignment assigned = null;
/*
* Loop each priority, and containerId. Container priority is not
* equivalent to request priority, application master can run an important
* task on a less prioritized container.
*
* So behavior here is, we still try to increase container with higher
* priority, but will skip increase request and move to next increase
* request if queue-limit or user-limit aren't satisfied
*/
for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking at increase request for application="
+ application.getApplicationAttemptId() + " priority="
+ schedulerKey.getPriority());
}
/*
* If we have multiple to-be-increased containers under same priority on
* a same host, we will try to increase earlier launched container
* first. And again - we will skip a request and move to next if it
* cannot be allocated.
*/
Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap =
sinfo.getIncreaseRequests(nodeId, schedulerKey);
// We don't have more increase request on this priority, skip..
if (null == increaseRequestMap) {
if (LOG.isDebugEnabled()) {
LOG.debug("There's no increase request for "
+ application.getApplicationAttemptId() + " priority="
+ schedulerKey.getPriority());
}
continue;
}
Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter =
increaseRequestMap.entrySet().iterator();
while (iter.hasNext()) {
Entry<ContainerId, SchedContainerChangeRequest> entry =
iter.next();
SchedContainerChangeRequest increaseRequest =
entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Looking at increase request=" + increaseRequest.toString());
}
boolean headroomSatisifed = checkHeadroom(clusterResource,
resourceLimits, increaseRequest.getDeltaCapacity());
if (!headroomSatisifed) {
// skip if doesn't satisfy headroom limit
if (LOG.isDebugEnabled()) {
LOG.debug(" Headroom is not satisfied, skip..");
}
continue;
}
RMContainer rmContainer = increaseRequest.getRMContainer();
if (rmContainer.getContainerState() != ContainerState.RUNNING) {
// if the container is not running, we should remove the
// increaseRequest and continue;
if (LOG.isDebugEnabled()) {
LOG.debug(" Container is not running any more, skip...");
}
application.addToBeRemovedIncreaseRequest(increaseRequest);
continue;
}
if (!Resources.fitsIn(rc, clusterResource,
increaseRequest.getTargetCapacity(), node.getTotalResource())) {
// if the target capacity is more than what the node can offer, we
// will simply remove and skip it.
// The reason of doing check here instead of adding increase request
// to scheduler because node's resource could be updated after
// request added.
if (LOG.isDebugEnabled()) {
LOG.debug(" Target capacity is more than what node can offer,"
+ " node.resource=" + node.getTotalResource());
}
application.addToBeRemovedIncreaseRequest(increaseRequest);
continue;
}
// Try to allocate the increase request
assigned =
allocateIncreaseRequest(node, clusterResource, increaseRequest);
if (assigned.getSkippedType()
== CSAssignment.SkippedType.NONE) {
// When we don't skip this request, which means we either allocated
// OR reserved this request. We will break
break;
}
}
// We may have allocated something
if (assigned != null && assigned.getSkippedType()
== CSAssignment.SkippedType.NONE) {
break;
}
}
return assigned == null ? CSAssignment.SKIP_ASSIGNMENT : assigned;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to allocate reserved increase container request..");
}
// We already reserved this increase container
SchedContainerChangeRequest request =
sinfo.getIncreaseRequest(nodeId,
reservedContainer.getAllocatedSchedulerKey(),
reservedContainer.getContainerId());
// We will cancel the reservation any of following happens
// - Container finished
// - No increase request needed
// - Target resource updated
if (null == request
|| reservedContainer.getContainerState() != ContainerState.RUNNING
|| (!Resources.equals(reservedContainer.getReservedResource(),
request.getDeltaCapacity()))) {
if (LOG.isDebugEnabled()) {
LOG.debug("We don't need reserved increase container request "
+ "for container=" + reservedContainer.getContainerId()
+ ". Unreserving and return...");
}
// We don't need this container now, just return excessive reservation
return new CSAssignment(application, reservedContainer);
}
return allocateIncreaseRequestFromReservedContainer(node, clusterResource,
request);
}
}
}

View File

@ -43,8 +43,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
// not be included by toRelease list // not be included by toRelease list
private SchedulerContainer<A, N> allocateFromReservedContainer; private SchedulerContainer<A, N> allocateFromReservedContainer;
private boolean isIncreasedAllocation;
private NodeType allocationLocalityType; private NodeType allocationLocalityType;
private NodeType requestLocalityType; private NodeType requestLocalityType;
@ -57,7 +55,7 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
SchedulerContainer<A, N> allocatedOrReservedContainer, SchedulerContainer<A, N> allocatedOrReservedContainer,
List<SchedulerContainer<A, N>> toRelease, List<SchedulerContainer<A, N>> toRelease,
SchedulerContainer<A, N> allocateFromReservedContainer, SchedulerContainer<A, N> allocateFromReservedContainer,
boolean isIncreasedAllocation, NodeType allocationLocalityType, NodeType allocationLocalityType,
NodeType requestLocalityType, SchedulingMode schedulingMode, NodeType requestLocalityType, SchedulingMode schedulingMode,
Resource allocatedResource) { Resource allocatedResource) {
this.allocatedOrReservedContainer = allocatedOrReservedContainer; this.allocatedOrReservedContainer = allocatedOrReservedContainer;
@ -65,7 +63,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
this.toRelease = toRelease; this.toRelease = toRelease;
} }
this.allocateFromReservedContainer = allocateFromReservedContainer; this.allocateFromReservedContainer = allocateFromReservedContainer;
this.isIncreasedAllocation = isIncreasedAllocation;
this.allocationLocalityType = allocationLocalityType; this.allocationLocalityType = allocationLocalityType;
this.requestLocalityType = requestLocalityType; this.requestLocalityType = requestLocalityType;
this.schedulingMode = schedulingMode; this.schedulingMode = schedulingMode;
@ -84,10 +81,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
return allocationLocalityType; return allocationLocalityType;
} }
public boolean isIncreasedAllocation() {
return isIncreasedAllocation;
}
public SchedulerContainer<A, N> getAllocateFromReservedContainer() { public SchedulerContainer<A, N> getAllocateFromReservedContainer() {
return allocateFromReservedContainer; return allocateFromReservedContainer;
} }

View File

@ -314,54 +314,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false; return false;
} }
private SchedContainerChangeRequest getResourceChangeRequest(
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
return appSchedulingInfo.getIncreaseRequest(
schedulerContainer.getSchedulerNode().getNodeID(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainerId());
}
private boolean checkIncreaseContainerAllocation(
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
// When increase a container
if (schedulerContainer.getRmContainer().getState()
!= RMContainerState.RUNNING) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to increase a container, but container="
+ schedulerContainer.getRmContainer().getContainerId()
+ " is not in running state.");
}
return false;
}
// Check if increase request is still valid
SchedContainerChangeRequest increaseRequest = getResourceChangeRequest(
schedulerContainer);
if (null == increaseRequest || !Resources.equals(
increaseRequest.getDeltaCapacity(),
allocation.getAllocatedOrReservedResource())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Increase request has been changed, reject this proposal");
}
return false;
}
if (allocation.getAllocateFromReservedContainer() != null) {
// In addition, if allocation is from a reserved container, check
// if the reserved container has enough reserved space
if (!Resources.equals(
allocation.getAllocateFromReservedContainer().getRmContainer()
.getReservedResource(), increaseRequest.getDeltaCapacity())) {
return false;
}
}
return true;
}
private boolean commonCheckContainerAllocation( private boolean commonCheckContainerAllocation(
Resource cluster, Resource cluster,
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation, ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
@ -447,30 +399,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer = allocation.getAllocatedOrReservedContainer(); schedulerContainer = allocation.getAllocatedOrReservedContainer();
if (schedulerContainer.isAllocated()) { if (schedulerContainer.isAllocated()) {
if (!allocation.isIncreasedAllocation()) { // When allocate a new container
// When allocate a new container resourceRequests =
resourceRequests = schedulerContainer.getRmContainer().getResourceRequests();
schedulerContainer.getRmContainer().getResourceRequests();
// Check pending resource request // Check pending resource request
if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey())) { schedulerContainer.getSchedulerRequestKey())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("No pending resource for: nodeType=" + allocation LOG.debug("No pending resource for: nodeType=" + allocation
.getAllocationLocalityType() + ", node=" + schedulerContainer .getAllocationLocalityType() + ", node=" + schedulerContainer
.getSchedulerNode() + ", requestKey=" + schedulerContainer .getSchedulerNode() + ", requestKey=" + schedulerContainer
.getSchedulerRequestKey() + ", application=" .getSchedulerRequestKey() + ", application="
+ getApplicationAttemptId()); + getApplicationAttemptId());
} }
return false; return false;
}
} else {
if (!checkIncreaseContainerAllocation(allocation,
schedulerContainer)) {
return false;
}
} }
// Common part of check container allocation regardless if it is a // Common part of check container allocation regardless if it is a
@ -543,12 +488,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Generate new containerId if it is not an allocation for increasing // Generate new containerId if it is not an allocation for increasing
// Or re-reservation // Or re-reservation
if (!allocation.isIncreasedAllocation()) { if (rmContainer.getContainer().getId() == null) {
if (rmContainer.getContainer().getId() == null) { rmContainer.setContainerId(BuilderUtils
rmContainer.setContainerId(BuilderUtils .newContainerId(getApplicationAttemptId(),
.newContainerId(getApplicationAttemptId(), getNewContainerId()));
getNewContainerId()));
}
} }
ContainerId containerId = rmContainer.getContainerId(); ContainerId containerId = rmContainer.getContainerId();
@ -564,77 +507,50 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer.getSchedulerNode(), reservedContainer); schedulerContainer.getSchedulerNode(), reservedContainer);
} }
// Update this application for the allocated container // Allocate a new container
if (!allocation.isIncreasedAllocation()) { addToNewlyAllocatedContainers(
// Allocate a new container schedulerContainer.getSchedulerNode(), rmContainer);
addToNewlyAllocatedContainers( liveContainers.put(containerId, rmContainer);
schedulerContainer.getSchedulerNode(), rmContainer);
liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests // Deduct pending resource requests
List<ResourceRequest> requests = appSchedulingInfo.allocate( List<ResourceRequest> requests = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(), allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(), schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer()); schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setResourceRequests(requests); ((RMContainerImpl) rmContainer).setResourceRequests(requests);
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
allocation.getAllocatedOrReservedResource()); allocation.getAllocatedOrReservedResource());
rmContainer.handle( rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START)); new RMContainerEvent(containerId, RMContainerEventType.START));
// Inform the node // Inform the node
schedulerContainer.getSchedulerNode().allocateContainer( schedulerContainer.getSchedulerNode().allocateContainer(
rmContainer); rmContainer);
// update locality statistics, // update locality statistics,
incNumAllocatedContainers(allocation.getAllocationLocalityType(), incNumAllocatedContainers(allocation.getAllocationLocalityType(),
allocation.getRequestLocalityType()); allocation.getRequestLocalityType());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" + containerId LOG.debug("allocate: applicationAttemptId=" + containerId
.getApplicationAttemptId() + " container=" + containerId .getApplicationAttemptId() + " container=" + containerId
+ " host=" + rmContainer.getAllocatedNode().getHost() + " host=" + rmContainer.getAllocatedNode().getHost()
+ " type=" + allocation.getAllocationLocalityType()); + " type=" + allocation.getAllocationLocalityType());
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId,
allocation.getAllocatedOrReservedResource());
} else{
SchedContainerChangeRequest increaseRequest =
getResourceChangeRequest(schedulerContainer);
// allocate resource for an increase request
// Notify node
schedulerContainer.getSchedulerNode().increaseContainer(
increaseRequest.getContainerId(),
increaseRequest.getDeltaCapacity());
// OK, we can allocate this increase request
// Notify application
increaseContainer(increaseRequest);
} }
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId,
allocation.getAllocatedOrReservedResource());
} else { } else {
if (!allocation.isIncreasedAllocation()) { // If the rmContainer's state is already updated to RESERVED, this is
// If the rmContainer's state is already updated to RESERVED, this is // a reReservation
// a reReservation reserve(schedulerContainer.getSchedulerRequestKey(),
reserve(schedulerContainer.getSchedulerRequestKey(), schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerNode(), schedulerContainer.getRmContainer(),
schedulerContainer.getRmContainer(), schedulerContainer.getRmContainer().getContainer(),
schedulerContainer.getRmContainer().getContainer(), reReservation);
reReservation);
} else{
SchedContainerChangeRequest increaseRequest =
getResourceChangeRequest(schedulerContainer);
reserveIncreasedContainer(
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getSchedulerNode(),
increaseRequest.getRMContainer(),
increaseRequest.getDeltaCapacity());
}
} }
} }
} finally { } finally {
@ -651,9 +567,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
FiCaSchedulerNode node, RMContainer rmContainer) { FiCaSchedulerNode node, RMContainer rmContainer) {
try { try {
writeLock.lock(); writeLock.lock();
// Cancel increase request (if it has reserved increase request
rmContainer.cancelIncreaseReservation();
// Done with the reservation? // Done with the reservation?
if (internalUnreserve(node, schedulerKey)) { if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this); node.unreserveResource(this);
@ -809,13 +722,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
.entrySet()) { .entrySet()) {
NodeId nodeId = entry.getKey(); NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue(); RMContainer reservedContainer = entry.getValue();
if (reservedContainer.hasIncreaseReservation()) {
// Currently, only regular container allocation supports continuous
// reservation looking, we don't support canceling increase request
// reservation when allocating regular container.
continue;
}
Resource reservedResource = reservedContainer.getReservedResource(); Resource reservedResource = reservedContainer.getReservedResource();
// make sure we unreserve one with at least the same amount of // make sure we unreserve one with at least the same amount of
@ -871,25 +777,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} }
} }
public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node,
RMContainer rmContainer, Resource reservedResource) {
// Inform the application
if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer,
reservedResource)) {
queue.getMetrics().reserveResource(getUser(), reservedResource);
// Update the node
node.reserveResource(this, schedulerKey, rmContainer);
// Succeeded
return true;
}
return false;
}
public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
RMContainer rmContainer, Container container, boolean reReservation) { RMContainer rmContainer, Container container, boolean reReservation) {
// Update reserved metrics if this is the first reservation // Update reserved metrics if this is the first reservation
@ -1111,31 +998,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return this.writeLock; return this.writeLock;
} }
public void addToBeRemovedIncreaseRequest(
SchedContainerChangeRequest request) {
toBeRemovedIncRequests.put(request.getContainerId(), request);
}
public void removedToBeRemovedIncreaseRequests() {
// Remove invalid in request requests
if (!toBeRemovedIncRequests.isEmpty()) {
try {
writeLock.lock();
Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter =
toBeRemovedIncRequests.entrySet().iterator();
while (iter.hasNext()) {
SchedContainerChangeRequest req = iter.next().getValue();
appSchedulingInfo.removeIncreaseRequest(req.getNodeId(),
req.getRMContainer().getAllocatedSchedulerKey(),
req.getContainerId());
iter.remove();
}
} finally {
writeLock.unlock();
}
}
}
/** /**
* Move reservation from one node to another * Move reservation from one node to another
* Comparing to unreserve container on source node and reserve a new * Comparing to unreserve container on source node and reserve a new

View File

@ -153,20 +153,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
} }
} }
@Override
protected synchronized void changeContainerResource(ContainerId containerId,
Resource deltaResource, boolean increase) {
super.changeContainerResource(containerId, deltaResource, increase);
if (killableContainers.containsKey(containerId)) {
if (increase) {
Resources.addTo(totalKillableResources, deltaResource);
} else {
Resources.subtractFrom(totalKillableResources, deltaResource);
}
}
}
public synchronized Resource getTotalKillableResources() { public synchronized Resource getTotalKillableResources() {
return totalKillableResources; return totalKillableResources;
} }

View File

@ -836,9 +836,7 @@ public class FairScheduler extends
} }
// Handle promotions and demotions // Handle promotions and demotions
handleExecutionTypeUpdates( handleContainerUpdates(application, updateRequests);
application, updateRequests.getPromotionRequests(),
updateRequests.getDemotionRequests());
// Sanity check // Sanity check
normalizeRequests(ask); normalizeRequests(ask);
@ -1790,13 +1788,6 @@ public class FairScheduler extends
return targetQueueName; return targetQueueName;
} }
@Override
protected void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
// TODO Auto-generated method stub
}
public float getReservableNodesRatio() { public float getReservableNodesRatio() {
return reservableNodesRatio; return reservableNodesRatio;
} }

View File

@ -936,14 +936,6 @@ public class FifoScheduler extends
return usedResource; return usedResource;
} }
@Override
protected void decreaseContainer(
SchedContainerChangeRequest decreaseRequest,
SchedulerApplicationAttempt attempt) {
// TODO Auto-generated method stub
}
@Override @Override
protected synchronized void nodeUpdate(RMNode nm) { protected synchronized void nodeUpdate(RMNode nm) {
super.nodeUpdate(nm); super.nodeUpdate(nm);

View File

@ -137,11 +137,11 @@ public class TestChildQueueOrder {
final Resource allocatedResource = Resources.createResource(allocation); final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) { if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource, ((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource, RMNodeLabelsManager.NO_LABEL, false); allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else { } else {
FiCaSchedulerApp app1 = getMockApplication(0, ""); FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1, ((LeafQueue)queue).allocateResource(clusterResource, app1,
allocatedResource, null, null, false); allocatedResource, null, null);
} }
// Next call - nothing // Next call - nothing

View File

@ -87,18 +87,6 @@ public class TestContainerResizing {
super(); super();
} }
@Override
protected void decreaseContainers(
List<UpdateContainerRequest> decreaseRequests,
SchedulerApplicationAttempt attempt) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) {
LOG.debug("Thread interrupted.");
}
super.decreaseContainers(decreaseRequests, attempt);
}
@Override @Override
public CSAssignment allocateContainersToNode( public CSAssignment allocateContainersToNode(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
@ -288,13 +276,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should be reserved // Increase request should be reserved
Assert.assertTrue(rmContainer1.hasIncreaseReservation());
Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -319,7 +303,6 @@ public class TestContainerResizing {
/* Check statuses after reservation satisfied */ /* Check statuses after reservation satisfied */
// Increase request should be unreserved // Increase request should be unreserved
Assert.assertFalse(rmContainer1.hasIncreaseReservation());
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will be changed since it's satisfied // Pending resource will be changed since it's satisfied
@ -391,11 +374,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId2);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should *NOT* be reserved as it exceeds user limit // Increase request should *NOT* be reserved as it exceeds user limit
Assert.assertFalse(rmContainer1.hasIncreaseReservation());
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -471,13 +451,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should be reserved // Increase request should be reserved
Assert.assertTrue(rmContainer1.hasIncreaseReservation());
Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -510,7 +486,6 @@ public class TestContainerResizing {
// Increase request should be unreserved // Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertFalse(rmContainer1.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied // Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null); checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB, Assert.assertEquals(0 * GB,
@ -585,13 +560,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should be reserved // Increase request should be reserved
Assert.assertTrue(rmContainer1.hasIncreaseReservation());
Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -614,7 +585,7 @@ public class TestContainerResizing {
am1.sendContainerResizingRequest(Arrays.asList( am1.sendContainerResizingRequest(Arrays.asList(
UpdateContainerRequest UpdateContainerRequest
.newInstance(0, containerId1, .newInstance(0, containerId1,
ContainerUpdateType.INCREASE_RESOURCE, ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB), null))); Resources.createResource(1 * GB), null)));
// Trigger a node heartbeat.. // Trigger a node heartbeat..
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
@ -623,7 +594,6 @@ public class TestContainerResizing {
// Increase request should be unreserved // Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertFalse(rmContainer1.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied // Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null); checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB, Assert.assertEquals(0 * GB,
@ -698,12 +668,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should be reserved // Increase request should be reserved
Assert.assertTrue(rmContainer2.hasIncreaseReservation());
Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -721,12 +687,13 @@ public class TestContainerResizing {
// Complete container2, container will be unreserved and completed // Complete container2, container will be unreserved and completed
am1.allocate(null, Arrays.asList(containerId2)); am1.allocate(null, Arrays.asList(containerId2));
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
am1.allocate(null, null);
/* Check statuses after reservation satisfied */ /* Check statuses after reservation satisfied */
// Increase request should be unreserved // Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertFalse(rmContainer2.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied // Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null); checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB, Assert.assertEquals(0 * GB,
@ -796,12 +763,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
/* Check reservation statuses */ /* Check reservation statuses */
// Increase request should be reserved // Increase request should be reserved
Assert.assertTrue(rmContainer2.hasIncreaseReservation());
Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty()); Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied // Pending resource will not be changed since it's not satisfied
@ -825,11 +788,11 @@ public class TestContainerResizing {
// Increase request should be unreserved // Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty()); Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
Assert.assertFalse(rmContainer2.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied // Pending resource will be changed since it's satisfied
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
checkPendingResource(rm1, "default", 0 * GB, null); checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB,
app.getAppAttemptResourceUsage().getPending().getMemorySize());
// Queue/user/application's usage will be updated // Queue/user/application's usage will be updated
checkUsedResource(rm1, "default", 0 * GB, null); checkUsedResource(rm1, "default", 0 * GB, null);
// User will be removed // User will be removed
@ -949,89 +912,6 @@ public class TestContainerResizing {
rm1.close(); rm1.close();
} }
@Test
public void testIncreaseContainerRequestGetPreferrence()
throws Exception {
/**
* There're multiple containers need to be increased, and there're several
* container allocation request, scheduler will try to increase container
* before allocate new containers
*/
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
// app1 -> a1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
rm1, app1.getApplicationId());
ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
// Container 2, 3 (priority=3)
allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
// Container 4, 5 (priority=2)
allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
// Container 6, 7 (priority=4)
allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
// am1 asks to change its container[2-7] from 1G to 2G
List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
for (int cId = 2; cId <= 7; cId++) {
ContainerId containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
increaseRequests.add(UpdateContainerRequest
.newInstance(0, containerId,
ContainerUpdateType.INCREASE_RESOURCE,
Resources.createResource(2 * GB), null));
}
am1.sendContainerResizingRequest(increaseRequests);
checkPendingResource(rm1, "default", 6 * GB, null);
Assert.assertEquals(6 * GB,
app.getAppAttemptResourceUsage().getPending().getMemorySize());
// Get rmNode1
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
// assignContainer, container-4/5/2 increased (which has highest priority OR
// earlier allocated)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
AllocateResponse allocateResponse = am1.allocate(null, null);
Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size());
verifyContainerIncreased(allocateResponse,
ContainerId.newContainerId(attemptId, 4), 2 * GB);
verifyContainerIncreased(allocateResponse,
ContainerId.newContainerId(attemptId, 5), 2 * GB);
verifyContainerIncreased(allocateResponse,
ContainerId.newContainerId(attemptId, 2), 2 * GB);
/* Check statuses after allocation */
// There're still 3 pending increase requests
checkPendingResource(rm1, "default", 3 * GB, null);
Assert.assertEquals(3 * GB,
app.getAppAttemptResourceUsage().getPending().getMemorySize());
// Queue/user/application's usage will be updated
checkUsedResource(rm1, "default", 10 * GB, null);
Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
.getUser("user").getUsed().getMemorySize());
Assert.assertEquals(0 * GB,
app.getAppAttemptResourceUsage().getReserved().getMemorySize());
Assert.assertEquals(10 * GB,
app.getAppAttemptResourceUsage().getUsed().getMemorySize());
rm1.close();
}
@Test (timeout = 60000) @Test (timeout = 60000)
public void testDecreaseContainerWillNotDeadlockContainerAllocation() public void testDecreaseContainerWillNotDeadlockContainerAllocation()
throws Exception { throws Exception {

View File

@ -200,6 +200,7 @@ public class TestIncreaseAllocationExpirer {
// back action to complete // back action to complete
Thread.sleep(10000); Thread.sleep(10000);
// Verify container size is 1G // Verify container size is 1G
am1.allocate(null, null);
Assert.assertEquals( Assert.assertEquals(
1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize()); .getAllocatedResource().getMemorySize());
@ -302,6 +303,8 @@ public class TestIncreaseAllocationExpirer {
// Wait long enough for the second token (5G) to expire, and verify that // Wait long enough for the second token (5G) to expire, and verify that
// the roll back action is completed as expected // the roll back action is completed as expected
Thread.sleep(10000); Thread.sleep(10000);
am1.allocate(null, null);
Thread.sleep(2000);
// Verify container size is rolled back to 3G // Verify container size is rolled back to 3G
Assert.assertEquals( Assert.assertEquals(
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) 3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
@ -398,13 +401,13 @@ public class TestIncreaseAllocationExpirer {
// Decrease containers // Decrease containers
List<UpdateContainerRequest> decreaseRequests = new ArrayList<>(); List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId2, decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId2,
ContainerUpdateType.INCREASE_RESOURCE, ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(2 * GB), null)); Resources.createResource(2 * GB), null));
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId3, decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId3,
ContainerUpdateType.INCREASE_RESOURCE, ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(4 * GB), null)); Resources.createResource(4 * GB), null));
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId4, decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId4,
ContainerUpdateType.INCREASE_RESOURCE, ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(4 * GB), null)); Resources.createResource(4 * GB), null));
AllocateResponse response = AllocateResponse response =
am1.sendContainerResizingRequest(decreaseRequests); am1.sendContainerResizingRequest(decreaseRequests);
@ -416,6 +419,9 @@ public class TestIncreaseAllocationExpirer {
rm1, containerId4, Resources.createResource(6 * GB))); rm1, containerId4, Resources.createResource(6 * GB)));
// Wait for containerId3 token to expire, // Wait for containerId3 token to expire,
Thread.sleep(10000); Thread.sleep(10000);
am1.allocate(null, null);
Assert.assertEquals( Assert.assertEquals(
2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) 2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize()); .getAllocatedResource().getMemorySize());

View File

@ -1056,11 +1056,11 @@ public class TestLeafQueue {
qb.releaseResource(clusterResource, app_0, qb.releaseResource(clusterResource, app_0,
app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey) app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
.getPerAllocationResource(), .getPerAllocationResource(),
null, null, false); null, null);
qb.releaseResource(clusterResource, app_2, qb.releaseResource(clusterResource, app_2,
app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey) app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
.getPerAllocationResource(), .getPerAllocationResource(),
null, null, false); null, null);
qb.setUserLimit(50); qb.setUserLimit(50);
qb.setUserLimitFactor(1); qb.setUserLimitFactor(1);

View File

@ -171,11 +171,11 @@ public class TestParentQueue {
final Resource allocatedResource = Resources.createResource(allocation); final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) { if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource, ((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource, RMNodeLabelsManager.NO_LABEL, false); allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else { } else {
FiCaSchedulerApp app1 = getMockApplication(0, ""); FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1, ((LeafQueue)queue).allocateResource(clusterResource, app1,
allocatedResource, null, null, false); allocatedResource, null, null);
} }
// Next call - nothing // Next call - nothing