YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority and other misc fixes (asuresh)

This commit is contained in:
Arun Suresh 2016-12-27 11:54:57 -08:00
parent c3973e7080
commit ac1e5d4f77
39 changed files with 378 additions and 322 deletions

View File

@ -158,6 +158,17 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest {
return result;
}
@Override
public String toString() {
return "UpdateReq{" +
"containerId=" + getContainerId() + ", " +
"containerVersion=" + getContainerVersion() + ", " +
"targetExecType=" + getExecutionType() + ", " +
"targetCapability=" + getCapability() + ", " +
"updateType=" + getContainerUpdateType() + ", " +
"}";
}
@Override
public boolean equals(Object obj) {
if (this == obj) {

View File

@ -282,8 +282,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.clear();
allocatedContainers.addAll(containers);
}
@ -299,6 +299,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (containers == null)
return;
initLocalUpdatedContainerList();
updatedContainers.clear();
updatedContainers.addAll(containers);
}
@ -315,6 +316,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.clear();
completedContainersStatuses.addAll(containers);
}

View File

@ -87,4 +87,11 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
}
@Override
public String toString() {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
"httpAddress=" + getHttpAddress() + "}";
}
}

View File

@ -22,8 +22,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -192,7 +199,8 @@ public class OpportunisticContainerAllocator {
/**
* Allocate OPPORTUNISTIC containers.
* @param request AllocateRequest
* @param blackList Resource BlackList Request
* @param oppResourceReqs Opportunistic Resource Requests
* @param applicationAttemptId ApplicationAttemptId
* @param opportContext App specific OpportunisticContainerContext
* @param rmIdentifier RM Identifier
@ -200,32 +208,24 @@ public class OpportunisticContainerAllocator {
* @return List of Containers.
* @throws YarnException YarnException
*/
public List<Container> allocateContainers(
AllocateRequest request, ApplicationAttemptId applicationAttemptId,
public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
List<ResourceRequest> oppResourceReqs,
ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext opportContext, long rmIdentifier,
String appSubmitter) throws YarnException {
// Update released containers.
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
+ numReleasedContainers);
opportContext.getContainersAllocated().removeAll(releasedContainers);
}
// Update black list.
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
if (rbr != null) {
opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
if (blackList != null) {
opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
}
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(request.getAskList());
opportContext.addToOutstandingReqs(oppResourceReqs);
// Satisfy the outstanding OPPORTUNISTIC requests.
List<Container> allocatedContainers = new ArrayList<>();
for (Priority priority :
for (SchedulerRequestKey schedulerKey :
opportContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
@ -234,7 +234,7 @@ public class OpportunisticContainerAllocator {
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
opportContext, priority, applicationAttemptId, appSubmitter);
opportContext, schedulerKey, applicationAttemptId, appSubmitter);
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
@ -246,19 +246,22 @@ public class OpportunisticContainerAllocator {
}
private Map<Resource, List<Container>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, Priority priority,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
for (ResourceRequest anyAsk :
appContext.getOutstandingOpReqs().get(priority).values()) {
appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(),
appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.keySet());
}
}
return containers;
}
@ -282,7 +285,9 @@ public class OpportunisticContainerAllocator {
nodesForScheduling.add(nodeEntry.getValue());
}
if (nodesForScheduling.isEmpty()) {
LOG.warn("No nodes available for allocating opportunistic containers.");
LOG.warn("No nodes available for allocating opportunistic containers. [" +
"allNodes=" + allNodes + ", " +
"blacklist=" + blacklist + "]");
return;
}
int numAllocated = 0;

View File

@ -18,12 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@ -52,9 +47,6 @@ public class OpportunisticContainerContext {
private static final Logger LOG = LoggerFactory
.getLogger(OpportunisticContainerContext.class);
// Currently just used to keep track of allocated containers.
// Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
private AllocationParams appParams =
new AllocationParams();
private ContainerIdGenerator containerIdGenerator =
@ -69,13 +61,9 @@ public class OpportunisticContainerContext {
// Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
private final TreeMap<Priority, Map<Resource, ResourceRequest>>
private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();
public Set<ContainerId> getContainersAllocated() {
return containersAllocated;
}
public AllocationParams getAppParams() {
return appParams;
}
@ -119,20 +107,11 @@ public class OpportunisticContainerContext {
return blacklist;
}
public TreeMap<Priority, Map<Resource, ResourceRequest>>
public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
getOutstandingOpReqs() {
return outstandingOpReqs;
}
public void updateCompletedContainers(AllocateResponse allocateResponse) {
for (ContainerStatus cs :
allocateResponse.getCompletedContainersStatuses()) {
if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
containersAllocated.remove(cs.getContainerId());
}
}
}
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding
@ -144,7 +123,7 @@ public class OpportunisticContainerContext {
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
Priority priority = request.getPriority();
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
// TODO: Extend for Node/Rack locality. We only handle ANY requests now
if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
@ -156,10 +135,10 @@ public class OpportunisticContainerContext {
}
Map<Resource, ResourceRequest> reqMap =
outstandingOpReqs.get(priority);
outstandingOpReqs.get(schedulerKey);
if (reqMap == null) {
reqMap = new HashMap<>();
outstandingOpReqs.put(priority, reqMap);
outstandingOpReqs.put(schedulerKey, reqMap);
}
ResourceRequest resourceRequest = reqMap.get(request.getCapability());
@ -171,7 +150,8 @@ public class OpportunisticContainerContext {
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
LOG.info("# of outstandingOpReqs in ANY (at" +
"priority = "+ schedulerKey.getPriority()
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
@ -187,9 +167,9 @@ public class OpportunisticContainerContext {
public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
Map<Resource, ResourceRequest> asks =
outstandingOpReqs.get(c.getPriority());
outstandingOpReqs.get(schedulerKey);
if (asks == null) {
continue;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
@ -53,7 +53,7 @@ public final class SchedulerRequestKey implements
container.getAllocationRequestId());
}
private SchedulerRequestKey(Priority priority, long allocationRequestId) {
SchedulerRequestKey(Priority priority, long allocationRequestId) {
this.priority = priority;
this.allocationRequestId = allocationRequestId;
}
@ -119,4 +119,12 @@ public final class SchedulerRequestKey implements
getAllocationRequestId() >>> 32));
return result;
}
@Override
public String toString() {
return "SchedulerRequestKey{" +
"priority=" + priority +
", allocationRequestId=" + allocationRequestId +
'}';
}
}

View File

@ -227,10 +227,10 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
.partitionAskList(request.getAllocateRequest().getAskList());
// Allocate OPPORTUNISTIC containers.
request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
List<Container> allocatedContainers =
containerAllocator.allocateContainers(
request.getAllocateRequest(), applicationAttemptId,
request.getAllocateRequest().getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), applicationAttemptId,
oppContainerContext, rmIdentifier, appSubmitter);
// Prepare request for sending to RM for scheduling GUARANTEED containers.
@ -252,18 +252,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
nodeTokens.put(nmToken.getNodeId(), nmToken);
}
oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
// Check if we have NM tokens for all the allocated containers. If not
// generate one and update the response.
updateAllocateResponse(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
if (LOG.isDebugEnabled()) {
LOG.debug("Number of opportunistic containers currently" +
"allocated by application: " + oppContainerContext
.getContainersAllocated().size());
}
return dsResp;
}
}

View File

@ -405,7 +405,6 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
ApplicationId applicationId = appAttemptId.getApplicationId();
this.amLivelinessMonitor.receivedPing(appAttemptId);
@ -422,8 +421,10 @@ public class ApplicationMasterService extends AbstractService implements
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"AM is not registered for known application attempt: " + appAttemptId
+ " or RM had restarted after AM registered . AM should re-register.";
"AM is not registered for known application attempt: "
+ appAttemptId
+ " or RM had restarted after AM registered. "
+ " AM should re-register.";
throw new ApplicationMasterNotRegisteredException(message);
}
@ -438,185 +439,10 @@ public class ApplicationMasterService extends AbstractService implements
throw new InvalidApplicationMasterRequestException(message);
}
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
|| filteredProgress < 0) {
request.setProgress(0);
} else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1);
}
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest =
request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
RMApp app =
this.rmContext.getRMApps().get(applicationId);
// set label expression for Resource Requests if resourceName=ANY
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
for (ResourceRequest req : ask) {
if (null == req.getNodeLabelExpression()
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
// sanity check
try {
RMServerUtils.normalizeAndValidateRequests(ask,
maximumCapacity, app.getQueue(),
rScheduler, rmContext);
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e;
}
try {
RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId,
e);
throw e;
}
}
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
List<UpdateContainerError> updateContainerErrors =
RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
request, maximumCapacity, increaseResourceReqs,
decreaseResourceReqs);
// Send new requests to appAttempt.
Allocation allocation;
RMAppAttemptState state =
app.getRMAppAttempt(appAttemptId).getAppAttemptState();
if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
state.equals(RMAppAttemptState.FINISHING) ||
app.isAppFinalStateStored()) {
LOG.warn(appAttemptId + " is in " + state +
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
increaseResourceReqs, decreaseResourceReqs);
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// Notify the AM of container update errors
if (!updateContainerErrors.isEmpty()) {
allocateResponse.setUpdateErrors(updateContainerErrors);
}
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
for(RMNode rmNode: updatedNodes) {
SchedulerNodeReport schedulerNodeReport =
rScheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeId nodeId = rmNode.getNodeID();
NodeReport report =
BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels());
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
allocateResponse.setAllocatedContainers(allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
// Handling increased/decreased containers
List<UpdatedContainer> updatedContainers = new ArrayList<>();
if (allocation.getIncreasedContainers() != null) {
for (Container c : allocation.getIncreasedContainers()) {
updatedContainers.add(
UpdatedContainer.newInstance(
ContainerUpdateType.INCREASE_RESOURCE, c));
}
}
if (allocation.getDecreasedContainers() != null) {
for (Container c : allocation.getDecreasedContainers()) {
updatedContainers.add(
UpdatedContainer.newInstance(
ContainerUpdateType.DECREASE_RESOURCE, c));
}
}
allocateResponse.setUpdatedContainers(updatedContainers);
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
allocateResponse.setCollectorAddr(
this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
allocateResponse.setApplicationPriority(app
.getApplicationPriority());
allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
request, response);
// update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey =
@ -624,21 +450,24 @@ public class ApplicationMasterService extends AbstractService implements
if (nextMasterKey != null
&& nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
.getKeyId()) {
.getKeyId()) {
RMApp app =
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
if (nextMasterKey.getMasterKey().getKeyId() !=
appAttemptImpl.getAMRMTokenKeyId()) {
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
+ " to application: " + applicationId);
+ " to application: " + appAttemptId.getApplicationId());
amrmToken = rmContext.getAMRMTokenSecretManager()
.createAndGetAMRMToken(appAttemptId);
appAttemptImpl.setAMRMToken(amrmToken);
}
allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
.newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
.toString(), amrmToken.getPassword(), amrmToken.getService()
.toString()));
response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
.newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
.toString(), amrmToken.getPassword(), amrmToken.getService()
.toString()));
}
/*
@ -646,11 +475,227 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
lock.setAllocateResponse(allocateResponse);
return allocateResponse;
response.setResponseId(lastResponse.getResponseId() + 1);
lock.setAllocateResponse(response);
return response;
}
}
protected void allocateInternal(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse allocateResponse)
throws YarnException {
//filter illegal progress values
float filteredProgress = request.getProgress();
if (Float.isNaN(filteredProgress) ||
filteredProgress == Float.NEGATIVE_INFINITY ||
filteredProgress < 0) {
request.setProgress(0);
} else if (filteredProgress > 1 ||
filteredProgress == Float.POSITIVE_INFINITY) {
request.setProgress(1);
}
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest =
request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
RMApp app =
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
// set label expression for Resource Requests if resourceName=ANY
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
for (ResourceRequest req : ask) {
if (null == req.getNodeLabelExpression()
&& ResourceRequest.ANY.equals(req.getResourceName())) {
req.setNodeLabelExpression(asc.getNodeLabelExpression());
}
}
Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
// sanity check
try {
RMServerUtils.normalizeAndValidateRequests(ask,
maximumCapacity, app.getQueue(),
rScheduler, rmContext);
} catch (InvalidResourceRequestException e) {
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
throw e;
}
try {
RMServerUtils.validateBlacklistRequest(blacklistRequest);
} catch (InvalidResourceBlacklistRequestException e) {
LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
throw e;
}
// In the case of work-preserving AM restart, it's possible for the
// AM to release containers from the earlier attempt.
if (!app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
try {
RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
} catch (InvalidContainerReleaseException e) {
LOG.warn("Invalid container release by application " + appAttemptId,
e);
throw e;
}
}
// Split Update Resource Requests into increase and decrease.
// No Exceptions are thrown here. All update errors are aggregated
// and returned to the AM.
List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
List<UpdateContainerError> updateContainerErrors =
RMServerUtils.validateAndSplitUpdateResourceRequests(
rmContext, request, maximumCapacity,
increaseResourceReqs, decreaseResourceReqs);
// Send new requests to appAttempt.
Allocation allocation;
RMAppAttemptState state =
app.getRMAppAttempt(appAttemptId).getAppAttemptState();
if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
state.equals(RMAppAttemptState.FINISHING) ||
app.isAppFinalStateStored()) {
LOG.warn(appAttemptId + " is in " + state +
" state, ignore container allocate request.");
allocation = EMPTY_ALLOCATION;
} else {
allocation =
this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals,
increaseResourceReqs, decreaseResourceReqs);
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}
// Notify the AM of container update errors
addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
// update the response with the deltas of node status changes
List<RMNode> updatedNodes = new ArrayList<RMNode>();
if(app.pullRMNodeUpdates(updatedNodes) > 0) {
List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
for(RMNode rmNode: updatedNodes) {
SchedulerNodeReport schedulerNodeReport =
rScheduler.getNodeReport(rmNode.getNodeID());
Resource used = BuilderUtils.newResource(0, 0);
int numContainers = 0;
if (schedulerNodeReport != null) {
used = schedulerNodeReport.getUsedResource();
numContainers = schedulerNodeReport.getNumContainers();
}
NodeId nodeId = rmNode.getNodeID();
NodeReport report =
BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
rmNode.getHttpAddress(), rmNode.getRackName(), used,
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
rmNode.getNodeLabels());
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
}
addToAllocatedContainers(allocateResponse, allocation.getContainers());
allocateResponse.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
allocateResponse.setAvailableResources(allocation.getResourceLimit());
// Handling increased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
allocation.getIncreasedContainers());
// Handling decreased containers
addToUpdatedContainers(
allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
allocation.getDecreasedContainers());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add collector address for this application
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
allocateResponse.setCollectorAddr(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getCollectorAddr());
}
// add preemption to the allocateResponse message (if any)
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
allocateResponse.setApplicationPriority(app
.getApplicationPriority());
}
protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
List<UpdateContainerError> updateContainerErrors) {
if (!updateContainerErrors.isEmpty()) {
if (allocateResponse.getUpdateErrors() != null
&& !allocateResponse.getUpdateErrors().isEmpty()) {
updateContainerErrors = new ArrayList<>(updateContainerErrors);
updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
}
allocateResponse.setUpdateErrors(updateContainerErrors);
}
}
protected void addToUpdatedContainers(AllocateResponse allocateResponse,
ContainerUpdateType updateType, List<Container> updatedContainers) {
if (updatedContainers != null && updatedContainers.size() > 0) {
ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
if (allocateResponse.getUpdatedContainers() != null &&
!allocateResponse.getUpdatedContainers().isEmpty()) {
containersToSet.addAll(allocateResponse.getUpdatedContainers());
}
for (Container updatedContainer : updatedContainers) {
containersToSet.add(
UpdatedContainer.newInstance(updateType, updatedContainer));
}
allocateResponse.setUpdatedContainers(containersToSet);
}
}
protected void addToAllocatedContainers(AllocateResponse allocateResponse,
List<Container> allocatedContainers) {
if (allocateResponse.getAllocatedContainers() != null
&& !allocateResponse.getAllocatedContainers().isEmpty()) {
allocatedContainers = new ArrayList<>(allocatedContainers);
allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
}
allocateResponse.setAllocatedContainers(allocatedContainers);
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
PreemptionMessage pMsg = null;
// assemble strict preemption request

View File

@ -101,8 +101,8 @@ public class OpportunisticContainerAllocatorAMService
private final int k;
private final long cacheRefreshInterval;
private List<RemoteNode> cachedNodes;
private long lastCacheUpdateTime;
private volatile List<RemoteNode> cachedNodes;
private volatile long lastCacheUpdateTime;
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
@ -218,8 +218,9 @@ public class OpportunisticContainerAllocatorAMService
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
protected void allocateInternal(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse allocateResponse)
throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
@ -227,40 +228,30 @@ public class OpportunisticContainerAllocatorAMService
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
request.setAskList(partitionedAsks.getOpportunistic());
final ApplicationAttemptId appAttemptId = getAppAttemptId();
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler)rmContext.getScheduler())
.getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
addToAllocatedContainers(allocateResponse, oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
AllocateResponse allocateResp = super.allocate(request);
// Add allocated OPPORTUNISTIC containers to the AllocateResponse.
if (!oppContainers.isEmpty()) {
allocateResp.getAllocatedContainers().addAll(oppContainers);
}
// Update opportunistic container context with the allocated GUARANTEED
// containers.
oppCtx.updateCompletedContainers(allocateResp);
// Add all opportunistic containers
return allocateResp;
super.allocateInternal(appAttemptId, request, allocateResponse);
}
@Override
@ -304,7 +295,7 @@ public class OpportunisticContainerAllocatorAMService
}
private void handleNewContainers(List<Container> allocContainers,
boolean isRemotelyAllocated) {
boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
@ -387,10 +378,12 @@ public class OpportunisticContainerAllocatorAMService
private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|| cachedNodes == null) {
|| (cachedNodes == null)) {
cachedNodes = convertToRemoteNodes(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
lastCacheUpdateTime = currTime;
if (cachedNodes.size() > 0) {
lastCacheUpdateTime = currTime;
}
}
return cachedNodes;
}

View File

@ -25,14 +25,13 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
* The event signifying that a container has been reserved.

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -965,6 +967,7 @@ public class AppSchedulingInfo {
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
ResourceRequest newRequest = ResourceRequest.newBuilder()
.priority(request.getPriority())
.allocationRequestId(request.getAllocationRequestId())
.resourceName(request.getResourceName())
.capability(request.getCapability())
.numContainers(1)

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -73,12 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -2204,7 +2205,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
if (application != null && rmContainer != null
&& rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
@ -2222,7 +2224,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
if (application != null && rmContainer != null
&& rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;

View File

@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
/**
* Contexts for a container inside scheduler

View File

@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -61,9 +60,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
@Override
public int compare(ClusterNode o1, ClusterNode o2) {
if (getMetric(o1) == getMetric(o2)) {
return o1.timestamp < o2.timestamp ? +1 : -1;
return (int)(o2.timestamp - o1.timestamp);
}
return getMetric(o1) > getMetric(o2) ? +1 : -1;
return getMetric(o1) - getMetric(o2);
}
public int getMetric(ClusterNode c) {
@ -115,8 +114,13 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
writeLock.lock();
try {
sortedNodes.clear();
sortedNodes.addAll(sortNodes());
try {
List<NodeId> nodeIds = sortNodes();
sortedNodes.clear();
sortedNodes.addAll(nodeIds);
} catch (Exception ex) {
LOG.warn("Got Exception while sorting nodes..", ex);
}
if (thresholdCalculator != null) {
thresholdCalculator.update();
}
@ -273,7 +277,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
new ArrayList<>(this.sortedNodes).subList(0, k) :
new ArrayList<>(this.sortedNodes);
return Collections.unmodifiableList(retVal);
return retVal;
} finally {
readLock.unlock();
}

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Collection;

View File

@ -33,10 +33,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.List;
public class FifoAppAttempt extends FiCaSchedulerApp {

View File

@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -709,7 +709,7 @@ public class FifoScheduler extends
// Inform the application
RMContainer rmContainer = application.allocate(type, node, schedulerKey,
request, container);
// Inform the node
node.allocateContainer(rmContainer);

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Iterator;
import java.util.List;

View File

@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.TestUtils;

View File

@ -36,8 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

View File

@ -36,8 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.junit.Assert;
import org.junit.Test;

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.After;
import org.junit.Test;

View File

@ -120,7 +120,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;

View File

@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.TestUtils;