YARN-8459. Improve Capacity Scheduler logs to debug invalid states. Contributed by Wangda Tan.
This commit is contained in:
parent
344f324710
commit
51654a3962
|
@ -1234,8 +1234,10 @@ public class CapacityScheduler extends
|
||||||
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
|
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Allocation for application " + applicationAttemptId + " : " +
|
if (LOG.isDebugEnabled()) {
|
||||||
allocation + " with cluster resource : " + getClusterResource());
|
LOG.info("Allocation for application " + applicationAttemptId + " : "
|
||||||
|
+ allocation + " with cluster resource : " + getClusterResource());
|
||||||
|
}
|
||||||
return allocation;
|
return allocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1479,10 +1481,17 @@ public class CapacityScheduler extends
|
||||||
private CSAssignment allocateContainerOnSingleNode(
|
private CSAssignment allocateContainerOnSingleNode(
|
||||||
CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
|
CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
|
||||||
boolean withNodeHeartbeat) {
|
boolean withNodeHeartbeat) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Trying to schedule on node: " + node.getNodeName() + ", available: "
|
||||||
|
+ node.getUnallocatedResource());
|
||||||
|
}
|
||||||
|
|
||||||
// Backward compatible way to make sure previous behavior which allocation
|
// Backward compatible way to make sure previous behavior which allocation
|
||||||
// driven by node heartbeat works.
|
// driven by node heartbeat works.
|
||||||
if (getNode(node.getNodeID()) != node) {
|
if (getNode(node.getNodeID()) != node) {
|
||||||
LOG.error("Trying to schedule on a removed node, please double check.");
|
LOG.error("Trying to schedule on a removed node, please double check, "
|
||||||
|
+ "nodeId=" + node.getNodeID());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1496,14 +1505,19 @@ public class CapacityScheduler extends
|
||||||
FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
|
FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
|
||||||
reservedContainer.getContainerId());
|
reservedContainer.getContainerId());
|
||||||
if (reservedApplication == null) {
|
if (reservedApplication == null) {
|
||||||
LOG.error("Trying to schedule for a finished app, please double check.");
|
LOG.error(
|
||||||
|
"Trying to schedule for a finished app, please double check. nodeId="
|
||||||
|
+ node.getNodeID() + " container=" + reservedContainer
|
||||||
|
.getContainerId());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to fulfill the reservation
|
// Try to fulfill the reservation
|
||||||
LOG.info(
|
if (LOG.isDebugEnabled()) {
|
||||||
"Trying to fulfill reservation for application " + reservedApplication
|
LOG.debug("Trying to fulfill reservation for application "
|
||||||
.getApplicationId() + " on node: " + node.getNodeID());
|
+ reservedApplication.getApplicationId() + " on node: " + node
|
||||||
|
.getNodeID());
|
||||||
|
}
|
||||||
|
|
||||||
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
|
LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
|
||||||
assignment = queue.assignContainers(getClusterResource(), candidates,
|
assignment = queue.assignContainers(getClusterResource(), candidates,
|
||||||
|
@ -1567,12 +1581,6 @@ public class CapacityScheduler extends
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(
|
|
||||||
"Trying to schedule on node: " + node.getNodeName() + ", available: "
|
|
||||||
+ node.getUnallocatedResource());
|
|
||||||
}
|
|
||||||
|
|
||||||
return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
|
return allocateOrReserveNewContainers(candidates, withNodeHeartbeat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2888,6 +2896,11 @@ public class CapacityScheduler extends
|
||||||
LOG.info("Failed to accept allocation proposal");
|
LOG.info("Failed to accept allocation proposal");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Allocation proposal accepted=" + isSuccess + ", proposal="
|
||||||
|
+ request);
|
||||||
|
}
|
||||||
|
|
||||||
// Update unconfirmed allocated resource.
|
// Update unconfirmed allocated resource.
|
||||||
if (updateUnconfirmedAllocatedResource) {
|
if (updateUnconfirmedAllocatedResource) {
|
||||||
app.decUnconfirmedRes(request.getTotalAllocatedResource());
|
app.decUnconfirmedRes(request.getTotalAllocatedResource());
|
||||||
|
|
|
@ -90,6 +90,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
private QueueOrderingPolicy queueOrderingPolicy;
|
private QueueOrderingPolicy queueOrderingPolicy;
|
||||||
|
|
||||||
|
private long lastSkipQueueDebugLoggingTimestamp = -1;
|
||||||
|
|
||||||
public ParentQueue(CapacitySchedulerContext cs,
|
public ParentQueue(CapacitySchedulerContext cs,
|
||||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||||
super(cs, queueName, parent, old);
|
super(cs, queueName, parent, old);
|
||||||
|
@ -539,9 +541,14 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
||||||
&& !accessibleToPartition(candidates.getPartition())) {
|
&& !accessibleToPartition(candidates.getPartition())) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
// Do logging every 1 sec to avoid excessive logging.
|
||||||
|
if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) {
|
||||||
LOG.debug("Skip this queue=" + getQueuePath()
|
LOG.debug("Skip this queue=" + getQueuePath()
|
||||||
+ ", because it is not able to access partition=" + candidates
|
+ ", because it is not able to access partition=" + candidates
|
||||||
.getPartition());
|
.getPartition());
|
||||||
|
this.lastSkipQueueDebugLoggingTimestamp = now;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
|
@ -561,10 +568,15 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
if (!super.hasPendingResourceRequest(candidates.getPartition(),
|
if (!super.hasPendingResourceRequest(candidates.getPartition(),
|
||||||
clusterResource, schedulingMode)) {
|
clusterResource, schedulingMode)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
// Do logging every 1 sec to avoid excessive logging.
|
||||||
|
if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) {
|
||||||
LOG.debug("Skip this queue=" + getQueuePath()
|
LOG.debug("Skip this queue=" + getQueuePath()
|
||||||
+ ", because it doesn't need more resource, schedulingMode="
|
+ ", because it doesn't need more resource, schedulingMode="
|
||||||
+ schedulingMode.name() + " node-partition=" + candidates
|
+ schedulingMode.name() + " node-partition=" + candidates
|
||||||
.getPartition());
|
.getPartition());
|
||||||
|
this.lastSkipQueueDebugLoggingTimestamp = now;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
|
@ -666,12 +678,12 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
assignment.setIncreasedAllocation(
|
assignment.setIncreasedAllocation(
|
||||||
assignedToChild.isIncreasedAllocation());
|
assignedToChild.isIncreasedAllocation());
|
||||||
|
|
||||||
LOG.info("assignedContainer" + " queue=" + getQueueName()
|
|
||||||
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
|
|
||||||
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
|
|
||||||
+ " cluster=" + clusterResource);
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("assignedContainer reserved=" + isReserved + " queue="
|
||||||
|
+ getQueueName() + " usedCapacity=" + getUsedCapacity()
|
||||||
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||||
|
+ queueUsage.getUsed() + " cluster=" + clusterResource);
|
||||||
|
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"ParentQ=" + getQueueName() + " assignedSoFarInThisIteration="
|
"ParentQ=" + getQueueName() + " assignedSoFarInThisIteration="
|
||||||
+ assignment.getResource() + " usedCapacity="
|
+ assignment.getResource() + " usedCapacity="
|
||||||
|
|
|
@ -93,11 +93,14 @@ public abstract class AbstractContainerAllocator {
|
||||||
assignment.setType(result.getContainerNodeType());
|
assignment.setType(result.getContainerNodeType());
|
||||||
|
|
||||||
if (result.getAllocationState() == AllocationState.RESERVED) {
|
if (result.getAllocationState() == AllocationState.RESERVED) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
// This is a reserved container
|
// This is a reserved container
|
||||||
LOG.info("Reserved container " + " application="
|
// Since re-reservation could happen again and again for already
|
||||||
+ application.getApplicationId() + " resource=" + allocatedResource
|
// reserved containers. only do this in debug log.
|
||||||
+ " queue=" + appInfo.getQueueName()
|
LOG.debug("Reserved container " + " application=" + application
|
||||||
+ " cluster=" + clusterResource);
|
.getApplicationId() + " resource=" + allocatedResource + " queue="
|
||||||
|
+ appInfo.getQueueName() + " cluster=" + clusterResource);
|
||||||
|
}
|
||||||
assignment.getAssignmentInformation().addReservationDetails(
|
assignment.getAssignmentInformation().addReservationDetails(
|
||||||
updatedContainer, application.getCSLeafQueue().getQueuePath());
|
updatedContainer, application.getCSLeafQueue().getQueuePath());
|
||||||
assignment.getAssignmentInformation().incrReservations();
|
assignment.getAssignmentInformation().incrReservations();
|
||||||
|
|
|
@ -607,6 +607,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
schedulerContainer.getRmContainer(),
|
schedulerContainer.getRmContainer(),
|
||||||
schedulerContainer.getRmContainer().getContainer(),
|
schedulerContainer.getRmContainer().getContainer(),
|
||||||
reReservation);
|
reReservation);
|
||||||
|
|
||||||
|
LOG.info("Reserved container=" + rmContainer.getContainerId()
|
||||||
|
+ ", on node=" + schedulerContainer.getSchedulerNode()
|
||||||
|
+ " with resource=" + rmContainer
|
||||||
|
.getAllocatedOrReservedResource());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue