YARN-4329. [YARN-5437] Allow fetching exact reason as to why a submitted app
is in ACCEPTED state in Fair Scheduler (Contributed by Yufei Gu)
This commit is contained in:
parent
822ae88f7d
commit
59ee8b7a88
|
@ -766,8 +766,18 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
// The desired container won't fit here, so reserve
|
// The desired container won't fit here, so reserve
|
||||||
if (isReservable(capability) &&
|
if (isReservable(capability) &&
|
||||||
reserve(request, node, reservedContainer, type, schedulerKey)) {
|
reserve(request, node, reservedContainer, type, schedulerKey)) {
|
||||||
|
if (isWaitingForAMContainer()) {
|
||||||
|
updateAMDiagnosticMsg(capability,
|
||||||
|
" exceed the available resources of the node and the request is"
|
||||||
|
+ " reserved");
|
||||||
|
}
|
||||||
return FairScheduler.CONTAINER_RESERVED;
|
return FairScheduler.CONTAINER_RESERVED;
|
||||||
} else {
|
} else {
|
||||||
|
if (isWaitingForAMContainer()) {
|
||||||
|
updateAMDiagnosticMsg(capability,
|
||||||
|
" exceed the available resources of the node and the request cannot"
|
||||||
|
+ " be reserved");
|
||||||
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Couldn't creating reservation for " +
|
LOG.debug("Couldn't creating reservation for " +
|
||||||
getName() + ",at priority " + request.getPriority());
|
getName() + ",at priority " + request.getPriority());
|
||||||
|
@ -920,23 +930,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
ResourceRequest rackRequest = getResourceRequest(key, node.getRackName());
|
ResourceRequest rackRequest = getResourceRequest(key, node.getRackName());
|
||||||
ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName());
|
ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName());
|
||||||
|
|
||||||
return
|
boolean ret = true;
|
||||||
// There must be outstanding requests at the given priority:
|
if (!(// There must be outstanding requests at the given priority:
|
||||||
anyRequest != null && anyRequest.getNumContainers() > 0 &&
|
anyRequest != null && anyRequest.getNumContainers() > 0 &&
|
||||||
// If locality relaxation is turned off at *-level, there must be a
|
// If locality relaxation is turned off at *-level, there must be a
|
||||||
// non-zero request for the node's rack:
|
// non-zero request for the node's rack:
|
||||||
(anyRequest.getRelaxLocality() ||
|
(anyRequest.getRelaxLocality() ||
|
||||||
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
|
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
|
||||||
// If locality relaxation is turned off at rack-level, there must be a
|
// If locality relaxation is turned off at rack-level, there must be a
|
||||||
// non-zero request at the node:
|
// non-zero request at the node:
|
||||||
(rackRequest == null || rackRequest.getRelaxLocality() ||
|
(rackRequest == null || rackRequest.getRelaxLocality() ||
|
||||||
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
|
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
|
||||||
// The requested container must be able to fit on the node:
|
// The requested container must be able to fit on the node:
|
||||||
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
|
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
|
||||||
anyRequest.getCapability(),
|
anyRequest.getCapability(), node.getRMNode().getTotalCapability()))) {
|
||||||
node.getRMNode().getTotalCapability()) &&
|
ret = false;
|
||||||
// The requested container must fit in queue maximum share:
|
} else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) {
|
||||||
getQueue().fitsInMaxShare(anyRequest.getCapability());
|
// The requested container must fit in queue maximum share
|
||||||
|
if (isWaitingForAMContainer()) {
|
||||||
|
updateAMDiagnosticMsg(anyRequest.getCapability(),
|
||||||
|
" exceeds current queue or its parents maximum resource allowed).");
|
||||||
|
}
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isValidReservation(FSSchedulerNode node) {
|
private boolean isValidReservation(FSSchedulerNode node) {
|
||||||
|
@ -1083,6 +1101,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
@Override
|
@Override
|
||||||
public Resource assignContainer(FSSchedulerNode node) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
if (isOverAMShareLimit()) {
|
if (isOverAMShareLimit()) {
|
||||||
|
if (isWaitingForAMContainer()) {
|
||||||
|
List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
|
||||||
|
updateAMDiagnosticMsg(ask.get(0).getCapability(), " exceeds maximum "
|
||||||
|
+ "AM resource allowed).");
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
||||||
"be exceeded");
|
"be exceeded");
|
||||||
|
@ -1092,6 +1116,21 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
return assignContainer(node, false);
|
return assignContainer(node, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the diagnostic message and update it.
|
||||||
|
*
|
||||||
|
* @param resource resource request
|
||||||
|
* @param reason the reason why AM doesn't get the resource
|
||||||
|
*/
|
||||||
|
private void updateAMDiagnosticMsg(Resource resource, String reason) {
|
||||||
|
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||||
|
diagnosticMessageBldr.append(" (Resource request: ");
|
||||||
|
diagnosticMessageBldr.append(resource);
|
||||||
|
diagnosticMessageBldr.append(reason);
|
||||||
|
updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
diagnosticMessageBldr.toString());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Preempt a running container according to the priority
|
* Preempt a running container according to the priority
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -710,7 +710,7 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
application.setCurrentAppAttempt(attempt);
|
application.setCurrentAppAttempt(attempt);
|
||||||
|
|
||||||
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
|
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, attempt);
|
||||||
queue.addApp(attempt, runnable);
|
queue.addApp(attempt, runnable);
|
||||||
if (runnable) {
|
if (runnable) {
|
||||||
maxRunningEnforcer.trackRunnableApp(attempt);
|
maxRunningEnforcer.trackRunnableApp(attempt);
|
||||||
|
@ -1714,7 +1714,7 @@ public class FairScheduler extends
|
||||||
boolean wasRunnable = oldQueue.removeApp(attempt);
|
boolean wasRunnable = oldQueue.removeApp(attempt);
|
||||||
// if app was not runnable before, it may be runnable now
|
// if app was not runnable before, it may be runnable now
|
||||||
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
|
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
|
||||||
attempt.getUser());
|
attempt);
|
||||||
if (wasRunnable && !nowRunnable) {
|
if (wasRunnable && !nowRunnable) {
|
||||||
throw new IllegalStateException("Should have already verified that app "
|
throw new IllegalStateException("Should have already verified that app "
|
||||||
+ attempt.getApplicationId() + " would be runnable in new queue");
|
+ attempt.getApplicationId() + " would be runnable in new queue");
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ArrayListMultimap;
|
import com.google.common.collect.ArrayListMultimap;
|
||||||
import com.google.common.collect.ListMultimap;
|
import com.google.common.collect.ListMultimap;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles tracking and enforcement for user and queue maxRunningApps
|
* Handles tracking and enforcement for user and queue maxRunningApps
|
||||||
|
@ -54,25 +55,64 @@ public class MaxRunningAppsEnforcer {
|
||||||
/**
|
/**
|
||||||
* Checks whether making the application runnable would exceed any
|
* Checks whether making the application runnable would exceed any
|
||||||
* maxRunningApps limits.
|
* maxRunningApps limits.
|
||||||
|
*
|
||||||
|
* @param queue the current queue
|
||||||
|
* @param attempt the app attempt being checked
|
||||||
|
* @return true if the application is runnable; false otherwise
|
||||||
*/
|
*/
|
||||||
public boolean canAppBeRunnable(FSQueue queue, String user) {
|
public boolean canAppBeRunnable(FSQueue queue, FSAppAttempt attempt) {
|
||||||
|
boolean ret = true;
|
||||||
|
if (exceedUserMaxApps(attempt.getUser())) {
|
||||||
|
attempt.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
"The user \"" + attempt.getUser() + "\" has reached the maximum limit"
|
||||||
|
+ " of runnable applications.");
|
||||||
|
ret = false;
|
||||||
|
} else if (exceedQueueMaxRunningApps(queue)) {
|
||||||
|
attempt.updateAMContainerDiagnostics(AMState.INACTIVATED,
|
||||||
|
"The queue \"" + queue.getName() + "\" has reached the maximum limit"
|
||||||
|
+ " of runnable applications.");
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether the number of user runnable apps exceeds the limitation.
|
||||||
|
*
|
||||||
|
* @param user the user name
|
||||||
|
* @return true if the number hits the limit; false otherwise
|
||||||
|
*/
|
||||||
|
public boolean exceedUserMaxApps(String user) {
|
||||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||||
Integer userNumRunnable = usersNumRunnableApps.get(user);
|
Integer userNumRunnable = usersNumRunnableApps.get(user);
|
||||||
if (userNumRunnable == null) {
|
if (userNumRunnable == null) {
|
||||||
userNumRunnable = 0;
|
userNumRunnable = 0;
|
||||||
}
|
}
|
||||||
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
|
if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively checks whether the number of queue runnable apps exceeds the
|
||||||
|
* limitation.
|
||||||
|
*
|
||||||
|
* @param queue the current queue
|
||||||
|
* @return true if the number hits the limit; false otherwise
|
||||||
|
*/
|
||||||
|
public boolean exceedQueueMaxRunningApps(FSQueue queue) {
|
||||||
// Check queue and all parent queues
|
// Check queue and all parent queues
|
||||||
while (queue != null) {
|
while (queue != null) {
|
||||||
if (queue.getNumRunnableApps() >= queue.getMaxRunningApps()) {
|
if (queue.getNumRunnableApps() >= queue.getMaxRunningApps()) {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
queue = queue.getParent();
|
queue = queue.getParent();
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -198,7 +238,7 @@ public class MaxRunningAppsEnforcer {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
|
if (canAppBeRunnable(next.getQueue(), next)) {
|
||||||
trackRunnableApp(next);
|
trackRunnableApp(next);
|
||||||
FSAppAttempt appSched = next;
|
FSAppAttempt appSched = next;
|
||||||
next.getQueue().addApp(appSched, true);
|
next.getQueue().addApp(appSched, true);
|
||||||
|
|
|
@ -68,9 +68,9 @@ public class TestMaxRunningAppsEnforcer {
|
||||||
private FSAppAttempt addApp(FSLeafQueue queue, String user) {
|
private FSAppAttempt addApp(FSLeafQueue queue, String user) {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
|
ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
|
||||||
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
|
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
|
||||||
boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user);
|
|
||||||
FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null,
|
FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null,
|
||||||
rmContext);
|
rmContext);
|
||||||
|
boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, app);
|
||||||
queue.addApp(app, runnable);
|
queue.addApp(app, runnable);
|
||||||
if (runnable) {
|
if (runnable) {
|
||||||
maxAppsEnforcer.trackRunnableApp(app);
|
maxAppsEnforcer.trackRunnableApp(app);
|
||||||
|
|
Loading…
Reference in New Issue