YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
(cherry picked from commit 4bc42d76e7
)
This commit is contained in:
parent
0c4c7b3b62
commit
cdb61b5fb2
|
@ -330,6 +330,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3974. Refactor the reservation system test cases to use parameterized
|
YARN-3974. Refactor the reservation system test cases to use parameterized
|
||||||
base test. (subru via curino)
|
base test. (subru via curino)
|
||||||
|
|
||||||
|
YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -215,12 +215,12 @@ public abstract class AbstractYarnScheduler
|
||||||
protected synchronized void containerLaunchedOnNode(
|
protected synchronized void containerLaunchedOnNode(
|
||||||
ContainerId containerId, SchedulerNode node) {
|
ContainerId containerId, SchedulerNode node) {
|
||||||
// Get the application for the finished container
|
// Get the application for the finished container
|
||||||
SchedulerApplicationAttempt application = getCurrentAttemptForContainer
|
SchedulerApplicationAttempt application =
|
||||||
(containerId);
|
getCurrentAttemptForContainer(containerId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
LOG.info("Unknown application "
|
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
|
||||||
+ containerId.getApplicationAttemptId().getApplicationId()
|
.getApplicationId() + " launched container " + containerId
|
||||||
+ " launched container " + containerId + " on node: " + node);
|
+ " on node: " + node);
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -795,8 +795,8 @@ public class CapacityScheduler extends
|
||||||
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
|
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
|
||||||
application.getPriority());
|
application.getPriority());
|
||||||
if (transferStateFromPreviousAttempt) {
|
if (transferStateFromPreviousAttempt) {
|
||||||
attempt.transferStateFromPreviousAttempt(application
|
attempt.transferStateFromPreviousAttempt(
|
||||||
.getCurrentAppAttempt());
|
application.getCurrentAppAttempt());
|
||||||
}
|
}
|
||||||
application.setCurrentAppAttempt(attempt);
|
application.setCurrentAppAttempt(attempt);
|
||||||
|
|
||||||
|
@ -899,8 +899,6 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
|
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
LOG.info("Calling allocate on removed " +
|
|
||||||
"or non existant application " + applicationAttemptId);
|
|
||||||
return EMPTY_ALLOCATION;
|
return EMPTY_ALLOCATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -921,33 +919,26 @@ public class CapacityScheduler extends
|
||||||
// make sure we aren't stopping/removing the application
|
// make sure we aren't stopping/removing the application
|
||||||
// when the allocate comes in
|
// when the allocate comes in
|
||||||
if (application.isStopped()) {
|
if (application.isStopped()) {
|
||||||
LOG.info("Calling allocate on a stopped " +
|
|
||||||
"application " + applicationAttemptId);
|
|
||||||
return EMPTY_ALLOCATION;
|
return EMPTY_ALLOCATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ask.isEmpty()) {
|
if (!ask.isEmpty()) {
|
||||||
|
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("allocate: pre-update" +
|
LOG.debug("allocate: pre-update " + applicationAttemptId +
|
||||||
" applicationAttemptId=" + applicationAttemptId +
|
" ask size =" + ask.size());
|
||||||
" application=" + application);
|
application.showRequests();
|
||||||
}
|
}
|
||||||
application.showRequests();
|
|
||||||
|
|
||||||
// Update application requests
|
// Update application requests
|
||||||
if (application.updateResourceRequests(ask)) {
|
if (application.updateResourceRequests(ask)) {
|
||||||
updateDemandForQueue = (LeafQueue) application.getQueue();
|
updateDemandForQueue = (LeafQueue) application.getQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("allocate: post-update");
|
if(LOG.isDebugEnabled()) {
|
||||||
application.showRequests();
|
LOG.debug("allocate: post-update");
|
||||||
}
|
application.showRequests();
|
||||||
|
}
|
||||||
if(LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("allocate:" +
|
|
||||||
" applicationAttemptId=" + applicationAttemptId +
|
|
||||||
" #ask=" + ask.size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||||
|
@ -1018,7 +1009,6 @@ public class CapacityScheduler extends
|
||||||
for (ContainerStatus completedContainer : completedContainers) {
|
for (ContainerStatus completedContainer : completedContainers) {
|
||||||
ContainerId containerId = completedContainer.getContainerId();
|
ContainerId containerId = completedContainer.getContainerId();
|
||||||
RMContainer container = getRMContainer(containerId);
|
RMContainer container = getRMContainer(containerId);
|
||||||
LOG.debug("Container FINISHED: " + containerId);
|
|
||||||
completedContainer(container, completedContainer,
|
completedContainer(container, completedContainer,
|
||||||
RMContainerEventType.FINISHED);
|
RMContainerEventType.FINISHED);
|
||||||
if (container != null) {
|
if (container != null) {
|
||||||
|
@ -1481,9 +1471,6 @@ public class CapacityScheduler extends
|
||||||
queue.completedContainer(clusterResource, application, node,
|
queue.completedContainer(clusterResource, application, node,
|
||||||
rmContainer, containerStatus, event, null, true);
|
rmContainer, containerStatus, event, null, true);
|
||||||
|
|
||||||
LOG.info("Application attempt " + application.getApplicationAttemptId()
|
|
||||||
+ " released container " + container.getId() + " on node: " + node
|
|
||||||
+ " with event: " + event);
|
|
||||||
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
|
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
|
||||||
schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
|
schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
|
||||||
container.getId(), queue.getQueuePath());
|
container.getId(), queue.getQueuePath());
|
||||||
|
@ -1783,8 +1770,7 @@ public class CapacityScheduler extends
|
||||||
.equals(DefaultResourceCalculator.class.getName())) {
|
.equals(DefaultResourceCalculator.class.getName())) {
|
||||||
return EnumSet.of(SchedulerResourceTypes.MEMORY);
|
return EnumSet.of(SchedulerResourceTypes.MEMORY);
|
||||||
}
|
}
|
||||||
return EnumSet
|
return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
||||||
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -1125,12 +1125,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// Inform the ordering policy
|
// Inform the ordering policy
|
||||||
orderingPolicy.containerReleased(application, rmContainer);
|
orderingPolicy.containerReleased(application, rmContainer);
|
||||||
|
|
||||||
releaseResource(clusterResource, application,
|
releaseResource(clusterResource, application, container.getResource(),
|
||||||
container.getResource(), node.getPartition(), rmContainer);
|
node.getPartition(), rmContainer);
|
||||||
LOG.info("completedContainer" +
|
|
||||||
" container=" + container +
|
|
||||||
" queue=" + this +
|
|
||||||
" cluster=" + clusterResource);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1203,10 +1199,12 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
User user = getUser(userName);
|
User user = getUser(userName);
|
||||||
user.releaseContainer(resource, nodePartition);
|
user.releaseContainer(resource, nodePartition);
|
||||||
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
||||||
|
|
||||||
LOG.info(getQueueName() +
|
if (LOG.isDebugEnabled()) {
|
||||||
" used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
LOG.debug(getQueueName() +
|
||||||
" user=" + userName + " user-resources=" + user.getUsed());
|
" used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
|
||||||
|
" user=" + userName + " user-resources=" + user.getUsed());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateAbsoluteCapacityResource(Resource clusterResource) {
|
private void updateAbsoluteCapacityResource(Resource clusterResource) {
|
||||||
|
|
|
@ -629,12 +629,9 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
super.releaseResource(clusterResource, rmContainer.getContainer()
|
super.releaseResource(clusterResource, rmContainer.getContainer()
|
||||||
.getResource(), node.getPartition());
|
.getResource(), node.getPartition());
|
||||||
|
|
||||||
LOG.info("completedContainer" +
|
if (LOG.isDebugEnabled()) {
|
||||||
" queue=" + getQueueName() +
|
LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
|
||||||
" usedCapacity=" + getUsedCapacity() +
|
}
|
||||||
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
|
|
||||||
" used=" + queueUsage.getUsed() +
|
|
||||||
" cluster=" + clusterResource);
|
|
||||||
|
|
||||||
// Note that this is using an iterator on the childQueues so this can't
|
// Note that this is using an iterator on the childQueues so this can't
|
||||||
// be called if already within an iterator for the childQueues. Like
|
// be called if already within an iterator for the childQueues. Like
|
||||||
|
@ -646,8 +643,9 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
CSQueue csqueue = iter.next();
|
CSQueue csqueue = iter.next();
|
||||||
if(csqueue.equals(completedChildQueue)) {
|
if(csqueue.equals(completedChildQueue)) {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
|
if (LOG.isDebugEnabled()) {
|
||||||
" stats: " + csqueue);
|
LOG.debug("Re-sorting completed queue: " + csqueue);
|
||||||
|
}
|
||||||
childQueues.add(csqueue);
|
childQueues.add(csqueue);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,13 +140,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
|
|
||||||
// Inform the container
|
// Inform the container
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerFinishedEvent(
|
new RMContainerFinishedEvent(containerId, containerStatus, event));
|
||||||
containerId,
|
|
||||||
containerStatus,
|
|
||||||
event)
|
|
||||||
);
|
|
||||||
LOG.info("Completed container: " + rmContainer.getContainerId() +
|
|
||||||
" in state: " + rmContainer.getState() + " event:" + event);
|
|
||||||
|
|
||||||
containersToPreempt.remove(rmContainer.getContainerId());
|
containersToPreempt.remove(rmContainer.getContainerId());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue