YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority and other misc fixes (asuresh)

(cherry picked from commit ac1e5d4f77)
Arun Suresh 2016-12-27 11:54:57 -08:00
parent 2b4d3e8506
commit 81da7d1d30
41 changed files with 382 additions and 323 deletions

View File

@@ -158,6 +158,17 @@ public int hashCode() {
     return result;
   }
 
+  @Override
+  public String toString() {
+    return "UpdateReq{" +
+        "containerId=" + getContainerId() + ", " +
+        "containerVersion=" + getContainerVersion() + ", " +
+        "targetExecType=" + getExecutionType() + ", " +
+        "targetCapability=" + getCapability() + ", " +
+        "updateType=" + getContainerUpdateType() + ", " +
+        "}";
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {

View File

@@ -282,8 +282,8 @@ public synchronized void setAllocatedContainers(
       final List<Container> containers) {
     if (containers == null)
       return;
-    // this looks like a bug because it results in append and not set
     initLocalNewContainerList();
+    allocatedContainers.clear();
     allocatedContainers.addAll(containers);
   }
@@ -299,6 +299,7 @@ public synchronized void setUpdatedContainers(
     if (containers == null)
       return;
     initLocalUpdatedContainerList();
+    updatedContainers.clear();
     updatedContainers.addAll(containers);
   }
@@ -315,6 +316,7 @@ public synchronized void setCompletedContainersStatuses(
     if (containers == null)
       return;
     initLocalFinishedContainerList();
+    completedContainersStatuses.clear();
     completedContainersStatuses.addAll(containers);
   }
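
Note: the comment removed above named the problem directly — without a clear(), each call to these setters appended to the backing list instead of replacing it. A minimal standalone sketch of the same pattern (a hypothetical holder class, not the actual protobuf record implementation):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical holder class illustrating the append-vs-set bug fixed above.
public class SetterSemanticsDemo {
  private final List<String> allocated = new ArrayList<>();

  // Buggy "setter": repeated calls accumulate entries (append semantics).
  public void setAllocatedBuggy(List<String> containers) {
    if (containers == null) {
      return;
    }
    allocated.addAll(containers);
  }

  // Fixed setter: clear() first, so each call replaces the previous value.
  public void setAllocated(List<String> containers) {
    if (containers == null) {
      return;
    }
    allocated.clear();
    allocated.addAll(containers);
  }

  public static void main(String[] args) {
    SetterSemanticsDemo buggy = new SetterSemanticsDemo();
    buggy.setAllocatedBuggy(Arrays.asList("c1", "c2"));
    buggy.setAllocatedBuggy(Arrays.asList("c3"));
    System.out.println(buggy.allocated); // [c1, c2, c3] -- appended, not set

    SetterSemanticsDemo fixed = new SetterSemanticsDemo();
    fixed.setAllocated(Arrays.asList("c1", "c2"));
    fixed.setAllocated(Arrays.asList("c3"));
    System.out.println(fixed.allocated); // [c3]
  }
}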

View File

@@ -87,4 +87,11 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
   public int compareTo(RemoteNode other) {
     return this.getNodeId().compareTo(other.getNodeId());
   }
+
+  @Override
+  public String toString() {
+    return "RemoteNode{" +
+        "nodeId=" + getNodeId() + ", " +
+        "httpAddress=" + getHttpAddress() + "}";
+  }
 }

View File

@@ -22,8 +22,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -192,7 +199,8 @@ public OpportunisticContainerAllocator(
   /**
    * Allocate OPPORTUNISTIC containers.
-   * @param request AllocateRequest
+   * @param blackList Resource BlackList Request
+   * @param oppResourceReqs Opportunistic Resource Requests
    * @param applicationAttemptId ApplicationAttemptId
    * @param opportContext App specific OpportunisticContainerContext
    * @param rmIdentifier RM Identifier
@@ -200,32 +208,24 @@ public OpportunisticContainerAllocator(
    * @return List of Containers.
    * @throws YarnException YarnException
    */
-  public List<Container> allocateContainers(
-      AllocateRequest request, ApplicationAttemptId applicationAttemptId,
+  public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+      List<ResourceRequest> oppResourceReqs,
+      ApplicationAttemptId applicationAttemptId,
       OpportunisticContainerContext opportContext, long rmIdentifier,
       String appSubmitter) throws YarnException {
-    // Update released containers.
-    List<ContainerId> releasedContainers = request.getReleaseList();
-    int numReleasedContainers = releasedContainers.size();
-    if (numReleasedContainers > 0) {
-      LOG.info("AttemptID: " + applicationAttemptId + " released: "
-          + numReleasedContainers);
-      opportContext.getContainersAllocated().removeAll(releasedContainers);
-    }
     // Update black list.
-    ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
-    if (rbr != null) {
-      opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
-      opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
+    if (blackList != null) {
+      opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+      opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
     }
     // Add OPPORTUNISTIC requests to the outstanding ones.
-    opportContext.addToOutstandingReqs(request.getAskList());
+    opportContext.addToOutstandingReqs(oppResourceReqs);
     // Satisfy the outstanding OPPORTUNISTIC requests.
     List<Container> allocatedContainers = new ArrayList<>();
-    for (Priority priority :
+    for (SchedulerRequestKey schedulerKey :
         opportContext.getOutstandingOpReqs().descendingKeySet()) {
       // Allocated containers :
       //  Key = Requested Capability,
@@ -234,7 +234,7 @@ public List<Container> allocateContainers(
       //  we need the requested capability (key) to match against
       //  the outstanding reqs)
       Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          opportContext, priority, applicationAttemptId, appSubmitter);
+          opportContext, schedulerKey, applicationAttemptId, appSubmitter);
       for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
         opportContext.matchAllocationToOutstandingRequest(
             e.getKey(), e.getValue());
@@ -246,19 +246,22 @@ public List<Container> allocateContainers(
   }
 
   private Map<Resource, List<Container>> allocate(long rmIdentifier,
-      OpportunisticContainerContext appContext, Priority priority,
+      OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
       ApplicationAttemptId appAttId, String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
     for (ResourceRequest anyAsk :
-        appContext.getOutstandingOpReqs().get(priority).values()) {
+        appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
           appContext.getContainerIdGenerator(), appContext.getBlacklist(),
           appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
-      LOG.info("Opportunistic allocation requested for ["
-          + "priority=" + anyAsk.getPriority()
-          + ", num_containers=" + anyAsk.getNumContainers()
-          + ", capability=" + anyAsk.getCapability() + "]"
-          + " allocated = " + containers.get(anyAsk.getCapability()).size());
+      if (!containers.isEmpty()) {
+        LOG.info("Opportunistic allocation requested for ["
+            + "priority=" + anyAsk.getPriority()
+            + ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+            + ", num_containers=" + anyAsk.getNumContainers()
+            + ", capability=" + anyAsk.getCapability() + "]"
+            + " allocated = " + containers.keySet());
+      }
     }
     return containers;
   }
@@ -282,7 +285,9 @@ private void allocateContainersInternal(long rmIdentifier,
       nodesForScheduling.add(nodeEntry.getValue());
     }
     if (nodesForScheduling.isEmpty()) {
-      LOG.warn("No nodes available for allocating opportunistic containers.");
+      LOG.warn("No nodes available for allocating opportunistic containers. [" +
+          "allNodes=" + allNodes + ", " +
+          "blacklist=" + blacklist + "]");
       return;
     }
     int numAllocated = 0;
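
Note: after this refactor the allocation loop iterates getOutstandingOpReqs().descendingKeySet(), so the TreeMap's key ordering drives which asks are served first. A standalone sketch of that pattern with a simplified stand-in key; the ordering shown (priority first, then allocationRequestId) is an assumption for illustration, not something this hunk specifies:

import java.util.TreeMap;

// Simplified stand-in for the outstanding-request map; the real code keys a
// TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> and walks it
// in descending key order.
public class OutstandingReqsDemo {
  static final class Key implements Comparable<Key> {
    final int priority;
    final long allocationRequestId;

    Key(int priority, long allocationRequestId) {
      this.priority = priority;
      this.allocationRequestId = allocationRequestId;
    }

    @Override
    public int compareTo(Key o) {
      // Assumed ordering for illustration: priority, then allocationRequestId.
      int cmp = Integer.compare(priority, o.priority);
      return (cmp != 0) ? cmp
          : Long.compare(allocationRequestId, o.allocationRequestId);
    }

    @Override
    public String toString() {
      return "Key{priority=" + priority
          + ", allocationRequestId=" + allocationRequestId + "}";
    }
  }

  public static void main(String[] args) {
    TreeMap<Key, String> outstanding = new TreeMap<>();
    outstanding.put(new Key(1, 0), "ask-A");
    outstanding.put(new Key(2, 0), "ask-B");
    outstanding.put(new Key(2, 7), "ask-C");

    // Mirrors the allocation loop: serve keys from highest to lowest.
    for (Key k : outstanding.descendingKeySet()) {
      System.out.println(k + " -> " + outstanding.get(k));
    }
  }
}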

View File

@@ -18,12 +18,7 @@
 package org.apache.hadoop.yarn.server.scheduler;
 
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -52,9 +47,6 @@ public class OpportunisticContainerContext {
   private static final Logger LOG = LoggerFactory
       .getLogger(OpportunisticContainerContext.class);
 
-  // Currently just used to keep track of allocated containers.
-  // Can be used for reporting stats later.
-  private Set<ContainerId> containersAllocated = new HashSet<>();
   private AllocationParams appParams =
       new AllocationParams();
   private ContainerIdGenerator containerIdGenerator =
@@ -69,13 +61,9 @@ public class OpportunisticContainerContext {
   // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
-  private final TreeMap<Priority, Map<Resource, ResourceRequest>>
+  private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
-  public Set<ContainerId> getContainersAllocated() {
-    return containersAllocated;
-  }
-
   public AllocationParams getAppParams() {
     return appParams;
   }
@@ -119,20 +107,11 @@ public Set<String> getBlacklist() {
     return blacklist;
   }
 
-  public TreeMap<Priority, Map<Resource, ResourceRequest>>
+  public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
       getOutstandingOpReqs() {
     return outstandingOpReqs;
   }
 
-  public void updateCompletedContainers(AllocateResponse allocateResponse) {
-    for (ContainerStatus cs :
-        allocateResponse.getCompletedContainersStatuses()) {
-      if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
-        containersAllocated.remove(cs.getContainerId());
-      }
-    }
-  }
-
   /**
    * Takes a list of ResourceRequests (asks), extracts the key information viz.
    * (Priority, ResourceName, Capability) and adds to the outstanding
@@ -144,7 +123,7 @@ public void updateCompletedContainers(AllocateResponse allocateResponse) {
    */
   public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
     for (ResourceRequest request : resourceAsks) {
-      Priority priority = request.getPriority();
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
 
       // TODO: Extend for Node/Rack locality. We only handle ANY requests now
       if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
@@ -156,10 +135,10 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
       }
 
       Map<Resource, ResourceRequest> reqMap =
-          outstandingOpReqs.get(priority);
+          outstandingOpReqs.get(schedulerKey);
       if (reqMap == null) {
         reqMap = new HashMap<>();
-        outstandingOpReqs.put(priority, reqMap);
+        outstandingOpReqs.put(schedulerKey, reqMap);
       }
 
       ResourceRequest resourceRequest = reqMap.get(request.getCapability());
@@ -171,7 +150,8 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
             resourceRequest.getNumContainers() + request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
-        LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
+        LOG.info("# of outstandingOpReqs in ANY (at" +
+            "priority = "+ schedulerKey.getPriority()
             + ", with capability = " + request.getCapability() + " ) : "
             + resourceRequest.getNumContainers());
       }
@@ -187,9 +167,9 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
   public void matchAllocationToOutstandingRequest(Resource capability,
       List<Container> allocatedContainers) {
     for (Container c : allocatedContainers) {
-      containersAllocated.add(c.getId());
+      SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
       Map<Resource, ResourceRequest> asks =
-          outstandingOpReqs.get(c.getPriority());
+          outstandingOpReqs.get(schedulerKey);
       if (asks == null) {
         continue;
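
Note: with the containersAllocated set gone, matching an allocated container back to its ask is purely key-based — matchAllocationToOutstandingRequest() derives the key from the container via SchedulerRequestKey.extractFrom() and looks it up in the outstanding map. A minimal sketch of that lookup-and-decrement flow, using hypothetical stand-in types:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; the real code keys on
// (priority, allocationRequestId) via SchedulerRequestKey.
public class MatchDemo {
  static final class AllocatedContainer {
    final int priority;
    final long allocationRequestId;

    AllocatedContainer(int priority, long allocationRequestId) {
      this.priority = priority;
      this.allocationRequestId = allocationRequestId;
    }
  }

  static String keyOf(int priority, long allocationRequestId) {
    return "priority=" + priority + "/arid=" + allocationRequestId;
  }

  public static void main(String[] args) {
    Map<String, Integer> outstanding = new HashMap<>();
    outstanding.put(keyOf(1, 0), 3); // three containers still wanted

    AllocatedContainer c = new AllocatedContainer(1, 0);
    String key = keyOf(c.priority, c.allocationRequestId); // ~extractFrom()
    Integer remaining = outstanding.get(key);
    if (remaining != null) {
      outstanding.put(key, remaining - 1); // one ask satisfied
    }
    System.out.println(outstanding); // {priority=1/arid=0=2}
  }
}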

View File

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+package org.apache.hadoop.yarn.server.scheduler;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -53,7 +53,7 @@ public static SchedulerRequestKey extractFrom(Container container) {
         container.getAllocationRequestId());
   }
 
-  private SchedulerRequestKey(Priority priority, long allocationRequestId) {
+  SchedulerRequestKey(Priority priority, long allocationRequestId) {
     this.priority = priority;
     this.allocationRequestId = allocationRequestId;
   }
@@ -119,4 +119,12 @@ public int hashCode() {
         getAllocationRequestId() >>> 32));
     return result;
   }
+
+  @Override
+  public String toString() {
+    return "SchedulerRequestKey{" +
+        "priority=" + priority +
+        ", allocationRequestId=" + allocationRequestId +
+        '}';
+  }
 }

View File

@@ -227,10 +227,10 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
         .partitionAskList(request.getAllocateRequest().getAskList());
 
     // Allocate OPPORTUNISTIC containers.
-    request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
     List<Container> allocatedContainers =
         containerAllocator.allocateContainers(
-            request.getAllocateRequest(), applicationAttemptId,
+            request.getAllocateRequest().getResourceBlacklistRequest(),
+            partitionedAsks.getOpportunistic(), applicationAttemptId,
             oppContainerContext, rmIdentifier, appSubmitter);
 
     // Prepare request for sending to RM for scheduling GUARANTEED containers.
@@ -252,18 +252,11 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
       nodeTokens.put(nmToken.getNodeId(), nmToken);
     }
 
-    oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
-
     // Check if we have NM tokens for all the allocated containers. If not
     // generate one and update the response.
     updateAllocateResponse(
         dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Number of opportunistic containers currently" +
-          "allocated by application: " + oppContainerContext
-          .getContainersAllocated().size());
-    }
     return dsResp;
   }
 }

View File

@@ -400,7 +400,6 @@ public AllocateResponse allocate(AllocateRequest request)
       ApplicationAttemptId appAttemptId =
           amrmTokenIdentifier.getApplicationAttemptId();
-      ApplicationId applicationId = appAttemptId.getApplicationId();
 
       this.amLivelinessMonitor.receivedPing(appAttemptId);
@@ -417,8 +416,10 @@ public AllocateResponse allocate(AllocateRequest request)
       AllocateResponse lastResponse = lock.getAllocateResponse();
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
-            "AM is not registered for known application attempt: " + appAttemptId
-                + " or RM had restarted after AM registered . AM should re-register.";
+            "AM is not registered for known application attempt: "
+                + appAttemptId
+                + " or RM had restarted after AM registered. "
+                + " AM should re-register.";
         throw new ApplicationMasterNotRegisteredException(message);
       }
@@ -433,179 +434,10 @@ public AllocateResponse allocate(AllocateRequest request)
         throw new InvalidApplicationMasterRequestException(message);
       }
 
-      //filter illegal progress values
-      float filteredProgress = request.getProgress();
-      if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY
-          || filteredProgress < 0) {
-        request.setProgress(0);
-      } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) {
-        request.setProgress(1);
-      }
-
-      // Send the status update to the appAttempt.
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptStatusupdateEvent(appAttemptId, request
-              .getProgress()));
-
-      List<ResourceRequest> ask = request.getAskList();
-      List<ContainerId> release = request.getReleaseList();
-
-      ResourceBlacklistRequest blacklistRequest =
-          request.getResourceBlacklistRequest();
-      List<String> blacklistAdditions =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
-      List<String> blacklistRemovals =
-          (blacklistRequest != null) ?
-              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-      RMApp app =
-          this.rmContext.getRMApps().get(applicationId);
-
-      // set label expression for Resource Requests if resourceName=ANY
-      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
-      for (ResourceRequest req : ask) {
-        if (null == req.getNodeLabelExpression()
-            && ResourceRequest.ANY.equals(req.getResourceName())) {
-          req.setNodeLabelExpression(asc.getNodeLabelExpression());
-        }
-      }
-
-      Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
-
-      // sanity check
-      try {
-        RMServerUtils.normalizeAndValidateRequests(ask,
-            maximumCapacity, app.getQueue(),
-            rScheduler, rmContext);
-      } catch (InvalidResourceRequestException e) {
-        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
-        throw e;
-      }
-
-      try {
-        RMServerUtils.validateBlacklistRequest(blacklistRequest);
-      } catch (InvalidResourceBlacklistRequestException e) {
-        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
-        throw e;
-      }
-
-      // In the case of work-preserving AM restart, it's possible for the
-      // AM to release containers from the earlier attempt.
-      if (!app.getApplicationSubmissionContext()
-          .getKeepContainersAcrossApplicationAttempts()) {
-        try {
-          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
-        } catch (InvalidContainerReleaseException e) {
-          LOG.warn("Invalid container release by application " + appAttemptId,
-              e);
-          throw e;
-        }
-      }
-
-      // Split Update Resource Requests into increase and decrease.
-      // No Exceptions are thrown here. All update errors are aggregated
-      // and returned to the AM.
-      List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
-      List<UpdateContainerError> updateContainerErrors =
-          RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
-              request, maximumCapacity, increaseResourceReqs,
-              decreaseResourceReqs);
-
-      // Send new requests to appAttempt.
-      Allocation allocation;
-      RMAppAttemptState state =
-          app.getRMAppAttempt(appAttemptId).getAppAttemptState();
-      if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
-          state.equals(RMAppAttemptState.FINISHING) ||
-          app.isAppFinalStateStored()) {
-        LOG.warn(appAttemptId + " is in " + state +
-            " state, ignore container allocate request.");
-        allocation = EMPTY_ALLOCATION;
-      } else {
-        allocation =
-            this.rScheduler.allocate(appAttemptId, ask, release,
-                blacklistAdditions, blacklistRemovals,
-                increaseResourceReqs, decreaseResourceReqs);
-      }
-
-      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
-        LOG.info("blacklist are updated in Scheduler." +
-            "blacklistAdditions: " + blacklistAdditions + ", " +
-            "blacklistRemovals: " + blacklistRemovals);
-      }
-      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
-      AllocateResponse allocateResponse =
+      AllocateResponse response =
           recordFactory.newRecordInstance(AllocateResponse.class);
-      if (allocation.getNMTokens() != null &&
-          !allocation.getNMTokens().isEmpty()) {
-        allocateResponse.setNMTokens(allocation.getNMTokens());
-      }
+      allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
+          request, response);
 
-      // Notify the AM of container update errors
-      if (!updateContainerErrors.isEmpty()) {
-        allocateResponse.setUpdateErrors(updateContainerErrors);
-      }
-
-      // update the response with the deltas of node status changes
-      List<RMNode> updatedNodes = new ArrayList<RMNode>();
-      if(app.pullRMNodeUpdates(updatedNodes) > 0) {
-        List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
-        for(RMNode rmNode: updatedNodes) {
-          SchedulerNodeReport schedulerNodeReport =
-              rScheduler.getNodeReport(rmNode.getNodeID());
-          Resource used = BuilderUtils.newResource(0, 0);
-          int numContainers = 0;
-          if (schedulerNodeReport != null) {
-            used = schedulerNodeReport.getUsedResource();
-            numContainers = schedulerNodeReport.getNumContainers();
-          }
-          NodeId nodeId = rmNode.getNodeID();
-          NodeReport report =
-              BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
-                  rmNode.getHttpAddress(), rmNode.getRackName(), used,
-                  rmNode.getTotalCapability(), numContainers,
-                  rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
-                  rmNode.getNodeLabels());
-
-          updatedNodeReports.add(report);
-        }
-        allocateResponse.setUpdatedNodes(updatedNodeReports);
-      }
-
-      allocateResponse.setAllocatedContainers(allocation.getContainers());
-      allocateResponse.setCompletedContainersStatuses(appAttempt
-          .pullJustFinishedContainers());
-      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
-      allocateResponse.setAvailableResources(allocation.getResourceLimit());
-
-      // Handling increased/decreased containers
-      List<UpdatedContainer> updatedContainers = new ArrayList<>();
-      if (allocation.getIncreasedContainers() != null) {
-        for (Container c : allocation.getIncreasedContainers()) {
-          updatedContainers.add(
-              UpdatedContainer.newInstance(
-                  ContainerUpdateType.INCREASE_RESOURCE, c));
-        }
-      }
-      if (allocation.getDecreasedContainers() != null) {
-        for (Container c : allocation.getDecreasedContainers()) {
-          updatedContainers.add(
-              UpdatedContainer.newInstance(
-                  ContainerUpdateType.DECREASE_RESOURCE, c));
-        }
-      }
-      allocateResponse.setUpdatedContainers(updatedContainers);
-
-      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
-      // add preemption to the allocateResponse message (if any)
-      allocateResponse
-          .setPreemptionMessage(generatePreemptionMessage(allocation));
-
-      // Set application priority
-      allocateResponse.setApplicationPriority(app
-          .getApplicationPriority());
-
       // update AMRMToken if the token is rolled-up
       MasterKeyData nextMasterKey =
@@ -613,21 +445,24 @@ public AllocateResponse allocate(AllocateRequest request)
       if (nextMasterKey != null
           && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
               .getKeyId()) {
+        RMApp app =
+            this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+        RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
         RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt;
         Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken();
         if (nextMasterKey.getMasterKey().getKeyId() !=
             appAttemptImpl.getAMRMTokenKeyId()) {
           LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
-              + " to application: " + applicationId);
+              + " to application: " + appAttemptId.getApplicationId());
           amrmToken = rmContext.getAMRMTokenSecretManager()
               .createAndGetAMRMToken(appAttemptId);
           appAttemptImpl.setAMRMToken(amrmToken);
         }
-        allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+        response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token
             .newInstance(amrmToken.getIdentifier(), amrmToken.getKind()
                 .toString(), amrmToken.getPassword(), amrmToken.getService()
                 .toString()));
       }
 
       /*
@@ -635,11 +470,220 @@ public AllocateResponse allocate(AllocateRequest request)
        * need to worry about unregister call occurring in between (which
        * removes the lock object).
        */
-      lock.setAllocateResponse(allocateResponse);
-      return allocateResponse;
+      response.setResponseId(lastResponse.getResponseId() + 1);
+      lock.setAllocateResponse(response);
+      return response;
     }
   }
 
+  protected void allocateInternal(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse allocateResponse)
+      throws YarnException {
+
+    //filter illegal progress values
+    float filteredProgress = request.getProgress();
+    if (Float.isNaN(filteredProgress) ||
+        filteredProgress == Float.NEGATIVE_INFINITY ||
+        filteredProgress < 0) {
+      request.setProgress(0);
+    } else if (filteredProgress > 1 ||
+        filteredProgress == Float.POSITIVE_INFINITY) {
+      request.setProgress(1);
+    }
+
+    // Send the status update to the appAttempt.
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptStatusupdateEvent(appAttemptId, request
+            .getProgress()));
+
+    List<ResourceRequest> ask = request.getAskList();
+    List<ContainerId> release = request.getReleaseList();
+
+    ResourceBlacklistRequest blacklistRequest =
+        request.getResourceBlacklistRequest();
+    List<String> blacklistAdditions =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
+    List<String> blacklistRemovals =
+        (blacklistRequest != null) ?
+            blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
+    RMApp app =
+        this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
+
+    // set label expression for Resource Requests if resourceName=ANY
+    ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+    for (ResourceRequest req : ask) {
+      if (null == req.getNodeLabelExpression()
+          && ResourceRequest.ANY.equals(req.getResourceName())) {
+        req.setNodeLabelExpression(asc.getNodeLabelExpression());
+      }
+    }
+
+    Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
+
+    // sanity check
+    try {
+      RMServerUtils.normalizeAndValidateRequests(ask,
+          maximumCapacity, app.getQueue(),
+          rScheduler, rmContext);
+    } catch (InvalidResourceRequestException e) {
+      LOG.warn("Invalid resource ask by application " + appAttemptId, e);
+      throw e;
+    }
+
+    try {
+      RMServerUtils.validateBlacklistRequest(blacklistRequest);
+    } catch (InvalidResourceBlacklistRequestException e) {
+      LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
+      throw e;
+    }
+
+    // In the case of work-preserving AM restart, it's possible for the
+    // AM to release containers from the earlier attempt.
+    if (!app.getApplicationSubmissionContext()
+        .getKeepContainersAcrossApplicationAttempts()) {
+      try {
+        RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
+      } catch (InvalidContainerReleaseException e) {
+        LOG.warn("Invalid container release by application " + appAttemptId,
+            e);
+        throw e;
+      }
+    }
+
+    // Split Update Resource Requests into increase and decrease.
+    // No Exceptions are thrown here. All update errors are aggregated
+    // and returned to the AM.
+    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
+    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
+    List<UpdateContainerError> updateContainerErrors =
+        RMServerUtils.validateAndSplitUpdateResourceRequests(
+            rmContext, request, maximumCapacity,
+            increaseResourceReqs, decreaseResourceReqs);
+
+    // Send new requests to appAttempt.
+    Allocation allocation;
+    RMAppAttemptState state =
+        app.getRMAppAttempt(appAttemptId).getAppAttemptState();
+    if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
+        state.equals(RMAppAttemptState.FINISHING) ||
+        app.isAppFinalStateStored()) {
+      LOG.warn(appAttemptId + " is in " + state +
+          " state, ignore container allocate request.");
+      allocation = EMPTY_ALLOCATION;
+    } else {
+      allocation =
+          this.rScheduler.allocate(appAttemptId, ask, release,
+              blacklistAdditions, blacklistRemovals,
+              increaseResourceReqs, decreaseResourceReqs);
+    }
+
+    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+      LOG.info("blacklist are updated in Scheduler." +
+          "blacklistAdditions: " + blacklistAdditions + ", " +
+          "blacklistRemovals: " + blacklistRemovals);
+    }
+    RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
+
+    if (allocation.getNMTokens() != null &&
+        !allocation.getNMTokens().isEmpty()) {
+      allocateResponse.setNMTokens(allocation.getNMTokens());
+    }
+
+    // Notify the AM of container update errors
+    addToUpdateContainerErrors(allocateResponse, updateContainerErrors);
+
+    // update the response with the deltas of node status changes
+    List<RMNode> updatedNodes = new ArrayList<RMNode>();
+    if(app.pullRMNodeUpdates(updatedNodes) > 0) {
+      List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
+      for(RMNode rmNode: updatedNodes) {
+        SchedulerNodeReport schedulerNodeReport =
+            rScheduler.getNodeReport(rmNode.getNodeID());
+        Resource used = BuilderUtils.newResource(0, 0);
+        int numContainers = 0;
+        if (schedulerNodeReport != null) {
+          used = schedulerNodeReport.getUsedResource();
+          numContainers = schedulerNodeReport.getNumContainers();
+        }
+        NodeId nodeId = rmNode.getNodeID();
+        NodeReport report =
+            BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
+                rmNode.getHttpAddress(), rmNode.getRackName(), used,
+                rmNode.getTotalCapability(), numContainers,
+                rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
+                rmNode.getNodeLabels());
+
+        updatedNodeReports.add(report);
+      }
+      allocateResponse.setUpdatedNodes(updatedNodeReports);
+    }
+
+    addToAllocatedContainers(allocateResponse, allocation.getContainers());
+
+    allocateResponse.setCompletedContainersStatuses(appAttempt
+        .pullJustFinishedContainers());
+    allocateResponse.setAvailableResources(allocation.getResourceLimit());
+
+    // Handling increased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
+        allocation.getIncreasedContainers());
+    // Handling decreased containers
+    addToUpdatedContainers(
+        allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
+        allocation.getDecreasedContainers());
+
+    allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+    // add preemption to the allocateResponse message (if any)
+    allocateResponse
+        .setPreemptionMessage(generatePreemptionMessage(allocation));
+
+    // Set application priority
+    allocateResponse.setApplicationPriority(app
+        .getApplicationPriority());
+  }
+
+  protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
+      List<UpdateContainerError> updateContainerErrors) {
+    if (!updateContainerErrors.isEmpty()) {
+      if (allocateResponse.getUpdateErrors() != null
+          && !allocateResponse.getUpdateErrors().isEmpty()) {
+        updateContainerErrors = new ArrayList<>(updateContainerErrors);
+        updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
+      }
+      allocateResponse.setUpdateErrors(updateContainerErrors);
+    }
+  }
+
+  protected void addToUpdatedContainers(AllocateResponse allocateResponse,
+      ContainerUpdateType updateType, List<Container> updatedContainers) {
+    if (updatedContainers != null && updatedContainers.size() > 0) {
+      ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
+      if (allocateResponse.getUpdatedContainers() != null &&
+          !allocateResponse.getUpdatedContainers().isEmpty()) {
+        containersToSet.addAll(allocateResponse.getUpdatedContainers());
+      }
+      for (Container updatedContainer : updatedContainers) {
+        containersToSet.add(
+            UpdatedContainer.newInstance(updateType, updatedContainer));
+      }
+      allocateResponse.setUpdatedContainers(containersToSet);
+    }
+  }
+
+  protected void addToAllocatedContainers(AllocateResponse allocateResponse,
+      List<Container> allocatedContainers) {
+    if (allocateResponse.getAllocatedContainers() != null
+        && !allocateResponse.getAllocatedContainers().isEmpty()) {
+      allocatedContainers = new ArrayList<>(allocatedContainers);
+      allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
+    }
+    allocateResponse.setAllocatedContainers(allocatedContainers);
+  }
+
   private PreemptionMessage generatePreemptionMessage(Allocation allocation){
     PreemptionMessage pMsg = null;
     // assemble strict preemption request
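
Note: the restructuring above is a template-method split — allocate() keeps the protocol bookkeeping (registration checks, response id, AMRMToken roll-over) and delegates the scheduling work to the new overridable allocateInternal(), which a subclass can wrap. A minimal sketch of the shape, with hypothetical classes (not the YARN API):

// Hypothetical classes sketching the template-method split.
public class TemplateMethodDemo {
  static class BaseAllocatorService {
    public final String allocate(String request) {
      StringBuilder response = new StringBuilder();
      allocateInternal(request, response);   // overridable scheduling step
      response.append("[responseId+1]");     // common bookkeeping stays here
      return response.toString();
    }

    protected void allocateInternal(String request, StringBuilder response) {
      response.append("[guaranteed:").append(request).append("]");
    }
  }

  static class OpportunisticService extends BaseAllocatorService {
    @Override
    protected void allocateInternal(String request, StringBuilder response) {
      response.append("[opportunistic:").append(request).append("]");
      super.allocateInternal(request, response); // then the guaranteed path
    }
  }

  public static void main(String[] args) {
    System.out.println(new OpportunisticService().allocate("ask"));
    // [opportunistic:ask][guaranteed:ask][responseId+1]
  }
}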

View File

@@ -101,8 +101,8 @@ public class OpportunisticContainerAllocatorAMService
   private final int k;
 
   private final long cacheRefreshInterval;
-  private List<RemoteNode> cachedNodes;
-  private long lastCacheUpdateTime;
+  private volatile List<RemoteNode> cachedNodes;
+  private volatile long lastCacheUpdateTime;
 
   public OpportunisticContainerAllocatorAMService(RMContext rmContext,
       YarnScheduler scheduler) {
@@ -218,8 +218,9 @@ public long generateContainerId() {
   }
 
   @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
+  protected void allocateInternal(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse allocateResponse)
+      throws YarnException {
 
     // Partition requests to GUARANTEED and OPPORTUNISTIC.
     OpportunisticContainerAllocator.PartitionedResourceRequests
@@ -227,40 +228,30 @@ public AllocateResponse allocate(AllocateRequest request) throws
         oppContainerAllocator.partitionAskList(request.getAskList());
 
     // Allocate OPPORTUNISTIC containers.
-    request.setAskList(partitionedAsks.getOpportunistic());
-    final ApplicationAttemptId appAttemptId = getAppAttemptId();
-    SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
-        rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+    SchedulerApplicationAttempt appAttempt =
+        ((AbstractYarnScheduler)rmContext.getScheduler())
+            .getApplicationAttempt(appAttemptId);
 
     OpportunisticContainerContext oppCtx =
         appAttempt.getOpportunisticContainerContext();
     oppCtx.updateNodeList(getLeastLoadedNodes());
 
     List<Container> oppContainers =
-        oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
-            ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+        oppContainerAllocator.allocateContainers(
+            request.getResourceBlacklistRequest(),
+            partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
+            ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
 
     // Create RMContainers and update the NMTokens.
     if (!oppContainers.isEmpty()) {
      handleNewContainers(oppContainers, false);
      appAttempt.updateNMTokens(oppContainers);
+      addToAllocatedContainers(allocateResponse, oppContainers);
     }
 
     // Allocate GUARANTEED containers.
     request.setAskList(partitionedAsks.getGuaranteed());
-    AllocateResponse allocateResp = super.allocate(request);
-
-    // Add allocated OPPORTUNISTIC containers to the AllocateResponse.
-    if (!oppContainers.isEmpty()) {
-      allocateResp.getAllocatedContainers().addAll(oppContainers);
-    }
-
-    // Update opportunistic container context with the allocated GUARANTEED
-    // containers.
-    oppCtx.updateCompletedContainers(allocateResp);
-
-    // Add all opportunistic containers
-    return allocateResp;
+    super.allocateInternal(appAttemptId, request, allocateResponse);
   }
 
   @Override
@@ -304,7 +295,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
   }
 
   private void handleNewContainers(List<Container> allocContainers,
       boolean isRemotelyAllocated) {
     for (Container container : allocContainers) {
       // Create RMContainer
       SchedulerApplicationAttempt appAttempt =
@@ -387,10 +378,12 @@ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
   private synchronized List<RemoteNode> getLeastLoadedNodes() {
     long currTime = System.currentTimeMillis();
     if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
-        || cachedNodes == null) {
+        || (cachedNodes == null)) {
       cachedNodes = convertToRemoteNodes(
           this.nodeMonitor.selectLeastLoadedNodes(this.k));
-      lastCacheUpdateTime = currTime;
+      if (cachedNodes.size() > 0) {
+        lastCacheUpdateTime = currTime;
+      }
     }
     return cachedNodes;
   }

View File

@@ -25,14 +25,13 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 /**
/** /**

View File

@@ -53,7 +53,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
     .RMNodeDecreaseContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;

View File

@@ -21,7 +21,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 /**
  * The event signifying that a container has been reserved.

View File

@@ -37,6 +37,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;

View File

@@ -51,7 +51,6 @@
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -73,12 +72,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;

View File

@@ -40,6 +40,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.ImmutableSet;

View File

@@ -34,6 +34,7 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -2216,7 +2217,8 @@ public void collectSchedulerApplications(
   @Override
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null) {
+    if (application != null && rmContainer != null
+        && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
@@ -2234,7 +2236,8 @@ public void attachContainer(Resource clusterResource,
   @Override
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null) {
+    if (application != null && rmContainer != null
+        && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()

View File

@@ -31,7 +31,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

View File

@@ -38,7 +38,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;

View File

@@ -22,7 +22,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 /**
  * Contexts for a container inside scheduler

View File

@@ -52,7 +52,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
@@ -69,6 +68,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

View File

@@ -29,7 +29,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;

View File

@ -30,7 +30,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -61,9 +60,9 @@ public enum LoadComparator implements Comparator<ClusterNode> {
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
       if (getMetric(o1) == getMetric(o2)) {
-        return o1.timestamp < o2.timestamp ? +1 : -1;
+        return (int)(o2.timestamp - o1.timestamp);
       }
-      return getMetric(o1) > getMetric(o2) ? +1 : -1;
+      return getMetric(o1) - getMetric(o2);
     }
 
     public int getMetric(ClusterNode c) {
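The comparator rewrite above is a correctness fix as much as a cleanup: the old ternary returned -1 even when metric and timestamp were both equal, so compare(a, a) was never 0, violating the Comparator contract that sorts rely on. The new subtraction form returns 0 for fully equal nodes, and is safe from overflow only because the metrics are small non-negative queue lengths. A self-contained sketch of the same ordering, with a simplified stand-in for ClusterNode (names are assumptions):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified stand-in for ClusterNode; field names are assumptions.
class NodeSketch {
  final int queueLength;   // the "metric"
  final long timestamp;    // last update time

  NodeSketch(int queueLength, long timestamp) {
    this.queueLength = queueLength;
    this.timestamp = timestamp;
  }
}

public class ComparatorSketch {
  // Mirrors the new compare(): lighter queue first; on ties the more
  // recently updated node first. Unlike the old ternary, this returns 0
  // for fully equal nodes, satisfying the Comparator contract.
  static final Comparator<NodeSketch> LOAD = (o1, o2) -> {
    if (o1.queueLength == o2.queueLength) {
      return (int) (o2.timestamp - o1.timestamp);
    }
    return o1.queueLength - o2.queueLength;
  };

  public static void main(String[] args) {
    List<NodeSketch> nodes = new ArrayList<>();
    nodes.add(new NodeSketch(3, 100L));
    nodes.add(new NodeSketch(1, 50L));
    nodes.add(new NodeSketch(1, 200L));
    nodes.sort(LOAD);
    // Expected order: (1 @ 200), (1 @ 50), (3 @ 100).
    nodes.forEach(n ->
        System.out.println(n.queueLength + " @ " + n.timestamp));
  }
}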
@@ -115,8 +114,13 @@ public void run() {
       ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
       writeLock.lock();
       try {
-        sortedNodes.clear();
-        sortedNodes.addAll(sortNodes());
+        try {
+          List<NodeId> nodeIds = sortNodes();
+          sortedNodes.clear();
+          sortedNodes.addAll(nodeIds);
+        } catch (Exception ex) {
+          LOG.warn("Got Exception while sorting nodes..", ex);
+        }
         if (thresholdCalculator != null) {
           thresholdCalculator.update();
         }
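The reshuffled body above is deliberate about ordering: sortNodes() now runs to completion before sortedNodes is cleared, so an exception thrown while sorting leaves the previous snapshot visible to readers instead of an empty list. A minimal sketch of that compute-then-swap pattern under assumed names (SnapshotHolder is illustrative, not a YARN class):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Sketch: materialize the new data fully before mutating the shared
// list, so a failure in the expensive step keeps the old snapshot.
class SnapshotHolder<T> {
  private final List<T> snapshot = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void refresh(Supplier<List<T>> sorter) {
    lock.writeLock().lock();
    try {
      try {
        List<T> fresh = sorter.get(); // may throw
        snapshot.clear();             // mutate only after success
        snapshot.addAll(fresh);
      } catch (Exception ex) {
        // Swallow and keep the previous snapshot, as the patch does.
        System.err.println("Kept previous snapshot: " + ex);
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
}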
@ -273,7 +277,7 @@ public List<NodeId> selectLeastLoadedNodes(int k) {
List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ? List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
new ArrayList<>(this.sortedNodes).subList(0, k) : new ArrayList<>(this.sortedNodes).subList(0, k) :
new ArrayList<>(this.sortedNodes); new ArrayList<>(this.sortedNodes);
return Collections.unmodifiableList(retVal); return retVal;
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
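Dropping Collections.unmodifiableList loosens the method's contract: callers now get a mutable list they can reorder or shuffle. Because the result is carved out of a fresh copy, mutating it still cannot touch the monitor's internal sortedNodes. A small illustration of that copy semantics (values are hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration: the k-element result is a sublist of a fresh copy, so
// mutating it cannot corrupt the monitor's internal sorted list.
public class SubListSketch {
  public static void main(String[] args) {
    List<String> sortedNodes = Arrays.asList("n1", "n2", "n3", "n4");
    int k = 2;
    List<String> retVal = (k < sortedNodes.size() && k >= 0)
        ? new ArrayList<>(sortedNodes).subList(0, k)
        : new ArrayList<>(sortedNodes);
    retVal.set(0, "other");           // mutable now, no exception
    System.out.println(sortedNodes);  // [n1, n2, n3, n4] -- unchanged
    System.out.println(retVal);       // [other, n2]
  }
}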


@@ -56,7 +56,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;


@@ -26,7 +26,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private


@@ -33,10 +33,12 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.List;
 
 public class FifoAppAttempt extends FiCaSchedulerApp {


@@ -69,7 +69,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -709,7 +709,7 @@ private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application,
     // Inform the application
     RMContainer rmContainer = application.allocate(type, node, schedulerKey,
         request, container);
 
     // Inform the node
     node.allocateContainer(rmContainer);


@@ -25,7 +25,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -177,11 +177,15 @@ private void decrementOutstanding(ResourceRequest offSwitchRequest) {
         offSwitchRequest.getCapability());
   }
 
-  private ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest =
-        ResourceRequest.newInstance(request.getPriority(),
-            request.getResourceName(), request.getCapability(), 1,
-            request.getRelaxLocality(), request.getNodeLabelExpression());
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newBuilder()
+        .priority(request.getPriority())
+        .allocationRequestId(request.getAllocationRequestId())
+        .resourceName(request.getResourceName())
+        .capability(request.getCapability())
+        .numContainers(1)
+        .relaxLocality(request.getRelaxLocality())
+        .nodeLabelExpression(request.getNodeLabelExpression()).build();
     return newRequest;
   }
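Besides widening the method's visibility to public, the builder-based clone fixes a quiet bug: the old ResourceRequest.newInstance(...) overload has no allocationRequestId parameter, so cloned requests silently reset it to the default. The builder copies every field explicitly. A usage sketch of the same clone as a standalone helper (CloneSketch and the method name are illustrative):

import org.apache.hadoop.yarn.api.records.ResourceRequest;

// Sketch: clone a request for exactly one container while preserving
// the allocationRequestId that the old newInstance() overload dropped.
public final class CloneSketch {
  static ResourceRequest cloneForOneContainer(ResourceRequest source) {
    return ResourceRequest.newBuilder()
        .priority(source.getPriority())
        .allocationRequestId(source.getAllocationRequestId())
        .resourceName(source.getResourceName())
        .capability(source.getCapability())
        .numContainers(1)
        .relaxLocality(source.getRelaxLocality())
        .nodeLabelExpression(source.getNodeLabelExpression())
        .build();
  }
}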


@@ -21,7 +21,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.Collection;
 import java.util.Iterator;


@@ -61,7 +61,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity


@@ -31,7 +31,7 @@
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .TestUtils;


@@ -36,8 +36,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;


@@ -35,8 +35,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;


@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.junit.Assert;
 import org.junit.Test;


@@ -41,6 +41,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.After;
 import org.junit.Test;


@@ -120,7 +120,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;


@@ -77,7 +77,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;


@@ -60,7 +60,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;


@@ -53,7 +53,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;


@@ -30,7 +30,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;


@@ -38,7 +38,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .TestUtils;


@@ -34,13 +34,14 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;