YARN-5959. RM changes to support change of container ExecutionType. (Arun Suresh via wangda)

(cherry picked from commit 0a55bd841e)
This commit is contained in:
Wangda Tan 2017-01-05 10:31:05 -08:00 committed by Arun Suresh
parent 40bc9e7ddb
commit 650ff95e00
46 changed files with 1588 additions and 289 deletions

View File

@ -108,7 +108,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
@ -130,6 +129,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -1705,8 +1705,7 @@ public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
@ -1721,7 +1720,7 @@ public synchronized Allocation allocate(
lastBlacklistRemovals = blacklistRemovals;
Allocation allocation = super.allocate(
applicationAttemptId, askCopy, release, blacklistAdditions,
blacklistRemovals, increaseRequests, decreaseRequests);
blacklistRemovals, updateRequests);
if (forceResourceLimit != null) {
// Test wants to force the non-default resource limit
allocation.setResourceLimit(forceResourceLimit);
@ -1752,8 +1751,7 @@ public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequest,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy = ResourceRequest.newInstance(req
@ -1764,7 +1762,7 @@ public synchronized Allocation allocate(
SecurityUtil.setTokenServiceUseIp(false);
Allocation normalAlloc = super.allocate(
applicationAttemptId, askCopy, release,
blacklistAdditions, blacklistRemovals, null, null);
blacklistAdditions, blacklistRemovals, updateRequests);
List<Container> containers = normalAlloc.getContainers();
if(containers.size() > 0) {
// allocate excess container

View File

@ -59,7 +59,6 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -68,6 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@ -207,14 +207,13 @@ public void run() {
public Allocation allocate(ApplicationAttemptId attemptId,
List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
List<String> strings, List<String> strings2,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
if (metricsON) {
final Timer.Context context = schedulerAllocateTimer.time();
Allocation allocation = null;
try {
allocation = scheduler.allocate(attemptId, resourceRequests,
containerIds, strings, strings2, null, null);
containerIds, strings, strings2, updateRequests);
return allocation;
} finally {
context.stop();
@ -228,7 +227,7 @@ public Allocation allocate(ApplicationAttemptId attemptId,
}
} else {
return scheduler.allocate(attemptId,
resourceRequests, containerIds, strings, strings2, null, null);
resourceRequests, containerIds, strings, strings2, updateRequests);
}
}

View File

@ -39,7 +39,12 @@ public enum ContainerUpdateType {
DECREASE_RESOURCE,
/**
* Execution Type change.
* Execution Type promotion.
*/
UPDATE_EXECUTION_TYPE
PROMOTE_EXECUTION_TYPE,
/**
* Execution Type demotion.
*/
DEMOTE_EXECUTION_TYPE
}

View File

@ -86,6 +86,12 @@ public int hashCode() {
return result;
}
@Override
public String toString() {
return "UpdateContainerError{reason=" + getReason() + ", "
+ "req=" + getUpdateContainerRequest() + "}";
}
@Override
public boolean equals(Object obj) {
if (this == obj) {

View File

@ -166,11 +166,13 @@ public int hashCode() {
ContainerId cId = getContainerId();
ExecutionType execType = getExecutionType();
Resource capability = getCapability();
ContainerUpdateType updateType = getContainerUpdateType();
result =
prime * result + ((capability == null) ? 0 : capability.hashCode());
result = prime * result + ((cId == null) ? 0 : cId.hashCode());
result = prime * result + getContainerVersion();
result = prime * result + ((execType == null) ? 0 : execType.hashCode());
result = prime * result + ((updateType== null) ? 0 : updateType.hashCode());
return result;
}
@ -224,6 +226,14 @@ public boolean equals(Object obj) {
} else if (!execType.equals(other.getExecutionType())) {
return false;
}
ContainerUpdateType updateType = getContainerUpdateType();
if (updateType == null) {
if (other.getContainerUpdateType() != null) {
return false;
}
} else if (!updateType.equals(other.getContainerUpdateType())) {
return false;
}
return true;
}
}

View File

@ -63,7 +63,8 @@ message FinishApplicationMasterResponseProto {
enum ContainerUpdateTypeProto {
INCREASE_RESOURCE = 0;
DECREASE_RESOURCE = 1;
UPDATE_EXECUTION_TYPE = 2;
PROMOTE_EXECUTION_TYPE = 2;
DEMOTE_EXECUTION_TYPE = 3;
}
message UpdateContainerRequestProto {

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@ -574,8 +575,7 @@ public synchronized Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
for (ResourceRequest req : ask) {
ResourceRequest reqCopy =
@ -586,13 +586,12 @@ public synchronized Allocation allocate(
}
lastAsk = ask;
lastRelease = release;
lastIncrease = increaseRequests;
lastDecrease = decreaseRequests;
lastIncrease = updateRequests.getIncreaseRequests();
lastDecrease = updateRequests.getDecreaseRequests();
lastBlacklistAdditions = blacklistAdditions;
lastBlacklistRemovals = blacklistRemovals;
return super.allocate(applicationAttemptId, askCopy, release,
blacklistAdditions, blacklistRemovals, increaseRequests,
decreaseRequests);
blacklistAdditions, blacklistRemovals, updateRequests);
}
}

View File

@ -321,13 +321,21 @@ private Container buildContainer(long rmIdentifier,
// before accepting an ask)
Resource capability = normalizeCapability(appParams, rr);
return createContainer(
rmIdentifier, appParams.getContainerTokenExpiryInterval(),
SchedulerRequestKey.create(rr), userName, node, cId, capability);
}
private Container createContainer(long rmIdentifier, long tokenExpiry,
SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
ContainerId cId, Resource capability) {
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, 0, node.getNodeId().toString(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
capability, currTime + tokenExpiry,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
rr.getPriority(), currTime,
schedulerKey.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
@ -336,9 +344,9 @@ private Container buildContainer(long rmIdentifier,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, node.getNodeId(), node.getHttpAddress(),
capability, rr.getPriority(), containerToken,
capability, schedulerKey.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
rr.getAllocationRequestId());
schedulerKey.getAllocationRequestId());
return container;
}

View File

@ -150,8 +150,9 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at" +
"priority = "+ schedulerKey.getPriority()
LOG.info("# of outstandingOpReqs in ANY (at "
+ "priority = " + schedulerKey.getPriority()
+ ", allocationReqId = " + schedulerKey.getAllocationRequestId()
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
@ -167,7 +168,8 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
SchedulerRequestKey schedulerKey =
SchedulerRequestKey.extractFrom(c);
Map<Resource, ResourceRequest> asks =
outstandingOpReqs.get(schedulerKey);

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
/**
* Composite key for outstanding scheduler requests for any schedulable entity.
@ -31,6 +33,7 @@ public final class SchedulerRequestKey implements
private final Priority priority;
private final long allocationRequestId;
private final ContainerId containerToUpdate;
/**
* Factory method to generate a SchedulerRequestKey from a ResourceRequest.
@ -39,7 +42,13 @@ public final class SchedulerRequestKey implements
*/
public static SchedulerRequestKey create(ResourceRequest req) {
return new SchedulerRequestKey(req.getPriority(),
req.getAllocationRequestId());
req.getAllocationRequestId(), null);
}
public static SchedulerRequestKey create(UpdateContainerRequest req,
SchedulerRequestKey schedulerRequestKey) {
return new SchedulerRequestKey(schedulerRequestKey.getPriority(),
schedulerRequestKey.getAllocationRequestId(), req.getContainerId());
}
/**
@ -50,12 +59,16 @@ public static SchedulerRequestKey create(ResourceRequest req) {
*/
public static SchedulerRequestKey extractFrom(Container container) {
return new SchedulerRequestKey(container.getPriority(),
container.getAllocationRequestId());
container.getAllocationRequestId(), null);
}
SchedulerRequestKey(Priority priority, long allocationRequestId) {
public SchedulerRequestKey(Priority priority, long allocationRequestId,
ContainerId containerToUpdate) {
this.priority = priority;
this.allocationRequestId = allocationRequestId;
this.containerToUpdate = containerToUpdate;
}
/**
@ -76,6 +89,10 @@ public long getAllocationRequestId() {
return allocationRequestId;
}
public ContainerId getContainerToUpdate() {
return containerToUpdate;
}
@Override
public int compareTo(SchedulerRequestKey o) {
if (o == null) {
@ -85,6 +102,15 @@ public int compareTo(SchedulerRequestKey o) {
return 1;
}
}
// Ensure updates are ranked higher
if (this.containerToUpdate == null && o.containerToUpdate != null) {
return -1;
}
if (this.containerToUpdate != null && o.containerToUpdate == null) {
return 1;
}
int priorityCompare = o.getPriority().compareTo(priority);
// we first sort by priority and then by allocationRequestId
if (priorityCompare != 0) {
@ -107,16 +133,21 @@ public boolean equals(Object o) {
if (getAllocationRequestId() != that.getAllocationRequestId()) {
return false;
}
return getPriority() != null ?
getPriority().equals(that.getPriority()) :
that.getPriority() == null;
if (!getPriority().equals(that.getPriority())) {
return false;
}
return containerToUpdate != null ?
containerToUpdate.equals(that.containerToUpdate) :
that.containerToUpdate == null;
}
@Override
public int hashCode() {
int result = getPriority() != null ? getPriority().hashCode() : 0;
result = 31 * result + (int) (getAllocationRequestId() ^ (
getAllocationRequestId() >>> 32));
int result = priority != null ? priority.hashCode() : 0;
result = 31 * result + (int) (allocationRequestId ^ (allocationRequestId
>>> 32));
result = 31 * result + (containerToUpdate != null ? containerToUpdate
.hashCode() : 0);
return result;
}
@ -125,6 +156,7 @@ public String toString() {
return "SchedulerRequestKey{" +
"priority=" + priority +
", allocationRequestId=" + allocationRequestId +
", containerToUpdate=" + containerToUpdate +
'}';
}
}

View File

@ -68,7 +68,6 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@ -93,7 +92,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security
@ -554,12 +558,10 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
List<UpdateContainerError> updateContainerErrors =
List<UpdateContainerError> updateErrors = new ArrayList<>();
ContainerUpdates containerUpdateRequests =
RMServerUtils.validateAndSplitUpdateResourceRequests(
rmContext, request, maximumCapacity,
increaseResourceReqs, decreaseResourceReqs);
rmContext, request, maximumCapacity, updateErrors);
// Send new requests to appAttempt.
Allocation allocation;
@ -575,7 +577,7 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
increaseResourceReqs, decreaseResourceReqs);
containerUpdateRequests);
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@ -591,7 +593,7 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
}
// Notify the AM of container update errors
addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
addToUpdateContainerErrors(allocateResponse, updateErrors);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
@ -625,15 +627,7 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
.pullJustFinishedContainers());
allocateResponse.setAvailableResources(allocation.getResourceLimit());
// Handling increased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
allocation.getIncreasedContainers());
// Handling decreased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
allocation.getDecreasedContainers());
addToContainerUpdates(appAttemptId, allocateResponse, allocation);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
@ -646,6 +640,37 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
.getApplicationPriority());
}
private void addToContainerUpdates(ApplicationAttemptId appAttemptId,
AllocateResponse allocateResponse, Allocation allocation) {
// Handling increased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
allocation.getIncreasedContainers());
// Handling decreased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
allocation.getDecreasedContainers());
// Handling promoted containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
allocation.getPromotedContainers());
// Handling demoted containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
allocation.getDemotedContainers());
SchedulerApplicationAttempt applicationAttempt = ((AbstractYarnScheduler)
rScheduler).getApplicationAttempt(appAttemptId);
if (applicationAttempt != null) {
addToUpdateContainerErrors(allocateResponse,
((AbstractYarnScheduler)rScheduler)
.getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
}
}
protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
List<UpdateContainerError> updateContainerErrors) {
if (!updateContainerErrors.isEmpty()) {

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@ -48,15 +47,14 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@ -69,9 +67,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
@ -251,6 +249,7 @@ protected void allocateInternal(ApplicationAttemptId appAttemptId,
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
super.allocateInternal(appAttemptId, request, allocateResponse);
}
@ -298,15 +297,9 @@ private void handleNewContainers(List<Container> allocContainers,
boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
container.getNodeId()).allocateContainer(rmContainer);
RMContainer rmContainer =
SchedulerUtils.createOpportunisticRmContainer(
rmContext, container, isRemotelyAllocated);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.ACQUIRED));

View File

@ -39,6 +39,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@ -64,6 +66,7 @@
.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
@ -81,7 +84,7 @@
*/
public class RMServerUtils {
private static final String UPDATE_OUTSTANDING_ERROR =
public static final String UPDATE_OUTSTANDING_ERROR =
"UPDATE_OUTSTANDING_ERROR";
private static final String INCORRECT_CONTAINER_VERSION_ERROR =
"INCORRECT_CONTAINER_VERSION_ERROR";
@ -125,74 +128,105 @@ public static List<RMNode> queryRMNodes(RMContext context,
/**
* Check if we have:
* - Request for same containerId and different target resource
* - If targetResources violates maximum/minimumAllocation
* @param rmContext RM context
* @param request Allocate Request
* @param maximumAllocation Maximum Allocation
* @param increaseResourceReqs Increase Resource Request
* @param decreaseResourceReqs Decrease Resource Request
* @return List of container Errors
* - Request for same containerId and different target resource.
* - If targetResources violates maximum/minimumAllocation.
* @param rmContext RM context.
* @param request Allocate Request.
* @param maximumAllocation Maximum Allocation.
* @param updateErrors Container update errors.
* @return ContainerUpdateRequests.
*/
public static List<UpdateContainerError>
public static ContainerUpdates
validateAndSplitUpdateResourceRequests(RMContext rmContext,
AllocateRequest request, Resource maximumAllocation,
List<UpdateContainerRequest> increaseResourceReqs,
List<UpdateContainerRequest> decreaseResourceReqs) {
List<UpdateContainerError> errors = new ArrayList<>();
List<UpdateContainerError> updateErrors) {
ContainerUpdates updateRequests =
new ContainerUpdates();
Set<ContainerId> outstandingUpdate = new HashSet<>();
for (UpdateContainerRequest updateReq : request.getUpdateRequests()) {
RMContainer rmContainer = rmContext.getScheduler().getRMContainer(
updateReq.getContainerId());
String msg = null;
if (rmContainer == null) {
msg = INVALID_CONTAINER_ID;
}
// Only allow updates if the requested version matches the current
// version
if (msg == null && updateReq.getContainerVersion() !=
rmContainer.getContainer().getVersion()) {
msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
+ updateReq.getContainerVersion() + "|"
+ rmContainer.getContainer().getVersion();
}
// No more than 1 container update per request.
if (msg == null &&
outstandingUpdate.contains(updateReq.getContainerId())) {
msg = UPDATE_OUTSTANDING_ERROR;
}
String msg = validateContainerIdAndVersion(outstandingUpdate,
updateReq, rmContainer);
ContainerUpdateType updateType = updateReq.getContainerUpdateType();
if (msg == null) {
Resource original = rmContainer.getContainer().getResource();
Resource target = updateReq.getCapability();
if (Resources.fitsIn(target, original)) {
// This is a decrease request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, false)) {
decreaseResourceReqs.add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
(updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
Resource original = rmContainer.getContainer().getResource();
Resource target = updateReq.getCapability();
if (Resources.fitsIn(target, original)) {
// This is a decrease request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, false)) {
updateRequests.getDecreaseRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
}
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
// This is an increase request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, true)) {
updateRequests.getIncreaseRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
}
}
} else {
// This is an increase request
if (validateIncreaseDecreaseRequest(rmContext, updateReq,
maximumAllocation, true)) {
increaseResourceReqs.add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else {
msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
ExecutionType original = rmContainer.getExecutionType();
ExecutionType target = updateReq.getExecutionType();
if (target != original) {
if (target == ExecutionType.GUARANTEED &&
original == ExecutionType.OPPORTUNISTIC) {
updateRequests.getPromotionRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
} else if (target == ExecutionType.OPPORTUNISTIC &&
original == ExecutionType.GUARANTEED) {
updateRequests.getDemotionRequests().add(updateReq);
outstandingUpdate.add(updateReq.getContainerId());
}
}
}
}
if (msg != null) {
UpdateContainerError updateError = RECORD_FACTORY
.newRecordInstance(UpdateContainerError.class);
updateError.setReason(msg);
updateError.setUpdateContainerRequest(updateReq);
errors.add(updateError);
}
checkAndcreateUpdateError(updateErrors, updateReq, msg);
}
return errors;
return updateRequests;
}
private static void checkAndcreateUpdateError(
List<UpdateContainerError> errors, UpdateContainerRequest updateReq,
String msg) {
if (msg != null) {
UpdateContainerError updateError = RECORD_FACTORY
.newRecordInstance(UpdateContainerError.class);
updateError.setReason(msg);
updateError.setUpdateContainerRequest(updateReq);
errors.add(updateError);
}
}
private static String validateContainerIdAndVersion(
Set<ContainerId> outstandingUpdate, UpdateContainerRequest updateReq,
RMContainer rmContainer) {
String msg = null;
if (rmContainer == null) {
msg = INVALID_CONTAINER_ID;
}
// Only allow updates if the requested version matches the current
// version
if (msg == null && updateReq.getContainerVersion() !=
rmContainer.getContainer().getVersion()) {
msg = INCORRECT_CONTAINER_VERSION_ERROR + "|"
+ updateReq.getContainerVersion() + "|"
+ rmContainer.getContainer().getVersion();
}
// No more than 1 container update per request.
if (msg == null &&
outstandingUpdate.contains(updateReq.getContainerId())) {
msg = UPDATE_OUTSTANDING_ERROR;
}
return msg;
}
/**

View File

@ -96,6 +96,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -1123,7 +1124,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST,
amBlacklist.getBlacklistAdditions(),
amBlacklist.getBlacklistRemovals(), null, null);
amBlacklist.getBlacklistRemovals(),
new ContainerUpdates());
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
@ -1147,7 +1149,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
null, null, null);
null, new ContainerUpdates());
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers.

View File

@ -108,6 +108,8 @@ RMContainerEventType.KILL, new FinishedTransition())
// Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED,
RMContainerEventType.ACQUIRED)
.addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
RMContainerEventType.FINISHED, new FinishedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
@ -124,6 +126,8 @@ RMContainerEventType.FINISHED, new FinishedTransition())
RMContainerEventType.KILL, new KillTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.ACQUIRED)
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@ -163,13 +167,13 @@ RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
private final WriteLock writeLock;
private final ApplicationAttemptId appAttemptId;
private final NodeId nodeId;
private final Container container;
private final RMContext rmContext;
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
private final String nodeLabelExpression;
private volatile Container container;
private Resource reservedResource;
private NodeId reservedNode;
private SchedulerRequestKey reservedSchedulerKey;
@ -188,44 +192,44 @@ RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
private boolean isExternallyAllocated;
private SchedulerRequestKey allocatedSchedulerKey;
public RMContainerImpl(Container container,
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
this(container, appAttemptId, nodeId, user, rmContext, System
this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), "");
}
public RMContainerImpl(Container container,
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, boolean isExternallyAllocated) {
this(container, appAttemptId, nodeId, user, rmContext, System
this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), "", isExternallyAllocated);
}
private boolean saveNonAMContainerMetaInfo;
public RMContainerImpl(Container container,
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, String nodeLabelExpression) {
this(container, appAttemptId, nodeId, user, rmContext, System
this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), nodeLabelExpression);
}
public RMContainerImpl(Container container,
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
this(container, appAttemptId, nodeId, user, rmContext, creationTime,
nodeLabelExpression, false);
this(container, schedulerKey, appAttemptId, nodeId, user, rmContext,
creationTime, nodeLabelExpression, false);
}
public RMContainerImpl(Container container,
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression,
boolean isExternallyAllocated) {
this.stateMachine = stateMachineFactory.make(this);
this.nodeId = nodeId;
this.container = container;
this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
this.allocatedSchedulerKey = schedulerKey;
this.appAttemptId = appAttemptId;
this.user = user;
this.creationTime = creationTime;
@ -276,6 +280,10 @@ public Container getContainer() {
return this.container;
}
public void setContainer(Container container) {
this.container = container;
}
@Override
public RMContainerState getState() {
this.readLock.lock();

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
@ -62,6 +63,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -81,6 +83,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@ -509,9 +516,11 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
ApplicationAttemptId attemptId =
container.getId().getApplicationAttemptId();
RMContainer rmContainer =
new RMContainerImpl(container, attemptId, node.getNodeID(),
applications.get(attemptId.getApplicationId()).getUser(), rmContext,
status.getCreationTime(), status.getNodeLabelExpression());
new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), attemptId,
node.getNodeID(), applications.get(
attemptId.getApplicationId()).getUser(), rmContext,
status.getCreationTime(), status.getNodeLabelExpression());
return rmContainer;
}
@ -1064,4 +1073,93 @@ protected void normalizeRequests(List<ResourceRequest> asks) {
ask.setCapability(getNormalizedResource(ask.getCapability()));
}
}
protected void handleExecutionTypeUpdates(
SchedulerApplicationAttempt appAttempt,
List<UpdateContainerRequest> promotionRequests,
List<UpdateContainerRequest> demotionRequests) {
if (promotionRequests != null && !promotionRequests.isEmpty()) {
LOG.info("Promotion Update requests : " + promotionRequests);
handlePromotionRequests(appAttempt, promotionRequests);
}
if (demotionRequests != null && !demotionRequests.isEmpty()) {
LOG.info("Demotion Update requests : " + demotionRequests);
handleDemotionRequests(appAttempt, demotionRequests);
}
}
private void handlePromotionRequests(
SchedulerApplicationAttempt applicationAttempt,
List<UpdateContainerRequest> updateContainerRequests) {
for (UpdateContainerRequest uReq : updateContainerRequests) {
RMContainer rmContainer =
rmContext.getScheduler().getRMContainer(uReq.getContainerId());
// Check if this is a container update
// And not in the middle of a Demotion
if (rmContainer != null) {
// Check if this is an executionType change request
// If so, fix the rr to make it look like a normal rr
// with relaxLocality=false and numContainers=1
SchedulerNode schedulerNode = rmContext.getScheduler()
.getSchedulerNode(rmContainer.getContainer().getNodeId());
// Add only if no outstanding promote requests exist.
if (!applicationAttempt.getUpdateContext()
.checkAndAddToOutstandingIncreases(
rmContainer, schedulerNode, uReq)) {
applicationAttempt.addToUpdateContainerErrors(
UpdateContainerError.newInstance(
RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
}
} else {
LOG.warn("Cannot promote non-existent (or completed) Container ["
+ uReq.getContainerId() + "]");
}
}
}
private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
List<UpdateContainerRequest> demotionRequests) {
OpportunisticContainerContext oppCntxt =
appAttempt.getOpportunisticContainerContext();
for (UpdateContainerRequest uReq : demotionRequests) {
RMContainer rmContainer =
rmContext.getScheduler().getRMContainer(uReq.getContainerId());
if (rmContainer != null) {
if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
rmContainer.getContainer())) {
RMContainer demotedRMContainer =
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
appAttempt.addToNewlyDemotedContainers(
uReq.getContainerId(), demotedRMContainer);
} else {
appAttempt.addToUpdateContainerErrors(
UpdateContainerError.newInstance(
RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
}
} else {
LOG.warn("Cannot demote non-existent (or completed) Container ["
+ uReq.getContainerId() + "]");
}
}
}
private RMContainer createDemotedRMContainer(
SchedulerApplicationAttempt appAttempt,
OpportunisticContainerContext oppCntxt,
RMContainer rmContainer) {
SchedulerRequestKey sk =
SchedulerRequestKey.extractFrom(rmContainer.getContainer());
Container demotedContainer = BuilderUtils.newContainer(
ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
oppCntxt.getContainerIdGenerator().generateContainerId()),
rmContainer.getContainer().getNodeId(),
rmContainer.getContainer().getNodeHttpAddress(),
rmContainer.getContainer().getResource(),
sk.getPriority(), null, ExecutionType.OPPORTUNISTIC,
sk.getAllocationRequestId());
demotedContainer.setVersion(rmContainer.getContainer().getVersion());
return SchedulerUtils.createOpportunisticRmContainer(
rmContext, demotedContainer, false);
}
}

View File

@ -37,6 +37,8 @@ public class Allocation {
final List<NMToken> nmTokens;
final List<Container> increasedContainers;
final List<Container> decreasedContainers;
final List<Container> promotedContainers;
final List<Container> demotedContainers;
private Resource resourceLimit;
@ -51,13 +53,23 @@ public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, nmTokens, null, null);
fungibleResources, nmTokens, null, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
List<Container> increasedContainers, List<Container> decreasedContainer) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, nmTokens, increasedContainers, decreasedContainer,
null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
List<Container> increasedContainers, List<Container> decreasedContainer,
List<Container> promotedContainers, List<Container> demotedContainer) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
@ -66,6 +78,8 @@ public Allocation(List<Container> containers, Resource resourceLimit,
this.nmTokens = nmTokens;
this.increasedContainers = increasedContainers;
this.decreasedContainers = decreasedContainer;
this.promotedContainers = promotedContainers;
this.demotedContainers = demotedContainer;
}
public List<Container> getContainers() {
@ -100,6 +114,14 @@ public List<Container> getDecreasedContainers() {
return decreasedContainers;
}
public List<Container> getPromotedContainers() {
return promotedContainers;
}
public List<Container> getDemotedContainers() {
return demotedContainers;
}
@VisibleForTesting
public void setResourceLimit(Resource resource) {
this.resourceLimit = resource;

View File

@ -55,7 +55,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class keeps track of all the consumption of an application. This also
* keeps track of current running/completed containers for the application.
@ -92,10 +91,11 @@ public class AppSchedulingInfo {
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
public final ContainerUpdateContext updateContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) {
@ -109,6 +109,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
this.appResourceUsage = appResourceUsage;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@ -376,6 +377,10 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
}
}
public ContainerUpdateContext getUpdateContext() {
return updateContext;
}
/**
* The ApplicationMaster is updating resource requirements for the
* application, by asking for more resources and releasing resources acquired
@ -414,29 +419,9 @@ public boolean updateResourceRequests(List<ResourceRequest> requests,
}
// Update scheduling placement set
for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry : dedupRequests.entrySet()) {
SchedulerRequestKey schedulerRequestKey = entry.getKey();
if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
schedulerKeyToPlacementSets.put(schedulerRequestKey,
new LocalitySchedulingPlacementSet<>(this));
}
// Update placement set
ResourceRequestUpdateResult pendingAmountChanges =
schedulerKeyToPlacementSets.get(schedulerRequestKey)
.updateResourceRequests(
entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (null != pendingAmountChanges) {
updatePendingResources(
pendingAmountChanges.getLastAnyResourceRequest(),
pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
queue.getMetrics());
offswitchResourcesUpdated = true;
}
}
offswitchResourcesUpdated =
addToPlacementSets(
recoverPreemptedRequestForAContainer, dedupRequests);
return offswitchResourcesUpdated;
} finally {
@ -444,6 +429,37 @@ public boolean updateResourceRequests(List<ResourceRequest> requests,
}
}
boolean addToPlacementSets(
boolean recoverPreemptedRequestForAContainer,
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
boolean offswitchResourcesUpdated = false;
for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
dedupRequests.entrySet()) {
SchedulerRequestKey schedulerRequestKey = entry.getKey();
if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
schedulerKeyToPlacementSets.put(schedulerRequestKey,
new LocalitySchedulingPlacementSet<>(this));
}
// Update placement set
ResourceRequestUpdateResult pendingAmountChanges =
schedulerKeyToPlacementSets.get(schedulerRequestKey)
.updateResourceRequests(
entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (null != pendingAmountChanges) {
updatePendingResources(
pendingAmountChanges.getLastAnyResourceRequest(),
pendingAmountChanges.getNewResourceRequest(), schedulerRequestKey,
queue.getMetrics());
offswitchResourcesUpdated = true;
}
}
return offswitchResourcesUpdated;
}
private void updatePendingResources(ResourceRequest lastRequest,
ResourceRequest request, SchedulerRequestKey schedulerKey,
QueueMetrics metrics) {
@ -717,8 +733,8 @@ public List<ResourceRequest> allocate(NodeType type,
updateMetricsForAllocatedContainer(type, containerAllocated);
}
return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node,
request);
return schedulerKeyToPlacementSets.get(schedulerKey)
.allocate(schedulerKey, type, node, request);
} finally {
writeLock.unlock();
}

View File

@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Class encapsulates all outstanding container increase and decrease
* requests for an application.
*/
public class ContainerUpdateContext {
public static final ContainerId UNDEFINED =
ContainerId.newContainerId(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(-1, -1), -1), -1);
protected static final RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
// Keep track of containers that are undergoing promotion
private final Map<SchedulerRequestKey, Map<Resource,
Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
private final Set<ContainerId> outstandingDecreases = new HashSet<>();
private final AppSchedulingInfo appSchedulingInfo;
ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
this.appSchedulingInfo = appSchedulingInfo;
}
private synchronized boolean isBeingIncreased(Container container) {
Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
outstandingIncreases.get(
new SchedulerRequestKey(container.getPriority(),
container.getAllocationRequestId(), container.getId()));
if (resourceMap != null) {
Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource());
if (locationMap != null) {
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds != null && !containerIds.isEmpty()) {
return containerIds.contains(container.getId());
}
}
}
return false;
}
/**
* Add the container to outstanding decreases.
* @param container Container.
* @return true if updated to outstanding decreases was successful.
*/
public synchronized boolean checkAndAddToOutstandingDecreases(
Container container) {
if (isBeingIncreased(container)
|| outstandingDecreases.contains(container.getId())) {
return false;
}
outstandingDecreases.add(container.getId());
return true;
}
/**
* Add the container to outstanding increases.
* @param rmContainer RMContainer.
* @param schedulerNode SchedulerNode.
* @param updateRequest UpdateContainerRequest.
* @return true if updated to outstanding increases was successful.
*/
public synchronized boolean checkAndAddToOutstandingIncreases(
RMContainer rmContainer, SchedulerNode schedulerNode,
UpdateContainerRequest updateRequest) {
Container container = rmContainer.getContainer();
SchedulerRequestKey schedulerKey =
SchedulerRequestKey.create(updateRequest,
rmContainer.getAllocatedSchedulerKey());
Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
outstandingIncreases.get(schedulerKey);
if (resourceMap == null) {
resourceMap = new HashMap<>();
outstandingIncreases.put(schedulerKey, resourceMap);
}
Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource());
if (locationMap == null) {
locationMap = new HashMap<>();
resourceMap.put(container.getResource(), locationMap);
}
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds == null) {
containerIds = new HashSet<>();
locationMap.put(container.getNodeId(), containerIds);
}
if (containerIds.contains(container.getId())
|| outstandingDecreases.contains(container.getId())) {
return false;
}
containerIds.add(container.getId());
Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
new HashMap<>();
Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
Map<String, ResourceRequest> resMap =
createResourceRequests(rmContainer, schedulerNode,
schedulerKey, resToIncrease);
updateResReqs.put(schedulerKey, resMap);
appSchedulingInfo.addToPlacementSets(false, updateResReqs);
return true;
}
private Map<String, ResourceRequest> createResourceRequests(
RMContainer rmContainer, SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey, Resource resToIncrease) {
Map<String, ResourceRequest> resMap = new HashMap<>();
resMap.put(rmContainer.getContainer().getNodeId().getHost(),
createResourceReqForIncrease(schedulerKey, resToIncrease,
RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
rmContainer, rmContainer.getContainer().getNodeId().getHost()));
resMap.put(schedulerNode.getRackName(),
createResourceReqForIncrease(schedulerKey, resToIncrease,
RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
rmContainer, schedulerNode.getRackName()));
resMap.put(ResourceRequest.ANY,
createResourceReqForIncrease(schedulerKey, resToIncrease,
RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
rmContainer, ResourceRequest.ANY));
return resMap;
}
private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
RMContainer rmContainer) {
if (updateReq.getContainerUpdateType() ==
ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
return rmContainer.getContainer().getResource();
}
// TODO: Fix this for container increase..
// This has to equal the Resources in excess of fitsIn()
// for container increase and is equal to the container total
// resource for Promotion.
return null;
}
private static ResourceRequest createResourceReqForIncrease(
SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
ResourceRequest rr, RMContainer rmContainer, String resourceName) {
rr.setResourceName(resourceName);
rr.setNumContainers(1);
rr.setRelaxLocality(false);
rr.setPriority(rmContainer.getContainer().getPriority());
rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
rr.setCapability(resToIncrease);
rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
ExecutionType.GUARANTEED, true));
return rr;
}
/**
* Remove Container from outstanding increases / decreases. Calling this
* method essentially completes the update process.
* @param schedulerKey SchedulerRequestKey.
* @param container Container.
*/
public synchronized void removeFromOutstandingUpdate(
SchedulerRequestKey schedulerKey, Container container) {
Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
outstandingIncreases.get(schedulerKey);
if (resourceMap != null) {
Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource());
if (locationMap != null) {
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds != null && !containerIds.isEmpty()) {
containerIds.remove(container.getId());
if (containerIds.isEmpty()) {
locationMap.remove(container.getNodeId());
}
}
if (locationMap.isEmpty()) {
resourceMap.remove(container.getResource());
}
}
if (resourceMap.isEmpty()) {
outstandingIncreases.remove(schedulerKey);
}
}
outstandingDecreases.remove(container.getId());
}
/**
* Check if a new container is to be matched up against an outstanding
* Container increase request.
* @param node SchedulerNode.
* @param schedulerKey SchedulerRequestKey.
* @param rmContainer RMContainer.
* @return ContainerId.
*/
public ContainerId matchContainerToOutstandingIncreaseReq(
SchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer rmContainer) {
ContainerId retVal = null;
Container container = rmContainer.getContainer();
Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
outstandingIncreases.get(schedulerKey);
if (resourceMap != null) {
Map<NodeId, Set<ContainerId>> locationMap =
resourceMap.get(container.getResource());
if (locationMap != null) {
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds != null && !containerIds.isEmpty()) {
retVal = containerIds.iterator().next();
}
}
}
// Allocation happened on NM on the same host, but not on the NM
// we need.. We need to signal that this container has to be released.
// We also need to add these requests back.. to be reallocated.
if (resourceMap != null && retVal == null) {
Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
new HashMap<>();
Map<String, ResourceRequest> resMap = createResourceRequests
(rmContainer, node, schedulerKey,
rmContainer.getContainer().getResource());
reqsToUpdate.put(schedulerKey, resMap);
appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
return UNDEFINED;
}
return retVal;
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import java.util.ArrayList;
import java.util.List;
/**
* Holder class that maintains list of container update requests
*/
public class ContainerUpdates {
final List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
final List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
final List<UpdateContainerRequest> promotionRequests = new ArrayList<>();
final List<UpdateContainerRequest> demotionRequests = new ArrayList<>();
/**
* Returns Container Increase Requests.
* @return Container Increase Requests.
*/
public List<UpdateContainerRequest> getIncreaseRequests() {
return increaseRequests;
}
/**
* Returns Container Decrease Requests.
* @return Container Decrease Requests.
*/
public List<UpdateContainerRequest> getDecreaseRequests() {
return decreaseRequests;
}
/**
* Returns Container Promotion Requests.
* @return Container Promotion Requests.
*/
public List<UpdateContainerRequest> getPromotionRequests() {
return promotionRequests;
}
/**
* Returns Container Demotion Requests.
* @return Container Demotion Requests.
*/
public List<UpdateContainerRequest> getDemotionRequests() {
return demotionRequests;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -54,6 +56,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -133,10 +136,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
protected Set<NMToken> updatedNMTokens = new HashSet<>();
protected List<UpdateContainerError> updateContainerErrors = new ArrayList<>();
// This pendingRelease is used in work-preserving recovery scenario to keep
// track of the AM's outstanding release requests. RM on recovery could
// receive the release request form AM before it receives the container status
@ -247,6 +255,10 @@ public AppSchedulingInfo getAppSchedulingInfo() {
return this.appSchedulingInfo;
}
public ContainerUpdateContext getUpdateContext() {
return this.appSchedulingInfo.getUpdateContext();
}
/**
* Is this application pending?
* @return true if it is else false.
@ -537,8 +549,9 @@ public RMContainer reserve(SchedulerNode node,
writeLock.lock();
// Create RMContainer if necessary
if (rmContainer == null) {
rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
rmContainer = new RMContainerImpl(container, schedulerKey,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
}
if (rmContainer.getState() == RMContainerState.NEW) {
attemptResourceUsage.incReserved(node.getPartition(),
@ -635,10 +648,10 @@ public Resource getCurrentConsumption() {
}
private Container updateContainerAndNMToken(RMContainer rmContainer,
boolean newContainer, boolean increasedContainer) {
ContainerUpdateType updateType) {
Container container = rmContainer.getContainer();
ContainerType containerType = ContainerType.TASK;
if (!newContainer) {
if (updateType != null) {
container.setVersion(container.getVersion() + 1);
}
// The working knowledge is that masterContainer for AM is null as it
@ -662,12 +675,15 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
return null;
}
if (newContainer) {
if (updateType == null ||
ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
rmContainer.handle(new RMContainerEvent(
rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(), increasedContainer));
rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType));
}
return container;
}
@ -699,8 +715,8 @@ public List<Container> pullNewlyAllocatedContainers() {
Iterator<RMContainer> i = newlyAllocatedContainers.iterator();
while (i.hasNext()) {
RMContainer rmContainer = i.next();
Container updatedContainer = updateContainerAndNMToken(rmContainer,
true, false);
Container updatedContainer =
updateContainerAndNMToken(rmContainer, null);
// Only add container to return list when it's not null.
// updatedContainer could be null when generate token failed, it can be
// caused by DNS resolving failed.
@ -713,9 +729,142 @@ public List<Container> pullNewlyAllocatedContainers() {
} finally {
writeLock.unlock();
}
}
public void addToNewlyDemotedContainers(ContainerId containerId,
RMContainer rmContainer) {
newlyDemotedContainers.put(containerId, rmContainer);
}
protected synchronized void addToUpdateContainerErrors(
UpdateContainerError error) {
updateContainerErrors.add(error);
}
protected synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) {
if (oppContainerContext == null) {
newlyAllocatedContainers.add(rmContainer);
return;
}
ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq(
node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
if (matchedContainerId != null) {
if (ContainerUpdateContext.UNDEFINED == matchedContainerId) {
// This is a spurious allocation (relaxLocality = false
// resulted in the Container being allocated on an NM on the same host
// but not on the NM running the container to be updated. Can
// happen if more than one NM exists on the same host.. usually
// occurs when using MiniYARNCluster to test).
tempContainerToKill.add(rmContainer);
} else {
newlyPromotedContainers.put(matchedContainerId, rmContainer);
}
} else {
newlyAllocatedContainers.add(rmContainer);
}
}
public List<Container> pullNewlyPromotedContainers() {
return pullContainersWithUpdatedExecType(newlyPromotedContainers,
ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
}
public List<Container> pullNewlyDemotedContainers() {
return pullContainersWithUpdatedExecType(newlyDemotedContainers,
ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
}
public List<UpdateContainerError> pullUpdateContainerErrors() {
List<UpdateContainerError> errors =
new ArrayList<>(updateContainerErrors);
updateContainerErrors.clear();
return errors;
}
/**
* A container is promoted if its executionType is changed from
* OPPORTUNISTIC to GUARANTEED. It id demoted if the change is from
* GUARANTEED to OPPORTUNISTIC.
* @return Newly Promoted and Demoted containers
*/
private List<Container> pullContainersWithUpdatedExecType(
Map<ContainerId, RMContainer> newlyUpdatedContainers,
ContainerUpdateType updateTpe) {
List<Container> updatedContainers = new ArrayList<>();
if (oppContainerContext == null) {
return updatedContainers;
}
try {
writeLock.lock();
Iterator<Map.Entry<ContainerId, RMContainer>> i =
newlyUpdatedContainers.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<ContainerId, RMContainer> entry = i.next();
ContainerId matchedContainerId = entry.getKey();
RMContainer rmContainer = entry.getValue();
// swap containers
RMContainer existingRMContainer = swapContainer(
rmContainer, matchedContainerId);
getUpdateContext().removeFromOutstandingUpdate(
rmContainer.getAllocatedSchedulerKey(),
existingRMContainer.getContainer());
Container updatedContainer = updateContainerAndNMToken(
existingRMContainer, updateTpe);
updatedContainers.add(updatedContainer);
tempContainerToKill.add(rmContainer);
i.remove();
}
// Release all temporary containers
Iterator<RMContainer> tempIter = tempContainerToKill.iterator();
while (tempIter.hasNext()) {
RMContainer c = tempIter.next();
// Mark container for release (set RRs to null, so RM does not think
// it is a recoverable container)
((RMContainerImpl) c).setResourceRequests(null);
((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
SchedulerUtils.UPDATED_CONTAINER),
RMContainerEventType.KILL);
tempIter.remove();
}
return updatedContainers;
} finally {
writeLock.unlock();
}
}
private RMContainer swapContainer(RMContainer rmContainer, ContainerId
matchedContainerId) {
RMContainer existingRMContainer =
getRMContainer(matchedContainerId);
if (existingRMContainer != null) {
// Swap updated container with the existing container
Container updatedContainer = rmContainer.getContainer();
Container newContainer = Container.newInstance(matchedContainerId,
existingRMContainer.getContainer().getNodeId(),
existingRMContainer.getContainer().getNodeHttpAddress(),
updatedContainer.getResource(),
existingRMContainer.getContainer().getPriority(), null,
updatedContainer.getExecutionType());
newContainer.setAllocationRequestId(
existingRMContainer.getContainer().getAllocationRequestId());
newContainer.setVersion(existingRMContainer.getContainer().getVersion());
rmContainer.getContainer().setResource(
existingRMContainer.getContainer().getResource());
rmContainer.getContainer().setExecutionType(
existingRMContainer.getContainer().getExecutionType());
((RMContainerImpl)existingRMContainer).setContainer(newContainer);
}
return existingRMContainer;
}
private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
try {
@ -728,7 +877,8 @@ private List<Container> pullNewlyUpdatedContainers(
while (i.hasNext()) {
RMContainer rmContainer = i.next().getValue();
Container updatedContainer = updateContainerAndNMToken(rmContainer,
false, increase);
increase ? ContainerUpdateType.INCREASE_RESOURCE :
ContainerUpdateType.DECREASE_RESOURCE);
if (updatedContainer != null) {
returnContainerList.add(updatedContainer);
i.remove();

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -41,7 +42,10 @@
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -57,6 +61,9 @@ public class SchedulerUtils {
public static final String RELEASED_CONTAINER =
"Container released by application";
public static final String UPDATED_CONTAINER =
"Temporary container killed by application for ExeutionType update";
public static final String LOST_CONTAINER =
"Container released on a *lost* node";
@ -380,4 +387,19 @@ public static boolean hasPendingResourceRequest(ResourceCalculator rc,
}
return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
}
public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
Container container, boolean isRemotelyAllocated) {
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container),
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
container.getNodeId()).allocateContainer(rmContainer);
return rmContainer;
}
}

View File

@ -136,8 +136,7 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
* @param release
* @param blacklistAdditions
* @param blacklistRemovals
* @param increaseRequests
* @param decreaseRequests
* @param updateRequests
* @return the {@link Allocation} for the application
*/
@Public
@ -145,8 +144,7 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests);
ContainerUpdates updateRequests);
/**
* Get node resource usage report.

View File

@ -75,6 +75,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -913,22 +916,27 @@ private LeafQueue updateIncreaseRequests(
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
return EMPTY_ALLOCATION;
}
// Handle promotions and demotions
handleExecutionTypeUpdates(
application, updateRequests.getPromotionRequests(),
updateRequests.getDemotionRequests());
// Release containers
releaseContainers(release, application);
// update increase requests
LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests,
LeafQueue updateDemandForQueue =
updateIncreaseRequests(updateRequests.getIncreaseRequests(),
application);
// Decrease containers
decreaseContainers(decreaseRequests, application);
decreaseContainers(updateRequests.getDecreaseRequests(), application);
// Sanity check for new allocation requests
normalizeRequests(ask);

View File

@ -746,7 +746,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
// When reserving container
RMContainer updatedContainer = reservedContainer;
if (updatedContainer == null) {
updatedContainer = new RMContainerImpl(container,
updatedContainer = new RMContainerImpl(container, schedulerKey,
application.getApplicationAttemptId(), node.getNodeID(),
application.getAppSchedulingInfo().getUser(), rmContext);
}

View File

@ -223,7 +223,7 @@ public RMContainer allocate(FiCaSchedulerNode node,
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
@ -555,12 +555,14 @@ public void apply(Resource cluster,
// Update this application for the allocated container
if (!allocation.isIncreasedAllocation()) {
// Allocate a new container
newlyAllocatedContainers.add(rmContainer);
addToNewlyAllocatedContainers(
schedulerContainer.getSchedulerNode(), rmContainer);
liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests
List<ResourceRequest> requests = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(),
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setResourceRequests(requests);
@ -752,12 +754,15 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator,
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
List<Container> newlyPromotedContainers = pullNewlyPromotedContainers();
List<Container> newlyDemotedContainers = pullNewlyDemotedContainers();
List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers);
newlyIncreasedContainers, newlyDecreasedContainers,
newlyPromotedContainers, newlyDemotedContainers);
} finally {
writeLock.unlock();
}

View File

@ -449,13 +449,13 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
}
// Create RMContainer
rmContainer = new RMContainerImpl(container,
rmContainer = new RMContainerImpl(container, schedulerKey,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
addToNewlyAllocatedContainers(node, rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -84,6 +83,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@ -811,8 +811,7 @@ public Resource getNormalizedResource(Resource requestedResource) {
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
@ -822,6 +821,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
return EMPTY_ALLOCATION;
}
// Handle promotions and demotions
handleExecutionTypeUpdates(
application, updateRequests.getPromotionRequests(),
updateRequests.getDemotionRequests());
// Sanity check
normalizeRequests(ask);
@ -876,7 +880,9 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens());
application.pullUpdatedNMTokens(), null, null,
application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers());
}
@Override

View File

@ -68,7 +68,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
this.getApplicationAttemptId(), node.getNodeID(),
schedulerKey, this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@ -76,7 +76,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
addToNewlyAllocatedContainers(node, rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);

View File

@ -40,7 +40,6 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -62,6 +61,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -325,8 +325,7 @@ public synchronized void setRMContext(RMContext rmContext) {
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
List<UpdateContainerRequest> increaseRequests,
List<UpdateContainerRequest> decreaseRequests) {
ContainerUpdates updateRequests) {
FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +

View File

@ -157,7 +157,8 @@ public ResourceRequest getResourceRequest(String resourceName) {
return resourceRequestMap.get(resourceName);
}
private void decrementOutstanding(ResourceRequest offSwitchRequest) {
private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
@ -166,8 +167,6 @@ private void decrementOutstanding(ResourceRequest offSwitchRequest) {
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
offSwitchRequest);
appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
appSchedulingInfo.checkForDeactivation();
}
@ -193,15 +192,15 @@ public ResourceRequest cloneResourceRequest(ResourceRequest request) {
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private void allocateRackLocal(SchedulerNode node,
ResourceRequest rackLocalRequest,
private void allocateRackLocal(SchedulerRequestKey schedulerKey,
SchedulerNode node, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getRackName(), rackLocalRequest);
ResourceRequest offRackRequest = resourceRequestMap.get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest);
decrementOutstanding(schedulerKey, offRackRequest);
// Update cloned RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@ -212,10 +211,11 @@ private void allocateRackLocal(SchedulerNode node,
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private void allocateOffSwitch(ResourceRequest offSwitchRequest,
private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
ResourceRequest offSwitchRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decrementOutstanding(offSwitchRequest);
decrementOutstanding(schedulerKey, offSwitchRequest);
// Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
@ -225,8 +225,8 @@ private void allocateOffSwitch(ResourceRequest offSwitchRequest,
* The {@link ResourceScheduler} is allocating data-local resources to the
* application.
*/
private void allocateNodeLocal(SchedulerNode node,
ResourceRequest nodeLocalRequest,
private void allocateNodeLocal(SchedulerRequestKey schedulerKey,
SchedulerNode node, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) {
// Update future requirements
decResourceRequest(node.getNodeName(), nodeLocalRequest);
@ -237,7 +237,7 @@ private void allocateNodeLocal(SchedulerNode node,
ResourceRequest offRackRequest = resourceRequestMap.get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest);
decrementOutstanding(schedulerKey, offRackRequest);
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@ -282,8 +282,8 @@ public boolean canAllocate(NodeType type, SchedulerNode node) {
}
@Override
public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
ResourceRequest request) {
public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node, ResourceRequest request) {
try {
writeLock.lock();
@ -300,11 +300,11 @@ public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
}
if (type == NodeType.NODE_LOCAL) {
allocateNodeLocal(node, request, resourceRequests);
allocateNodeLocal(schedulerKey, node, request, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
allocateRackLocal(node, request, resourceRequests);
allocateRackLocal(schedulerKey, node, request, resourceRequests);
} else{
allocateOffSwitch(request, resourceRequests);
allocateOffSwitch(schedulerKey, request, resourceRequests);
}
return resourceRequests;

View File

@ -78,13 +78,14 @@ ResourceRequestUpdateResult updateResourceRequests(
/**
* Notify container allocated.
* @param schedulerKey SchedulerRequestKey for this ResourceRequest
* @param type Type of the allocation
* @param node Which node this container allocated on
* @param request Which resource request to allocate
* @return list of ResourceRequests deducted
*/
List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
ResourceRequest request);
List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node, ResourceRequest request);
/**
* We can still have pending requirement for a given NodeType and node

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -331,7 +332,8 @@ public synchronized List<Container> getResources() throws IOException {
// Get resources from the ResourceManager
Allocation allocation = resourceManager.getResourceScheduler().allocate(
applicationAttemptId, new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(), null, null, null, null);
new ArrayList<ContainerId>(), null, null,
new ContainerUpdates());
if (LOG.isInfoEnabled()) {
LOG.info("-=======" + applicationAttemptId + System.lineSeparator() +

View File

@ -251,6 +251,13 @@ public AllocateResponse sendContainerResizingRequest(
return allocate(req);
}
public AllocateResponse sendContainerUpdateRequest(
List<UpdateContainerRequest> updateRequests) throws Exception {
final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null,
updateRequests, null);
return allocate(req);
}
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws Exception {
UserGroupInformation ugi =

View File

@ -195,6 +195,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
isHealthy, resId);
}
public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
isHealthy, ++responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
List<Container> increasedConts, boolean isHealthy, int resId)
throws Exception {

View File

@ -140,6 +140,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
@ -1060,7 +1062,8 @@ public ApplicationReport createAndGetApplicationReport(
Container container = Container.newInstance(
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
attemptId, null, "", rmContext));
SchedulerRequestKey.extractFrom(container), attemptId, null, "",
rmContext));
Map<ApplicationAttemptId, RMAppAttempt> attempts =
new HashMap<ApplicationAttemptId, RMAppAttempt>();
attempts.put(attemptId, rmAppAttemptImpl);

View File

@ -34,11 +34,15 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -64,8 +68,11 @@
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -75,13 +82,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
/**
@ -91,8 +102,10 @@ public class TestOpportunisticContainerAllocatorAMService {
private static final int GB = 1024;
@Test(timeout = 60000)
public void testNodeRemovalDuringAllocate() throws Exception {
private MockRM rm;
@Before
public void createAndStartRM() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration(csConf);
@ -102,8 +115,445 @@ public void testNodeRemovalDuringAllocate() throws Exception {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
MockRM rm = new MockRM(conf);
rm = new MockRM(conf);
rm.start();
}
@After
public void stopRM() {
if (rm != null) {
rm.stop();
}
}
@Test(timeout = 600000)
public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode3)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode4)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
// All nodes 1 - 4 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
Thread.sleep(1000);
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
.getMetrics();
// Verify Metrics
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
AllocateResponse allocateResponse = am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))),
null);
List<Container> allocatedContainers = allocateResponse
.getAllocatedContainers();
Assert.assertEquals(2, allocatedContainers.size());
Container container = allocatedContainers.get(0);
MockNM allocNode = nodes.get(container.getNodeId());
MockNM sameHostDiffNode = null;
for (NodeId n : nodes.keySet()) {
if (n.getHost().equals(allocNode.getNodeId().getHost()) &&
n.getPort() != allocNode.getNodeId().getPort()) {
sameHostDiffNode = nodes.get(n);
}
}
// Verify Metrics After OPP allocation (Nothing should change)
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(0,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
// Node on same host should not result in allocation
sameHostDiffNode.nodeHeartbeat(true);
Thread.sleep(200);
allocateResponse = am1.allocate(null, null);
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
// Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
// Send Promotion req again... this should result in update error
allocateResponse = am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(0,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
allocateResponse.getUpdateErrors().get(0).getReason());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
// Send Promotion req again with incorrect version...
// this should also result in update error
allocateResponse = am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(1,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0",
allocateResponse.getUpdateErrors().get(0).getReason());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
// Ensure after correct node heartbeats, we should get the allocation
allocNode.nodeHeartbeat(true);
Thread.sleep(200);
allocateResponse = am1.allocate(null, null);
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Container uc =
allocateResponse.getUpdatedContainers().get(0).getContainer();
Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
Assert.assertEquals(uc.getId(), container.getId());
Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
// Verify Metrics After OPP allocation :
// Allocated cores+mem should have increased, available should decrease
verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
Thread.sleep(200);
// Verify that the container is still in ACQUIRED state wrt the RM.
RMContainer rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(
uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
// Now demote the container back..
allocateResponse = am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(),
uc.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
null, ExecutionType.OPPORTUNISTIC)));
// This should happen in the same heartbeat..
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
uc = allocateResponse.getUpdatedContainers().get(0).getContainer();
Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType());
Assert.assertEquals(uc.getId(), container.getId());
Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
// Verify Metrics After OPP allocation :
// Everything should have reverted to what it was
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
}
@Test(timeout = 60000)
public void testContainerPromoteAfterContainerStart() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
nm1.registerNode();
nm2.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
.getMetrics();
// Verify Metrics
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
AllocateResponse allocateResponse = am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))),
null);
List<Container> allocatedContainers = allocateResponse
.getAllocatedContainers();
Assert.assertEquals(2, allocatedContainers.size());
Container container = allocatedContainers.get(0);
MockNM allocNode = nodes.get(container.getNodeId());
// Start Container in NM
allocNode.nodeHeartbeat(Arrays.asList(
ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true);
Thread.sleep(200);
// Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(
container.getId().getApplicationAttemptId()).getRMContainer(
container.getId());
Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// Verify Metrics After OPP allocation (Nothing should change)
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(0,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
// Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
// Send Promotion req again... this should result in update error
allocateResponse = am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(0,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
allocateResponse.getUpdateErrors().get(0).getReason());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
// Start Container in NM
allocNode.nodeHeartbeat(Arrays.asList(
ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true);
Thread.sleep(200);
allocateResponse = am1.allocate(null, null);
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Container uc =
allocateResponse.getUpdatedContainers().get(0).getContainer();
Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType());
Assert.assertEquals(uc.getId(), container.getId());
Assert.assertEquals(uc.getVersion(), container.getVersion() + 1);
// Verify that the Container is still in RUNNING state wrt RM..
rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(
uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId());
Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// Verify Metrics After OPP allocation :
// Allocated cores+mem should have increased, available should decrease
verifyMetrics(metrics, 6144, 6, 2048, 2, 2);
}
@Test(timeout = 600000)
public void testContainerPromoteAfterContainerComplete() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
nm1.registerNode();
nm2.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue()
.getMetrics();
// Verify Metrics
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
AllocateResponse allocateResponse = am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))),
null);
List<Container> allocatedContainers = allocateResponse
.getAllocatedContainers();
Assert.assertEquals(2, allocatedContainers.size());
Container container = allocatedContainers.get(0);
MockNM allocNode = nodes.get(container.getNodeId());
// Start Container in NM
allocNode.nodeHeartbeat(Arrays.asList(
ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true);
Thread.sleep(200);
// Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(
container.getId().getApplicationAttemptId()).getRMContainer(
container.getId());
Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// Container Completed in the NM
allocNode.nodeHeartbeat(Arrays.asList(
ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
true);
Thread.sleep(200);
// Verify that container has been removed..
rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(
container.getId().getApplicationAttemptId()).getRMContainer(
container.getId());
Assert.assertNull(rmContainer);
// Verify Metrics After OPP allocation (Nothing should change)
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
// Send Promotion req... this should result in update error
// Since the container doesn't exist anymore..
allocateResponse = am1.sendContainerUpdateRequest(
Arrays.asList(UpdateContainerRequest.newInstance(0,
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
Assert.assertEquals(1,
allocateResponse.getCompletedContainersStatuses().size());
Assert.assertEquals(container.getId(),
allocateResponse.getCompletedContainersStatuses().get(0)
.getContainerId());
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
Assert.assertEquals(1, allocateResponse.getUpdateErrors().size());
Assert.assertEquals("INVALID_CONTAINER_ID",
allocateResponse.getUpdateErrors().get(0).getReason());
Assert.assertEquals(container.getId(),
allocateResponse.getUpdateErrors().get(0)
.getUpdateContainerRequest().getContainerId());
// Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
}
private void verifyMetrics(QueueMetrics metrics, long availableMB,
int availableVirtualCores, long allocatedMB,
int allocatedVirtualCores, int allocatedContainers) {
Assert.assertEquals(availableMB, metrics.getAvailableMB());
Assert.assertEquals(availableVirtualCores, metrics.getAvailableVirtualCores());
Assert.assertEquals(allocatedMB, metrics.getAllocatedMB());
Assert.assertEquals(allocatedVirtualCores, metrics.getAllocatedVirtualCores());
Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers());
}
@Test(timeout = 60000)
public void testNodeRemovalDuringAllocate() throws Exception {
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();

View File

@ -100,6 +100,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -478,7 +479,7 @@ private void testAppAttemptScheduledState() {
assertEquals(expectedState, applicationAttempt.getAppAttemptState());
verify(scheduler, times(expectedAllocateCount)).allocate(
any(ApplicationAttemptId.class), any(List.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class));
any(List.class), any(List.class), any(ContainerUpdates.class));
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer());
@ -499,7 +500,7 @@ private void testAppAttemptAllocatedState(Container amContainer) {
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(List.class), any(List.class));
any(ContainerUpdates.class));
verify(nmTokenManager).clearNodeSetForAttempt(
applicationAttempt.getAppAttemptId());
}
@ -526,7 +527,7 @@ private void testAppAttemptFailedState(Container container,
}
/**
* {@link RMAppAttemptState#LAUNCH}
* {@link RMAppAttemptState#LAUNCHED}
*/
private void testAppAttemptLaunchedState(Container container) {
assertEquals(RMAppAttemptState.LAUNCHED,
@ -649,8 +650,8 @@ private Container allocateApplicationAttempt() {
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(List.class))).
any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class))).
thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())).
@ -1129,8 +1130,9 @@ public void testLaunchedFailWhileAHSEnabled() {
when(allocation.getContainers()).
thenReturn(Collections.singletonList(amContainer));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(List.class))).thenReturn(allocation);
any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class)))
.thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(amContainer.getId())).thenReturn(rmContainer);
@ -1610,7 +1612,8 @@ public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
YarnScheduler mockScheduler = mock(YarnScheduler.class);
when(mockScheduler.allocate(any(ApplicationAttemptId.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(List.class), any(List.class))).thenAnswer(new Answer<Allocation>() {
any(ContainerUpdates.class)))
.thenAnswer(new Answer<Allocation>() {
@SuppressWarnings("rawtypes")
@Override

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
@ -114,7 +115,8 @@ public void testReleaseWhileRunning() {
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());
@ -216,7 +218,8 @@ public void testExpireWhileRunning() {
when(rmContext.getYarnConfiguration()).thenReturn(conf);
when(rmContext.getRMApps()).thenReturn(appMap);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
assertEquals(RMContainerState.NEW, rmContainer.getState());

View File

@ -97,6 +97,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -158,7 +159,9 @@
public class TestCapacityScheduler {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
private final int GB = 1024;
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
private static final String A1 = A + ".a1";
@ -807,12 +810,12 @@ public void testBlackListNodes() throws Exception {
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, null, null);
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
.isPlaceBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), null, null);
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
.isPlaceBlacklisted(host));
rm.stop();
@ -908,7 +911,7 @@ public void testAllocateReorder() throws Exception {
cs.allocate(appAttemptId1,
Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
//And this will result in container assignment for app1
CapacityScheduler.schedule(cs);
@ -925,7 +928,7 @@ public void testAllocateReorder() throws Exception {
cs.allocate(appAttemptId2,
Collections.<ResourceRequest>singletonList(r2),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
//In this case we do not perform container assignment because we want to
//verify re-ordering based on the allocation alone
@ -3050,7 +3053,8 @@ public void testApplicationHeadRoom() throws Exception {
Allocation allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null, null, null);
Collections.<ContainerId> emptyList(), null, null,
NULL_UPDATE_REQUESTS);
Assert.assertNotNull(attempt);
@ -3066,7 +3070,8 @@ public void testApplicationHeadRoom() throws Exception {
allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
Collections.<ContainerId> emptyList(), null, null, null, null);
Collections.<ContainerId> emptyList(), null, null,
NULL_UPDATE_REQUESTS);
// All resources should be sent as headroom
Assert.assertEquals(newResource, allocate.getResourceLimit());
@ -3573,7 +3578,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
cs.allocate(appAttemptId3,
Collections.<ResourceRequest>singletonList(y1Req),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
@ -3587,7 +3592,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
cs.allocate(appAttemptId1,
Collections.<ResourceRequest>singletonList(x1Req),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
@ -3600,7 +3605,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
cs.allocate(appAttemptId2,
Collections.<ResourceRequest>singletonList(x2Req),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
assertEquals("X2 Used Resource should be 0", 0,
cs.getQueue("x2").getUsedResources().getMemorySize());
@ -3612,7 +3617,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
cs.allocate(appAttemptId1,
Collections.<ResourceRequest>singletonList(x1Req),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
cs.getQueue("x1").getUsedResources().getMemorySize());
@ -3626,7 +3631,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
cs.allocate(appAttemptId3,
Collections.<ResourceRequest>singletonList(y1Req),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
@ -3685,7 +3690,7 @@ public void testCSQueueBlocked() throws Exception {
//This will allocate for app1
cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(),
null, null, null, null).getContainers().size();
null, null, NULL_UPDATE_REQUESTS).getContainers().size();
CapacityScheduler.schedule(cs);
ResourceRequest r2 = null;
for (int i =0; i < 13; i++) {
@ -3694,7 +3699,7 @@ public void testCSQueueBlocked() throws Exception {
cs.allocate(appAttemptId2,
Collections.<ResourceRequest>singletonList(r2),
Collections.<ContainerId>emptyList(),
null, null, null, null);
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
assertEquals("A Used Resource should be 2 GB", 2 * GB,
@ -3707,11 +3712,11 @@ public void testCSQueueBlocked() throws Exception {
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(),
null, null, null, null).getContainers().size();
null, null, NULL_UPDATE_REQUESTS).getContainers().size();
CapacityScheduler.schedule(cs);
cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
Collections.<ContainerId>emptyList(), null, null, null, null);
Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
//Check blocked Resource
assertEquals("A Used Resource should be 2 GB", 2 * GB,

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -280,7 +282,8 @@ public void testSortedQueues() throws Exception {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
Container container=TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB), priority);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
node_0.getNodeID(), "user", rmContext);
// Assign {1,2,3,4} 1GB containers respectively to queues

View File

@ -921,13 +921,15 @@ public void testGetAppToUnreserve() throws Exception {
Container container = TestUtils.getMockContainer(containerId,
node_1.getNodeID(), Resources.createResource(2*GB),
priorityMap.getPriority());
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
node_1.getNodeID(), "user", rmContext);
Container container_1 = TestUtils.getMockContainer(containerId,
node_0.getNodeID(), Resources.createResource(1*GB),
priorityMap.getPriority());
RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId,
RMContainer rmContainer_1 = new RMContainerImpl(container_1,
SchedulerRequestKey.extractFrom(container_1), appAttemptId,
node_0.getNodeID(), "user", rmContext);
// no reserved containers
@ -994,7 +996,8 @@ public void testFindNodeToUnreserve() throws Exception {
Container container = TestUtils.getMockContainer(containerId,
node_1.getNodeID(), Resources.createResource(2*GB),
priorityMap.getPriority());
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
node_1.getNodeID(), "user", rmContext);
// nothing reserved

View File

@ -42,6 +42,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -72,6 +75,8 @@ public class FairSchedulerTestBase {
public static final float TEST_RESERVATION_THRESHOLD = 0.09f;
private static final int SLEEP_DURATION = 10;
private static final int SLEEP_RETRIES = 1000;
final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
/**
* The list of nodes added to the cluster using the {@link #addNode} method.
@ -183,7 +188,8 @@ protected ApplicationAttemptId createSchedulingRequest(
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
return id;
}
@ -209,7 +215,8 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(id, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
return id;
}
@ -231,7 +238,8 @@ protected void createSchedulingRequestExistingApplication(
ResourceRequest request, ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
}
protected void createApplicationWithAMResource(ApplicationAttemptId attId,

View File

@ -119,7 +119,8 @@ public void testBasic() throws InterruptedException {
List<ResourceRequest> ask = new ArrayList<>();
ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true));
scheduler.allocate(
appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
appAttemptId, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
triggerSchedulingAttempt();
@ -157,7 +158,7 @@ public void testSortedNodes() throws Exception {
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
ask.add(request);
scheduler.allocate(appAttemptId, ask,
new ArrayList<ContainerId>(), null, null, null, null);
new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
@ -169,7 +170,7 @@ public void testSortedNodes() throws Exception {
ask.clear();
ask.add(request);
scheduler.allocate(appAttemptId, ask,
new ArrayList<ContainerId>(), null, null, null, null);
new ArrayList<ContainerId>(), null, null, NULL_UPDATE_REQUESTS);
triggerSchedulingAttempt();
checkAppConsumption(app, Resources.createResource(2048,2));
@ -335,7 +336,7 @@ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
ask1.add(request1);
ask1.add(request2);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
null, null);
NULL_UPDATE_REQUESTS);
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);

View File

@ -94,6 +94,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -127,6 +130,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
private final int GB = 1024;
private final static String ALLOC_FILE =
new File(TEST_DIR, "test-queues").getAbsolutePath();
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
@Before
public void setUp() throws IOException {
@ -1260,7 +1265,7 @@ public void testRackLocalAppReservationThreshold() throws Exception {
asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
null, null, null);
null, NULL_UPDATE_REQUESTS);
ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
@ -2114,7 +2119,8 @@ public void testQueueDemandCalculation() throws Exception {
ResourceRequest request1 =
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true);
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Second ask, queue2 requests 1 large.
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
@ -2124,7 +2130,8 @@ public void testQueueDemandCalculation() throws Exception {
ResourceRequest.ANY, 1, 1, false);
ask2.add(request2);
ask2.add(request3);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Third ask, queue2 requests 2 small (minReqSize).
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
@ -2134,7 +2141,8 @@ public void testQueueDemandCalculation() throws Exception {
ResourceRequest.ANY, 2, 2, true);
ask3.add(request4);
ask3.add(request5);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
@ -2661,7 +2669,7 @@ public void testReservationWithMultiplePriorities() throws IOException {
app1.getLiveContainers().iterator().next().getContainerId();
scheduler.allocate(app1.getApplicationAttemptId(),
new ArrayList<ResourceRequest>(),
Arrays.asList(containerId), null, null, null, null);
Arrays.asList(containerId), null, null, new ContainerUpdates());
// Trigger allocation for app2
scheduler.handle(updateEvent);
@ -2747,7 +2755,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));
scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
null, null, null);
null, NULL_UPDATE_REQUESTS);
// node 1 checks in
scheduler.update();
@ -3193,7 +3201,8 @@ public void testCancelStrictLocality() throws IOException {
createResourceRequest(1024, node1.getHostName(), 1, 0, true),
createResourceRequest(1024, "rack1", 1, 0, true),
createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(attId1, update, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// then node2 should get the container
scheduler.handle(node2UpdateEvent);
@ -4330,7 +4339,7 @@ public void testSchedulingOnRemovedNode() throws Exception {
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null,
null, null, null);
null, NULL_UPDATE_REQUESTS);
String hostName = "127.0.0.1";
RMNode node1 = MockNodes.newNodeInfo(1,
@ -4406,11 +4415,11 @@ public void testBlacklistNodes() throws Exception {
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, null, null);
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
assertTrue(app.isPlaceBlacklisted(host));
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), null, null);
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
assertFalse(scheduler.getSchedulerApp(appAttemptId)
.isPlaceBlacklisted(host));
@ -4420,7 +4429,7 @@ public void testBlacklistNodes() throws Exception {
// Verify a container does not actually get placed on the blacklisted host
scheduler.allocate(appAttemptId, update,
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, null, null);
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
assertTrue(app.isPlaceBlacklisted(host));
scheduler.update();
scheduler.handle(updateEvent);
@ -4430,7 +4439,7 @@ public void testBlacklistNodes() throws Exception {
// Verify a container gets placed on the empty blacklist
scheduler.allocate(appAttemptId, update,
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), null, null);
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
assertFalse(app.isPlaceBlacklisted(host));
createSchedulingRequest(GB, "root.default", "user", 1);
scheduler.update();

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@ -119,7 +120,10 @@ public class TestFifoScheduler {
private static Configuration conf;
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
@Before
public void setUp() throws Exception {
conf = new Configuration();
@ -274,7 +278,8 @@ public void testNodeLocalAssignment() throws Exception {
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
@ -368,7 +373,8 @@ public void testUpdateResourceOnNode() throws Exception {
ask.add(nodeLocal);
ask.add(rackLocal);
ask.add(any);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null, null);
scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
// Before the node update event, there are one local request
Assert.assertEquals(1, nodeLocal.getNumContainers());
@ -944,7 +950,7 @@ public void testBlackListNodes() throws Exception {
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1,
RMNodeLabelsManager.NO_LABEL));
fs.allocate(appAttemptId1, ask1, emptyId,
Collections.singletonList(host_1_0), null, null, null);
Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS);
// Trigger container assignment
fs.handle(new NodeUpdateSchedulerEvent(n3));
@ -952,14 +958,16 @@ public void testBlackListNodes() throws Exception {
// Get the allocation for the application and verify no allocation on
// blacklist node
Allocation allocation1 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());
// verify host_1_1 can get allocated as not in blacklist
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation2 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
List<Container> containerList = allocation2.getContainers();
for (Container container : containerList) {
@ -974,29 +982,33 @@ public void testBlackListNodes() throws Exception {
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask2, emptyId,
Collections.singletonList("rack0"), null, null, null);
Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS);
// verify n1 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n1));
Allocation allocation3 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());
// verify n2 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n2));
Allocation allocation4 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());
// verify n3 is not qualified to be allocated
fs.handle(new NodeUpdateSchedulerEvent(n3));
Allocation allocation5 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());
fs.handle(new NodeUpdateSchedulerEvent(n4));
Allocation allocation6 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());
containerList = allocation6.getContainers();
@ -1055,25 +1067,29 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask1, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, ask1, emptyId,
null, null, NULL_UPDATE_REQUESTS);
// Ask for a 2 GB container for app 2
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
fs.allocate(appAttemptId2, ask2, emptyId, null, null, null, null);
fs.allocate(appAttemptId2, ask2, emptyId,
null, null, NULL_UPDATE_REQUESTS);
// Trigger container assignment
fs.handle(new NodeUpdateSchedulerEvent(n1));
// Get the allocation for the applications and verify headroom
Allocation allocation1 =
fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId1, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("Allocation headroom", 1 * GB, allocation1
.getResourceLimit().getMemorySize());
Allocation allocation2 =
fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null, null, null);
fs.allocate(appAttemptId2, emptyAsk, emptyId,
null, null, NULL_UPDATE_REQUESTS);
Assert.assertEquals("Allocation headroom", 1 * GB, allocation2
.getResourceLimit().getMemorySize());