YARN-6042. Dump scheduler and queue state information into FairScheduler DEBUG log. (Yufei Gu via rchiang)
This commit is contained in:
parent
0e7879052a
commit
fa59f4e490
|
@ -321,3 +321,12 @@ log4j.appender.EWMA=org.apache.hadoop.yarn.util.Log4jWarningErrorMetricsAppender
|
||||||
log4j.appender.EWMA.cleanupInterval=${yarn.ewma.cleanupInterval}
|
log4j.appender.EWMA.cleanupInterval=${yarn.ewma.cleanupInterval}
|
||||||
log4j.appender.EWMA.messageAgeLimitSeconds=${yarn.ewma.messageAgeLimitSeconds}
|
log4j.appender.EWMA.messageAgeLimitSeconds=${yarn.ewma.messageAgeLimitSeconds}
|
||||||
log4j.appender.EWMA.maxUniqueMessages=${yarn.ewma.maxUniqueMessages}
|
log4j.appender.EWMA.maxUniqueMessages=${yarn.ewma.maxUniqueMessages}
|
||||||
|
|
||||||
|
# Fair scheduler requests log on state dump
|
||||||
|
log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=DEBUG,FSLOGGER
|
||||||
|
log4j.appender.FSLOGGER=org.apache.log4j.RollingFileAppender
|
||||||
|
log4j.appender.FSLOGGER.File=${hadoop.log.dir}/fairscheduler-statedump.log
|
||||||
|
log4j.appender.FSLOGGER.layout=org.apache.log4j.PatternLayout
|
||||||
|
log4j.appender.FSLOGGER.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
|
||||||
|
log4j.appender.FSLOGGER.MaxFileSize=${hadoop.log.maxfilesize}
|
||||||
|
log4j.appender.FSLOGGER.MaxBackupIndex=${hadoop.log.maxbackupindex}
|
|
@ -834,24 +834,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
return capability;
|
return capability;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Resource request: " + capability + " exceeds the available"
|
||||||
|
+ " resources of the node.");
|
||||||
|
}
|
||||||
|
|
||||||
// The desired container won't fit here, so reserve
|
// The desired container won't fit here, so reserve
|
||||||
if (isReservable(capability) &&
|
if (isReservable(capability) &&
|
||||||
reserve(request, node, reservedContainer, type, schedulerKey)) {
|
reserve(request, node, reservedContainer, type, schedulerKey)) {
|
||||||
if (isWaitingForAMContainer()) {
|
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
|
||||||
updateAMDiagnosticMsg(capability,
|
+ "the node and the request is reserved)");
|
||||||
" exceed the available resources of the node and the request is"
|
if (LOG.isDebugEnabled()) {
|
||||||
+ " reserved");
|
LOG.debug(getName() + "'s resource request is reserved.");
|
||||||
}
|
}
|
||||||
return FairScheduler.CONTAINER_RESERVED;
|
return FairScheduler.CONTAINER_RESERVED;
|
||||||
} else {
|
} else {
|
||||||
if (isWaitingForAMContainer()) {
|
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
|
||||||
updateAMDiagnosticMsg(capability,
|
+ "the node and the request cannot be reserved)");
|
||||||
" exceed the available resources of the node and the request cannot"
|
|
||||||
+ " be reserved");
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Couldn't creating reservation for " +
|
LOG.debug("Couldn't create reservation for app: " + getName()
|
||||||
getName() + ",at priority " + request.getPriority());
|
+ ", at priority " + schedulerKey.getPriority());
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
|
@ -1023,10 +1025,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
ret = false;
|
ret = false;
|
||||||
} else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) {
|
} else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) {
|
||||||
// The requested container must fit in queue maximum share
|
// The requested container must fit in queue maximum share
|
||||||
if (isWaitingForAMContainer()) {
|
|
||||||
updateAMDiagnosticMsg(anyRequest.getCapability(),
|
updateAMDiagnosticMsg(anyRequest.getCapability(),
|
||||||
" exceeds current queue or its parents maximum resource allowed).");
|
" exceeds current queue or its parents maximum resource allowed).");
|
||||||
}
|
|
||||||
ret = false;
|
ret = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1301,16 +1302,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
@Override
|
@Override
|
||||||
public Resource assignContainer(FSSchedulerNode node) {
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
if (isOverAMShareLimit()) {
|
if (isOverAMShareLimit()) {
|
||||||
if (isWaitingForAMContainer()) {
|
Resource amResourceRequest = appSchedulingInfo.getAllResourceRequests()
|
||||||
List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
|
.get(0).getCapability();
|
||||||
updateAMDiagnosticMsg(ask.get(0).getCapability(), " exceeds maximum "
|
updateAMDiagnosticMsg(amResourceRequest,
|
||||||
+ "AM resource allowed).");
|
" exceeds maximum AM resource allowed).");
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("AM resource request: " + amResourceRequest
|
||||||
|
+ " exceeds maximum AM resource allowed, "
|
||||||
|
+ getQueue().dumpState());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
|
||||||
"be exceeded");
|
|
||||||
}
|
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
return assignContainer(node, false);
|
return assignContainer(node, false);
|
||||||
|
@ -1323,6 +1324,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
* @param reason the reason why AM doesn't get the resource
|
* @param reason the reason why AM doesn't get the resource
|
||||||
*/
|
*/
|
||||||
private void updateAMDiagnosticMsg(Resource resource, String reason) {
|
private void updateAMDiagnosticMsg(Resource resource, String reason) {
|
||||||
|
if (!isWaitingForAMContainer()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||||
diagnosticMessageBldr.append(" (Resource request: ");
|
diagnosticMessageBldr.append(" (Resource request: ");
|
||||||
diagnosticMessageBldr.append(resource);
|
diagnosticMessageBldr.append(resource);
|
||||||
|
|
|
@ -619,4 +619,25 @@ public class FSLeafQueue extends FSQueue {
|
||||||
boolean isStarved() {
|
boolean isStarved() {
|
||||||
return isStarvedForMinShare() || isStarvedForFairShare();
|
return isStarvedForMinShare() || isStarvedForFairShare();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void dumpStateInternal(StringBuilder sb) {
|
||||||
|
sb.append("{Name: " + getName() +
|
||||||
|
", Weight: " + weights +
|
||||||
|
", Policy: " + policy.getName() +
|
||||||
|
", FairShare: " + getFairShare() +
|
||||||
|
", SteadyFairShare: " + getSteadyFairShare() +
|
||||||
|
", MaxShare: " + maxShare +
|
||||||
|
", MinShare: " + minShare +
|
||||||
|
", ResourceUsage: " + getResourceUsage() +
|
||||||
|
", Demand: " + getDemand() +
|
||||||
|
", Runnable: " + getNumRunnableApps() +
|
||||||
|
", NumPendingApps: " + getNumPendingApps() +
|
||||||
|
", NonRunnable: " + getNumNonRunnableApps() +
|
||||||
|
", MaxAMShare: " + maxAMShare +
|
||||||
|
", MaxAMResource: " + computeMaxAMResource() +
|
||||||
|
", AMResourceUsage: " + getAmResourceUsage() +
|
||||||
|
", LastTimeAtMinShare: " + lastTimeAtMinShare +
|
||||||
|
"}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -292,4 +292,25 @@ public class FSParentQueue extends FSQueue {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void dumpStateInternal(StringBuilder sb) {
|
||||||
|
sb.append("{Name: " + getName() +
|
||||||
|
", Weight: " + weights +
|
||||||
|
", Policy: " + policy.getName() +
|
||||||
|
", FairShare: " + getFairShare() +
|
||||||
|
", SteadyFairShare: " + getSteadyFairShare() +
|
||||||
|
", MaxShare: " + maxShare +
|
||||||
|
", MinShare: " + minShare +
|
||||||
|
", ResourceUsage: " + getResourceUsage() +
|
||||||
|
", Demand: " + getDemand() +
|
||||||
|
", MaxAMShare: " + maxAMShare +
|
||||||
|
", Runnable: " + getNumRunnableApps() +
|
||||||
|
"}");
|
||||||
|
|
||||||
|
for(FSQueue child : getChildQueues()) {
|
||||||
|
sb.append(", ");
|
||||||
|
child.dumpStateInternal(sb);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -393,12 +393,23 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
* @return true if check passes (can assign) or false otherwise
|
* @return true if check passes (can assign) or false otherwise
|
||||||
*/
|
*/
|
||||||
boolean assignContainerPreCheck(FSSchedulerNode node) {
|
boolean assignContainerPreCheck(FSSchedulerNode node) {
|
||||||
if (!Resources.fitsIn(getResourceUsage(), maxShare)
|
if (node.getReservedContainer() != null) {
|
||||||
|| node.getReservedContainer() != null) {
|
if (LOG.isDebugEnabled()) {
|
||||||
return false;
|
LOG.debug("Assigning container failed on node '" + node.getNodeName()
|
||||||
|
+ " because it has reserved containers.");
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
|
} else if (!Resources.fitsIn(getResourceUsage(), maxShare)) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Assigning container failed on node '" + node.getNodeName()
|
||||||
|
+ " because queue resource usage is larger than MaxShare: "
|
||||||
|
+ dumpState());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if queue has at least one app running.
|
* Returns true if queue has at least one app running.
|
||||||
|
@ -453,6 +464,11 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
Resources.add(getResourceUsage(), additionalResource);
|
Resources.add(getResourceUsage(), additionalResource);
|
||||||
|
|
||||||
if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
|
if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Resource usage plus resource request: " + usagePlusAddition
|
||||||
|
+ " exceeds maximum resource allowed:" + getMaxShare()
|
||||||
|
+ " in queue " + getName());
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,4 +507,23 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
setPolicy(queuePolicy);
|
setPolicy(queuePolicy);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively dump states of all queues.
|
||||||
|
*
|
||||||
|
* @return a string which holds all queue states
|
||||||
|
*/
|
||||||
|
public String dumpState() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
dumpStateInternal(sb);
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively dump states of all queues.
|
||||||
|
*
|
||||||
|
* @param sb the {code StringBuilder} which holds queue states
|
||||||
|
*/
|
||||||
|
protected abstract void dumpStateInternal(StringBuilder sb);
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,8 @@ public class FairScheduler extends
|
||||||
private boolean usePortForNodeName;
|
private boolean usePortForNodeName;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
||||||
|
private static final Log STATE_DUMP_LOG =
|
||||||
|
LogFactory.getLog(FairScheduler.class.getName() + ".statedump");
|
||||||
|
|
||||||
private static final ResourceCalculator RESOURCE_CALCULATOR =
|
private static final ResourceCalculator RESOURCE_CALCULATOR =
|
||||||
new DefaultResourceCalculator();
|
new DefaultResourceCalculator();
|
||||||
|
@ -152,7 +154,7 @@ public class FairScheduler extends
|
||||||
|
|
||||||
// How often fair shares are re-calculated (ms)
|
// How often fair shares are re-calculated (ms)
|
||||||
protected long updateInterval;
|
protected long updateInterval;
|
||||||
private final int UPDATE_DEBUG_FREQUENCY = 5;
|
private final int UPDATE_DEBUG_FREQUENCY = 25;
|
||||||
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -348,6 +350,21 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dump scheduler state including states of all queues.
|
||||||
|
*/
|
||||||
|
private void dumpSchedulerState() {
|
||||||
|
FSQueue rootQueue = queueMgr.getRootQueue();
|
||||||
|
Resource clusterResource = getClusterResource();
|
||||||
|
LOG.debug("FairScheduler state: Cluster Capacity: " + clusterResource +
|
||||||
|
" Allocations: " + rootMetrics.getAllocatedResources() +
|
||||||
|
" Availability: " + Resource.newInstance(
|
||||||
|
rootMetrics.getAvailableMB(), rootMetrics.getAvailableVirtualCores()) +
|
||||||
|
" Demand: " + rootQueue.getDemand());
|
||||||
|
|
||||||
|
STATE_DUMP_LOG.debug(rootQueue.dumpState());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recompute the internal variables used by the scheduler - per-job weights,
|
* Recompute the internal variables used by the scheduler - per-job weights,
|
||||||
* fair shares, deficits, minimum slot allocations, and amount of used and
|
* fair shares, deficits, minimum slot allocations, and amount of used and
|
||||||
|
@ -372,12 +389,7 @@ public class FairScheduler extends
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
if (--updatesToSkipForDebug < 0) {
|
if (--updatesToSkipForDebug < 0) {
|
||||||
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||||
LOG.debug("Cluster Capacity: " + clusterResource +
|
dumpSchedulerState();
|
||||||
" Allocations: " + rootMetrics.getAllocatedResources() +
|
|
||||||
" Availability: " + Resource.newInstance(
|
|
||||||
rootMetrics.getAvailableMB(),
|
|
||||||
rootMetrics.getAvailableVirtualCores()) +
|
|
||||||
" Demand: " + rootQueue.getDemand());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -5148,4 +5148,76 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
Resources.equals(aQueue.getDemand(), maxResource) &&
|
Resources.equals(aQueue.getDemand(), maxResource) &&
|
||||||
Resources.equals(bQueue.getDemand(), maxResource));
|
Resources.equals(bQueue.getDemand(), maxResource));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDumpState() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"parent\">");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <weight>1</weight>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
ControlledClock clock = new ControlledClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
FSLeafQueue child1 =
|
||||||
|
scheduler.getQueueManager().getLeafQueue("parent.child1", false);
|
||||||
|
Resource resource = Resource.newInstance(4 * GB, 4);
|
||||||
|
child1.setMaxShare(resource);
|
||||||
|
FSAppAttempt app = mock(FSAppAttempt.class);
|
||||||
|
Mockito.when(app.getDemand()).thenReturn(resource);
|
||||||
|
Mockito.when(app.getResourceUsage()).thenReturn(resource);
|
||||||
|
child1.addAppSchedulable(app);
|
||||||
|
child1.updateDemand();
|
||||||
|
|
||||||
|
String childQueueString = "{Name: root.parent.child1,"
|
||||||
|
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||||
|
+ " Policy: fair,"
|
||||||
|
+ " FairShare: <memory:0, vCores:0>,"
|
||||||
|
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||||
|
+ " MaxShare: <memory:4096, vCores:4>,"
|
||||||
|
+ " MinShare: <memory:0, vCores:0>,"
|
||||||
|
+ " ResourceUsage: <memory:4096, vCores:4>,"
|
||||||
|
+ " Demand: <memory:4096, vCores:4>,"
|
||||||
|
+ " Runnable: 1,"
|
||||||
|
+ " NumPendingApps: 0,"
|
||||||
|
+ " NonRunnable: 0,"
|
||||||
|
+ " MaxAMShare: 0.5,"
|
||||||
|
+ " MaxAMResource: <memory:0, vCores:0>,"
|
||||||
|
+ " AMResourceUsage: <memory:0, vCores:0>,"
|
||||||
|
+ " LastTimeAtMinShare: " + clock.getTime()
|
||||||
|
+ "}";
|
||||||
|
|
||||||
|
assertTrue(child1.dumpState().equals(childQueueString));
|
||||||
|
FSParentQueue parent =
|
||||||
|
scheduler.getQueueManager().getParentQueue("parent", false);
|
||||||
|
parent.setMaxShare(resource);
|
||||||
|
parent.updateDemand();
|
||||||
|
|
||||||
|
String parentQueueString = "{Name: root.parent,"
|
||||||
|
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||||
|
+ " Policy: fair,"
|
||||||
|
+ " FairShare: <memory:0, vCores:0>,"
|
||||||
|
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||||
|
+ " MaxShare: <memory:4096, vCores:4>,"
|
||||||
|
+ " MinShare: <memory:0, vCores:0>,"
|
||||||
|
+ " ResourceUsage: <memory:4096, vCores:4>,"
|
||||||
|
+ " Demand: <memory:4096, vCores:4>,"
|
||||||
|
+ " MaxAMShare: 0.5,"
|
||||||
|
+ " Runnable: 0}";
|
||||||
|
|
||||||
|
assertTrue(parent.dumpState().equals(
|
||||||
|
parentQueueString + ", " + childQueueString));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue