YARN-7438. Additional changes to make SchedulingPlacementSet agnostic to ResourceRequest / placement algorithm. Contributed by Wangda Tan

This commit is contained in:
Sunil G 2017-12-05 22:50:07 +05:30
parent 3150c019ae
commit a957f1c60e
18 changed files with 227 additions and 155 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -86,8 +87,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent>,
ContainerReport createContainerReport();
boolean isAMContainer();
List<ResourceRequest> getResourceRequests();
ContainerRequest getContainerRequest();
String getNodeHttpAddress();

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -178,7 +179,7 @@ public class RMContainerImpl implements RMContainer {
private long finishTime;
private ContainerStatus finishedStatus;
private boolean isAMContainer;
private List<ResourceRequest> resourceRequests;
private ContainerRequest containerRequestForRecovery;
// Only used for container resource increase and decrease. This is the
// resource to rollback to should container resource increase token expires.
@ -233,7 +234,6 @@ public class RMContainerImpl implements RMContainer {
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
this.isAMContainer = false;
this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
this.lastConfirmedResource = container.getResource();
this.isExternallyAllocated = isExternallyAllocated;
@ -412,21 +412,21 @@ public class RMContainerImpl implements RMContainer {
readLock.unlock();
}
}
@Override
public List<ResourceRequest> getResourceRequests() {
public ContainerRequest getContainerRequest() {
try {
readLock.lock();
return resourceRequests;
return containerRequestForRecovery;
} finally {
readLock.unlock();
}
}
public void setResourceRequests(List<ResourceRequest> requests) {
public void setContainerRequest(ContainerRequest request) {
writeLock.lock();
try {
writeLock.lock();
this.resourceRequests = requests;
this.containerRequestForRecovery = request;
} finally {
writeLock.unlock();
}
@ -576,7 +576,7 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Clear ResourceRequest stored in RMContainer, we don't need to remember
// this anymore.
container.setResourceRequests(null);
container.setContainerRequest(null);
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContai
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@ -600,10 +601,10 @@ public abstract class AbstractYarnScheduler
* @param rmContainer rmContainer
*/
private void recoverResourceRequestForContainer(RMContainer rmContainer) {
List<ResourceRequest> requests = rmContainer.getResourceRequests();
ContainerRequest containerRequest = rmContainer.getContainerRequest();
// If container state is moved to ACQUIRED, request will be empty.
if (requests == null) {
if (containerRequest == null) {
return;
}
@ -618,7 +619,7 @@ public abstract class AbstractYarnScheduler
SchedulerApplicationAttempt schedulerAttempt =
getCurrentAttemptForContainer(rmContainer.getContainerId());
if (schedulerAttempt != null) {
schedulerAttempt.recoverResourceRequestsForContainer(requests);
schedulerAttempt.recoverResourceRequestsForContainer(containerRequest);
}
}

View File

@ -45,10 +45,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -220,16 +221,14 @@ public class AppSchedulingInfo {
}
// Update AppPlacementAllocator
ResourceRequestUpdateResult pendingAmountChanges =
PendingAskUpdateResult pendingAmountChanges =
schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
.updateResourceRequests(
entry.getValue().values(),
.updatePendingAsk(entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (null != pendingAmountChanges) {
updatePendingResources(
pendingAmountChanges.getLastAnyResourceRequest(),
pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
pendingAmountChanges, schedulerRequestKey,
queue.getMetrics());
offswitchResourcesUpdated = true;
}
@ -237,12 +236,17 @@ public class AppSchedulingInfo {
return offswitchResourcesUpdated;
}
private void updatePendingResources(ResourceRequest lastRequest,
ResourceRequest request, SchedulerRequestKey schedulerKey,
QueueMetrics metrics) {
private void updatePendingResources(PendingAskUpdateResult updateResult,
SchedulerRequestKey schedulerKey, QueueMetrics metrics) {
PendingAsk lastPendingAsk = updateResult.getLastPendingAsk();
PendingAsk newPendingAsk = updateResult.getNewPendingAsk();
String lastNodePartition = updateResult.getLastNodePartition();
String newNodePartition = updateResult.getNewNodePartition();
int lastRequestContainers =
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
if (request.getNumContainers() <= 0) {
(lastPendingAsk != null) ? lastPendingAsk.getCount() : 0;
if (newPendingAsk.getCount() <= 0) {
if (lastRequestContainers >= 0) {
schedulerKeys.remove(schedulerKey);
schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
@ -258,31 +262,23 @@ public class AppSchedulingInfo {
}
}
Resource lastRequestCapability =
lastRequest != null ? lastRequest.getCapability() : Resources.none();
metrics.incrPendingResources(request.getNodeLabelExpression(), user,
request.getNumContainers(), request.getCapability());
if(lastRequest != null) {
metrics.decrPendingResources(lastRequest.getNodeLabelExpression(), user,
lastRequestContainers, lastRequestCapability);
if (lastPendingAsk != null) {
// Deduct resources from metrics / pending resources of queue/app.
metrics.decrPendingResources(lastNodePartition, user,
lastPendingAsk.getCount(), lastPendingAsk.getPerAllocationResource());
Resource decreasedResource = Resources.multiply(
lastPendingAsk.getPerAllocationResource(), lastRequestContainers);
queue.decPendingResource(lastNodePartition, decreasedResource);
appResourceUsage.decPending(lastNodePartition, decreasedResource);
}
// update queue:
Resource increasedResource =
Resources.multiply(request.getCapability(), request.getNumContainers());
queue.incPendingResource(request.getNodeLabelExpression(),
increasedResource);
appResourceUsage.incPending(request.getNodeLabelExpression(),
increasedResource);
if (lastRequest != null) {
Resource decreasedResource =
Resources.multiply(lastRequestCapability, lastRequestContainers);
queue.decPendingResource(lastRequest.getNodeLabelExpression(),
decreasedResource);
appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
decreasedResource);
}
// Increase resources to metrics / pending resources of queue/app.
metrics.incrPendingResources(newNodePartition, user,
newPendingAsk.getCount(), newPendingAsk.getPerAllocationResource());
Resource increasedResource = Resources.multiply(
newPendingAsk.getPerAllocationResource(), newPendingAsk.getCount());
queue.incPendingResource(newNodePartition, increasedResource);
appResourceUsage.incPending(newNodePartition, increasedResource);
}
public void addRequestedPartition(String partition) {
@ -417,7 +413,7 @@ public class AppSchedulingInfo {
}
}
public List<ResourceRequest> allocate(NodeType type,
public ContainerRequest allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
try {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -155,16 +156,16 @@ public class ContainerUpdateContext {
AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
appSchedulingInfo.getAppPlacementAllocator(schedulerKey);
if (appPlacementAllocator != null) {
Map<String, ResourceRequest> resourceRequests = appPlacementAllocator
.getResourceRequests();
ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
PendingAsk pendingAsk = appPlacementAllocator.getPendingAsk(
ResourceRequest.ANY);
// Decrement the pending using a dummy RR with
// resource = prev update req capability
if (prevReq != null) {
if (pendingAsk != null && pendingAsk.getCount() > 0) {
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
schedulerKey, Container.newInstance(UNDEFINED,
schedulerNode.getNodeID(), "host:port",
prevReq.getCapability(), schedulerKey.getPriority(), null));
pendingAsk.getPerAllocationResource(),
schedulerKey.getPriority(), null));
}
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpda
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -449,11 +450,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public void recoverResourceRequestsForContainer(
List<ResourceRequest> requests) {
ContainerRequest containerRequest) {
try {
writeLock.lock();
if (!isStopped) {
appSchedulingInfo.updateResourceRequests(requests, true);
appSchedulingInfo.updateResourceRequests(
containerRequest.getResourceRequests(), true);
}
} finally {
writeLock.unlock();
@ -913,7 +915,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContainer c = tempIter.next();
// Mark container for release (set RRs to null, so RM does not think
// it is a recoverable container)
((RMContainerImpl) c).setResourceRequests(null);
((RMContainerImpl) c).setContainerRequest(null);
// Release this container async-ly so as to prevent
// 'LeafQueue::completedContainer()' from trying to acquire a lock
@ -1383,13 +1385,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
SchedulerRequestKey schedulerRequestKey) {
return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey);
}
public Map<String, ResourceRequest> getResourceRequests(
SchedulerRequestKey schedulerRequestKey) {
return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey)
.getResourceRequests();
}
public void incUnconfirmedRes(Resource res) {
unconfirmedAllocatedMem.addAndGet(res.getMemorySize());
unconfirmedAllocatedVcores.addAndGet(res.getVirtualCores());

View File

@ -299,7 +299,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
// If we have only ANY requests for this schedulerKey, we should not
// delay its scheduling.
if (application.getResourceRequests(schedulerKey).size() == 1) {
if (application.getAppPlacementAllocator(schedulerKey)
.getUniqueLocationAsks() == 1) {
return true;
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import java.util.List;
/**
* ContainerRequest is a class to capture resource requests associated with a
* Container, this will be used by scheduler to recover resource requests if the
* container preempted or cancelled before AM acquire the container.
*
* It should include deducted resource requests when the container allocated.
*
* Lifecycle of the ContainerRequest is:
*
* <pre>
* 1) It is instantiated when container created.
* 2) It will be set to ContainerImpl by scheduler.
* 3) When container preempted or cancelled because of whatever reason before
* container acquired by AM. ContainerRequest will be added back to pending
* request pool.
* 4) It will be cleared from ContainerImpl if the container already acquired by
* AM.
* </pre>
*/
public class ContainerRequest {
private List<ResourceRequest> requests;
public ContainerRequest(List<ResourceRequest> requests) {
this.requests = requests;
}
public List<ResourceRequest> getResourceRequests() {
return requests;
}
}

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
@ -369,7 +370,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public boolean accept(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
List<ResourceRequest> resourceRequests = null;
ContainerRequest containerRequest = null;
boolean reReservation = false;
try {
@ -397,8 +398,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if (schedulerContainer.isAllocated()) {
// When allocate a new container
resourceRequests =
schedulerContainer.getRmContainer().getResourceRequests();
containerRequest =
schedulerContainer.getRmContainer().getContainerRequest();
// Check pending resource request
if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
@ -471,8 +472,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
// When rejected, recover resource requests for this app
if (!accepted && resourceRequests != null) {
recoverResourceRequestsForContainer(resourceRequests);
if (!accepted && containerRequest != null) {
recoverResourceRequestsForContainer(containerRequest);
}
return accepted;
@ -524,12 +525,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests
List<ResourceRequest> requests = appSchedulingInfo.allocate(
ContainerRequest containerRequest = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setResourceRequests(requests);
((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
allocation.getAllocatedOrReservedResource());

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -460,13 +461,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
// Inform the container
rmContainer.handle(

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -80,14 +81,14 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
// Inform the container
rmContainer.handle(

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -57,14 +58,14 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
/**
* Replace existing ResourceRequest by the new requests
* Replace existing pending asks by the new requests
*
* @param requests new ResourceRequests
* @param requests new asks
* @param recoverPreemptedRequestForAContainer if we're recovering resource
* requests for preempted container
* @return true if total pending resource changed
*/
ResourceRequestUpdateResult updateResourceRequests(
PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer);
@ -97,17 +98,13 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* @param schedulerKey SchedulerRequestKey for this ResourceRequest
* @param type Type of the allocation
* @param node Which node this container allocated on
* @return list of ResourceRequests deducted
* @return ContainerRequest which include resource requests associated with
* the container. This will be used by scheduler to recover requests.
* Please refer to {@link ContainerRequest} for more details.
*/
List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node);
/**
* Returns list of accepted resourceNames.
* @return Iterator of accepted resourceNames
*/
Iterator<String> getAcceptedResouceNames();
/**
* We can still have pending requirement for a given NodeType and node
* @param type Locality Type

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -44,7 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* into account locality preferences (node, rack, any) when allocating
* containers.
*/
public class LocalityAppPlacementAllocator<N extends SchedulerNode>
public class LocalityAppPlacementAllocator <N extends SchedulerNode>
implements AppPlacementAllocator<N> {
private static final Log LOG =
LogFactory.getLog(LocalityAppPlacementAllocator.class);
@ -122,13 +123,13 @@ public class LocalityAppPlacementAllocator<N extends SchedulerNode>
}
@Override
public ResourceRequestUpdateResult updateResourceRequests(
public PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
try {
this.writeLock.lock();
ResourceRequestUpdateResult updateResult = null;
PendingAskUpdateResult updateResult = null;
// Update resource requests
for (ResourceRequest request : requests) {
@ -156,7 +157,16 @@ public class LocalityAppPlacementAllocator<N extends SchedulerNode>
//update the applications requested labels set
appSchedulingInfo.addRequestedPartition(partition);
updateResult = new ResourceRequestUpdateResult(lastRequest, request);
PendingAsk lastPendingAsk =
lastRequest == null ? null : new PendingAsk(
lastRequest.getCapability(), lastRequest.getNumContainers());
String lastRequestedNodePartition =
lastRequest == null ? null : lastRequest.getNodeLabelExpression();
updateResult = new PendingAskUpdateResult(lastPendingAsk,
new PendingAsk(request.getCapability(),
request.getNumContainers()), lastRequestedNodePartition,
request.getNodeLabelExpression());
}
}
return updateResult;
@ -380,7 +390,7 @@ public class LocalityAppPlacementAllocator<N extends SchedulerNode>
}
@Override
public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node) {
try {
writeLock.lock();
@ -404,19 +414,9 @@ public class LocalityAppPlacementAllocator<N extends SchedulerNode>
allocateOffSwitch(schedulerKey, request, resourceRequests);
}
return resourceRequests;
return new ContainerRequest(resourceRequests);
} finally {
writeLock.unlock();
}
}
@Override
public Iterator<String> getAcceptedResouceNames() {
try {
readLock.lock();
return resourceRequestMap.keySet().iterator();
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
/**
* Result of a resource-request update. This will be used by
* {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo}
* to update queue metrics and application/queue's overall pending resources.
* And this is per-scheduler-key.
*
* Following fields will be set if pending ask changed for a given scheduler key
* - lastPendingAsk: how many resource asked before.
* - newPendingAsk: how many resource asked now.
* - lastNodePartition: what's the node partition before.
* - newNodePartition: what's the node partition now.
*/
public class PendingAskUpdateResult {
private final PendingAsk lastPendingAsk;
private final String lastNodePartition;
private final PendingAsk newPendingAsk;
private final String newNodePartition;
public PendingAskUpdateResult(PendingAsk lastPendingAsk,
PendingAsk newPendingAsk, String lastNodePartition,
String newNodePartition) {
this.lastPendingAsk = lastPendingAsk;
this.newPendingAsk = newPendingAsk;
this.lastNodePartition = lastNodePartition;
this.newNodePartition = newNodePartition;
}
public PendingAsk getLastPendingAsk() {
return lastPendingAsk;
}
public PendingAsk getNewPendingAsk() {
return newPendingAsk;
}
public String getLastNodePartition() {
return lastNodePartition;
}
public String getNewNodePartition() {
return newNodePartition;
}
}

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
/**
* Result of ResourceRequest update
*/
public class ResourceRequestUpdateResult {
private final ResourceRequest lastAnyResourceRequest;
private final ResourceRequest newResourceRequest;
public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest,
ResourceRequest newResourceRequest) {
this.lastAnyResourceRequest = lastAnyResourceRequest;
this.newResourceRequest = newResourceRequest;
}
public ResourceRequest getLastAnyResourceRequest() {
return lastAnyResourceRequest;
}
public ResourceRequest getNewResourceRequest() {
return newResourceRequest;
}
}

View File

@ -431,7 +431,7 @@ public class Application {
if (type == NodeType.NODE_LOCAL) {
for (String host : task.getHosts()) {
if(LOG.isDebugEnabled()) {
LOG.debug("updateResourceRequests:" + " application=" + applicationId
LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ " type=" + type + " host=" + host
+ " request=" + ((requests == null) ? "null" : requests.get(host)));
}
@ -442,7 +442,7 @@ public class Application {
if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) {
for (String rack : task.getRacks()) {
if(LOG.isDebugEnabled()) {
LOG.debug("updateResourceRequests:" + " application=" + applicationId
LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ " type=" + type + " rack=" + rack
+ " request=" + ((requests == null) ? "null" : requests.get(rack)));
}
@ -453,7 +453,7 @@ public class Application {
updateResourceRequest(requests.get(ResourceRequest.ANY));
if(LOG.isDebugEnabled()) {
LOG.debug("updateResourceRequests:" + " application=" + applicationId
LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ " #asks=" + ask.size());
}
}

View File

@ -278,8 +278,8 @@ public class TestRMContainerImpl {
// Verify whether list of ResourceRequest is present in RMContainer
// while moving to ALLOCATED state
Assert.assertNotNull(scheduler.getRMContainer(containerId2)
.getResourceRequests());
Assert.assertNotNull(
scheduler.getRMContainer(containerId2).getContainerRequest());
// Allocate container
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
@ -288,8 +288,8 @@ public class TestRMContainerImpl {
// After RMContainer moving to ACQUIRED state, list of ResourceRequest will
// be empty
Assert.assertNull(scheduler.getRMContainer(containerId2)
.getResourceRequests());
Assert.assertNull(
scheduler.getRMContainer(containerId2).getContainerRequest());
}
@Test (timeout = 180000)

View File

@ -1696,7 +1696,8 @@ public class TestCapacityScheduler {
rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
RMContainer rmContainer = cs.getRMContainer(containerId1);
List<ResourceRequest> requests = rmContainer.getResourceRequests();
List<ResourceRequest> requests =
rmContainer.getContainerRequest().getResourceRequests();
FiCaSchedulerApp app = cs.getApplicationAttempt(am1
.getApplicationAttemptId());