From 754cb4e30fac1c5fe8d44626968c0ddbfe459335 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 20 Oct 2016 21:17:48 -0700 Subject: [PATCH] YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha) --- .../scheduler/AbstractYarnScheduler.java | 186 +++++++++++++++++- .../scheduler/capacity/CapacityScheduler.java | 122 ++---------- .../scheduler/fair/FairScheduler.java | 80 +------- .../scheduler/fifo/FifoScheduler.java | 94 +++------ ...tProportionalCapacityPreemptionPolicy.java | 4 +- 5 files changed, 225 insertions(+), 261 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 645e06dbf54..df595561745 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -73,7 +74,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -94,10 +100,14 @@ public abstract class AbstractYarnScheduler protected Resource minimumAllocation; protected volatile RMContext rmContext; - + private volatile Priority maxClusterLevelAppPriority; protected ActivitiesManager activitiesManager; + protected SchedulerHealth schedulerHealth = new SchedulerHealth(); + protected volatile long lastNodeUpdateTime; + + private volatile Clock clock; /* * All schedulers which are inheriting AbstractYarnScheduler should use @@ -130,6 +140,7 @@ public abstract class AbstractYarnScheduler */ public AbstractYarnScheduler(String name) { super(name); + clock = SystemClock.getInstance(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -228,13 +239,25 @@ protected void initMaximumResourceCapability(Resource maximumAllocation) { nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + protected void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } + + public long getLastNodeUpdateTime() { + return lastNodeUpdateTime; + } + protected void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { try { readLock.lock(); // Get the application for the finished container - SchedulerApplicationAttempt application = getCurrentAttemptForContainer( - containerId); + SchedulerApplicationAttempt application = + getCurrentAttemptForContainer(containerId); if (application == null) { LOG.info("Unknown application " + containerId.getApplicationAttemptId() .getApplicationId() + " launched container " + containerId @@ -249,7 +272,7 @@ protected void containerLaunchedOnNode( readLock.unlock(); } } - + protected void containerIncreasedOnNode(ContainerId containerId, SchedulerNode node, Container increasedContainerReportedByNM) { /* @@ -276,6 +299,7 @@ protected void containerIncreasedOnNode(ContainerId containerId, } rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId, increasedContainerReportedByNM.getResource())); + } public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { @@ -360,7 +384,7 @@ private void killOrphanContainerOnNode(RMNode node, } } - public void recoverContainersOnNode( + public synchronized void recoverContainersOnNode( List containerReports, RMNode nm) { try { writeLock.lock(); @@ -475,7 +499,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, } /** - * Recover resource request back from RMContainer when a container is + * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. * @param rmContainer rmContainer @@ -621,7 +645,7 @@ protected abstract void decreaseContainer( SchedulerApplicationAttempt attempt); @Override - public SchedulerNode getSchedulerNode(NodeId nodeId) { + public N getSchedulerNode(NodeId nodeId) { return nodeTracker.getNode(nodeId); } @@ -832,4 +856,152 @@ public ActivitiesManager getActivitiesManager() { return this.activitiesManager; } + public Clock getClock() { + return clock; + } + + @VisibleForTesting + public void setClock(Clock clock) { + this.clock = clock; + } + + @Lock(Lock.NoLock.class) + public SchedulerNode getNode(NodeId nodeId) { + return nodeTracker.getNode(nodeId); + } + + /** + * Get lists of new containers from NodeManager and process them. + * @param nm The RMNode corresponding to the NodeManager + * @return list of completed containers + */ + protected List updateNewContainerInfo(RMNode nm) { + SchedulerNode node = getNode(nm.getNodeID()); + + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = + new ArrayList<>(); + List completedContainers = + new ArrayList<>(); + + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers + .addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Processing the newly increased containers + List newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } + + return completedContainers; + } + + /** + * Process completed container list. + * @param completedContainers Extracted list of completed containers + * @param releasedResources Reference resource object for completed containers + * @return The total number of released containers + */ + protected int updateCompletedContainers(List + completedContainers, Resource releasedResources) { + int releasedContainers = 0; + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + RMContainer container = getRMContainer(containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource ars = container.getAllocatedResource(); + if (ars != null) { + Resources.addTo(releasedResources, ars); + } + Resource rrs = container.getReservedResource(); + if (rrs != null) { + Resources.addTo(releasedResources, rrs); + } + } + } + return releasedContainers; + } + + /** + * Update schedulerHealth information. + * @param releasedResources Reference resource object for completed containers + * @param releasedContainers Count of released containers + */ + protected void updateSchedulerHealthInformation(Resource releasedResources, + int releasedContainers) { + + schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(), + releasedResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + } + + /** + * Update container and utilization information on the NodeManager. + * @param nm The NodeManager to update + */ + protected void updateNodeResourceUtilization(RMNode nm) { + SchedulerNode node = getNode(nm.getNodeID()); + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); + + } + + /** + * Process a heartbeat update from a node. + * @param nm The RMNode corresponding to the NodeManager + */ + protected synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + + " cluster capacity: " + getClusterResource()); + } + + // Process new container information + List completedContainers = updateNewContainerInfo(nm); + + // Process completed containers + Resource releasedResources = Resource.newInstance(0, 0); + int releasedContainers = updateCompletedContainers(completedContainers, + releasedResources); + + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // schedule. + // TODO YARN-5128: Fix possible race-condition when request comes in before + // update is propagated + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(nm.getNodeID()) + .getAllocatedResource(), 0))); + } + + updateSchedulerHealthInformation(releasedResources, releasedContainers); + updateNodeResourceUtilization(nm); + + // Now node data structures are up-to-date and ready for scheduling. + if(LOG.isDebugEnabled()) { + SchedulerNode node = getNode(nm.getNodeID()); + LOG.debug("Node being looked for scheduling " + nm + + " availableResource: " + node.getUnallocatedResource()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6d00beee8f4..cfdcb10dece 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -89,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; @@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -235,8 +231,6 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - private SchedulerHealth schedulerHealth = new SchedulerHealth(); - volatile long lastNodeUpdateTime; /** * EXPERT @@ -1099,93 +1093,24 @@ public List getQueueUserAclInfo() { return root.getQueueUserAclInfo(user); } - private void nodeUpdate(RMNode nm) { + @Override + protected synchronized void nodeUpdate(RMNode nm) { try { writeLock.lock(); - if (LOG.isDebugEnabled()) { - LOG.debug( - "nodeUpdate: " + nm + " clusterResources: " + getClusterResource()); - } - - Resource releaseResources = Resource.newInstance(0, 0); - - FiCaSchedulerNode node = getNode(nm.getNodeID()); - - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = - new ArrayList(); - List completedContainers = - new ArrayList(); - for (UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll( - containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Processing the newly increased containers - List newlyIncreasedContainers = - nm.pullNewlyIncreasedContainers(); - for (Container container : newlyIncreasedContainers) { - containerIncreasedOnNode(container.getId(), node, container); - } - - // Process completed containers - int releasedContainers = 0; - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, - RMContainerEventType.FINISHED); - if (container != null) { - releasedContainers++; - Resource rs = container.getAllocatedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - rs = container.getReservedResource(); - if (rs != null) { - Resources.addTo(releaseResources, rs); - } - } - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - // TODO: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance( - getSchedulerNode(nm.getNodeID()).getAllocatedResource(), - 0))); - } - schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, - releaseResources); - schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); - - // Now node data structures are upto date and ready for scheduling. - if (LOG.isDebugEnabled()) { - LOG.debug( - "Node being looked for scheduling " + nm + " availableResource: " - + node.getUnallocatedResource()); + setLastNodeUpdateTime(Time.now()); + super.nodeUpdate(nm); + if (!scheduleAsynchronously) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + nm.getNodeID()); + allocateContainersToNode(getNode(nm.getNodeID())); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + nm.getNodeID()); } } finally { writeLock.unlock(); } } - + /** * Process resource update on a node. */ @@ -1458,16 +1383,7 @@ public void handle(SchedulerEvent event) { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - RMNode node = nodeUpdatedEvent.getRMNode(); - setLastNodeUpdateTime(Time.now()); - nodeUpdate(node); - if (!scheduleAsynchronously) { - ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, - node.getNodeID()); - allocateContainersToNode(getNode(node.getNodeID())); - ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, - node.getNodeID()); - } + nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: @@ -2193,20 +2109,6 @@ public Set getPlanQueues() { return ret; } - @Override - public SchedulerHealth getSchedulerHealth() { - return this.schedulerHealth; - } - - private void setLastNodeUpdateTime(long time) { - this.lastNodeUpdateTime = time; - } - - @Override - public long getLastNodeUpdateTime() { - return lastNodeUpdateTime; - } - @Override public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index d33c214ca43..94fdb7c37d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -70,8 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -92,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -130,7 +125,6 @@ public class FairScheduler extends private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -217,7 +211,6 @@ public class FairScheduler extends public FairScheduler() { super(FairScheduler.class.getName()); - clock = SystemClock.getInstance(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); @@ -383,7 +376,7 @@ protected void update() { * threshold for each type of task. */ private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); + lastPreemptionUpdateTime = getClock().getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { sched.updateStarvationStats(); } @@ -616,15 +609,6 @@ public int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } - public Clock getClock() { - return clock; - } - - @VisibleForTesting - void setClock(Clock clock) { - this.clock = clock; - } - public FairSchedulerEventLog getEventLog() { return eventLog; } @@ -1053,67 +1037,17 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } - - /** - * Process a heartbeat update from a node. - */ - private void nodeUpdate(RMNode nm) { + + @Override + protected synchronized void nodeUpdate(RMNode nm) { try { writeLock.lock(); long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug( - "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource()); - } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); + super.nodeUpdate(nm); - List containerInfoList = nm.pullContainerUpdates(); - List newlyLaunchedContainers = - new ArrayList(); - List completedContainers = - new ArrayList(); - for (UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll( - containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance( - getSchedulerNode(nm.getNodeID()).getAllocatedResource(), - 0))); - } - - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { - attemptScheduling(node); - } - } else{ - attemptScheduling(node); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + attemptScheduling(fsNode); long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index e9ffd09eca2..92acf75ef8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -42,14 +42,12 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -385,10 +381,6 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } } - private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodeTracker.getNode(nodeId); - } - @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { @@ -733,66 +725,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode) { - FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - - List containerInfoList = rmNode.pullContainerUpdates(); - List newlyLaunchedContainers = new ArrayList(); - List completedContainers = new ArrayList(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } - - // Updating node resource utilization - node.setAggregatedContainersUtilization( - rmNode.getAggregatedContainersUtilization()); - node.setNodeUtilization(rmNode.getNodeUtilization()); - - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (rmNode.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(rmNode.getNodeID()) - .getAllocatedResource(), 0))); - } - - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } - - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { - LOG.debug("Node heartbeat " + rmNode.getNodeID() + - " available resource = " + node.getUnallocatedResource()); - - assignContainers(node); - - LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " - + node.getUnallocatedResource()); - } - - updateAvailableResourcesMetrics(); - } - private void increaseUsedResources(RMContainer rmContainer) { Resources.addTo(usedResource, rmContainer.getAllocatedResource()); } @@ -910,7 +842,7 @@ protected synchronized void completedContainerInternal( container.getId().getApplicationAttemptId().getApplicationId(); // Get the node on which the container was allocated - FiCaSchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + appId + @@ -1025,4 +957,28 @@ protected void decreaseContainer( // TODO Auto-generated method stub } + + @Override + protected synchronized void nodeUpdate(RMNode nm) { + super.nodeUpdate(nm); + + FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID()); + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + return; + } + + if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { + LOG.debug("Node heartbeat " + nm.getNodeID() + + " available resource = " + node.getUnallocatedResource()); + + assignContainers(node); + + LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + + node.getUnallocatedResource()); + } + + updateAvailableResourcesMetrics(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a115aac5c75..b6329b78bff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -1061,7 +1061,7 @@ private void setResourceAndNodeDetails() { when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn( clusterResources); - SchedulerNode mNode = mock(SchedulerNode.class); + FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class); when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL); when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode); }