diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index c6773458ab6..39820f7860b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -26,8 +26,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -59,7 +59,6 @@ public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); - private static final int EPOCH_BIT_SHIFT = 40; private final ApplicationId applicationId; private final ApplicationAttemptId applicationAttemptId; @@ -79,7 +78,8 @@ public class AppSchedulingInfo { private Set requestedPartitions = new HashSet<>(); - final Set schedulerKeys = new TreeSet<>(); + private final ConcurrentSkipListMap + schedulerKeys = new ConcurrentSkipListMap<>(); final Map> resourceRequestMap = new ConcurrentHashMap<>(); final Map(); requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority); + incrementSchedulerKeyReference(schedulerKey); } requestsOnNodeWithPriority.put(containerId, request); @@ -250,11 +251,30 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) { LOG.debug("Added increase request:" + request.getContainerId() + " delta=" + delta); } - - // update Scheduler Keys - schedulerKeys.add(schedulerKey); } - + + private void incrementSchedulerKeyReference( + SchedulerRequestKey schedulerKey) { + Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); + if (schedulerKeyCount == null) { + schedulerKeys.put(schedulerKey, 1); + } else { + schedulerKeys.put(schedulerKey, schedulerKeyCount + 1); + } + } + + private void decrementSchedulerKeyReference( + SchedulerRequestKey schedulerKey) { + Integer schedulerKeyCount = schedulerKeys.get(schedulerKey); + if (schedulerKeyCount != null) { + if (schedulerKeyCount > 1) { + schedulerKeys.put(schedulerKey, schedulerKeyCount - 1); + } else { + schedulerKeys.remove(schedulerKey); + } + } + } + public synchronized boolean removeIncreaseRequest(NodeId nodeId, SchedulerRequestKey schedulerKey, ContainerId containerId) { Map> @@ -275,6 +295,7 @@ public synchronized boolean removeIncreaseRequest(NodeId nodeId, // remove hierarchies if it becomes empty if (requestsOnNodeWithPriority.isEmpty()) { requestsOnNode.remove(schedulerKey); + decrementSchedulerKeyReference(schedulerKey); } if (requestsOnNode.isEmpty()) { containerIncreaseRequestMap.remove(nodeId); @@ -341,7 +362,6 @@ public synchronized boolean updateResourceRequests( if (asks == null) { asks = new ConcurrentHashMap<>(); this.resourceRequestMap.put(schedulerKey, asks); - this.schedulerKeys.add(schedulerKey); } // Increment number of containers if recovering preempted resources @@ -360,29 +380,34 @@ public synchronized boolean updateResourceRequests( anyResourcesUpdated = true; - // Activate application. Metrics activation is done here. - // TODO: Shouldn't we activate even if numContainers = 0? - if (request.getNumContainers() > 0) { - activeUsersManager.activateApplication(user, applicationId); - } - // Update pendingResources - updatePendingResources(lastRequest, request, queue.getMetrics()); + updatePendingResources(lastRequest, request, schedulerKey, + queue.getMetrics()); } } return anyResourcesUpdated; } private void updatePendingResources(ResourceRequest lastRequest, - ResourceRequest request, QueueMetrics metrics) { + ResourceRequest request, SchedulerRequestKey schedulerKey, + QueueMetrics metrics) { + int lastRequestContainers = + (lastRequest != null) ? lastRequest.getNumContainers() : 0; if (request.getNumContainers() <= 0) { + if (lastRequestContainers >= 0) { + decrementSchedulerKeyReference(schedulerKey); + } LOG.info("checking for deactivate of application :" + this.applicationId); checkForDeactivation(); + } else { + // Activate application. Metrics activation is done here. + if (lastRequestContainers <= 0) { + incrementSchedulerKeyReference(schedulerKey); + activeUsersManager.activateApplication(user, applicationId); + } } - int lastRequestContainers = - (lastRequest != null) ? lastRequest.getNumContainers() : 0; Resource lastRequestCapability = lastRequest != null ? lastRequest.getCapability() : Resources.none(); metrics.incrPendingResources(user, @@ -505,7 +530,7 @@ public boolean getAndResetBlacklistChanged() { } public synchronized Collection getSchedulerKeys() { - return schedulerKeys; + return schedulerKeys.keySet(); } public synchronized Map getResourceRequests( @@ -617,7 +642,7 @@ public synchronized List allocate(NodeType type, } else if (type == NodeType.RACK_LOCAL) { allocateRackLocal(node, schedulerKey, request, resourceRequests); } else { - allocateOffSwitch(request, resourceRequests); + allocateOffSwitch(request, resourceRequests, schedulerKey); } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -656,7 +681,7 @@ private synchronized void allocateNodeLocal(SchedulerNode node, ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest.ANY); - decrementOutstanding(offRackRequest); + decrementOutstanding(offRackRequest, schedulerKey); // Update cloned NodeLocal, RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); @@ -684,7 +709,7 @@ private synchronized void allocateRackLocal(SchedulerNode node, ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get( ResourceRequest.ANY); - decrementOutstanding(offRackRequest); + decrementOutstanding(offRackRequest, schedulerKey); // Update cloned RackLocal and OffRack requests for recovery resourceRequests.add(cloneResourceRequest(rackLocalRequest)); @@ -696,15 +721,16 @@ private synchronized void allocateRackLocal(SchedulerNode node, * application. */ private synchronized void allocateOffSwitch( - ResourceRequest offSwitchRequest, List resourceRequests) { + ResourceRequest offSwitchRequest, List resourceRequests, + SchedulerRequestKey schedulerKey) { // Update future requirements - decrementOutstanding(offSwitchRequest); + decrementOutstanding(offSwitchRequest, schedulerKey); // Update cloned OffRack requests for recovery resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } private synchronized void decrementOutstanding( - ResourceRequest offSwitchRequest) { + ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) { int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; // Do not remove ANY @@ -713,6 +739,7 @@ private synchronized void decrementOutstanding( // Do we have any outstanding requests? // If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { + decrementSchedulerKeyReference(schedulerKey); checkForDeactivation(); } @@ -723,24 +750,7 @@ private synchronized void decrementOutstanding( } private synchronized void checkForDeactivation() { - boolean deactivate = true; - for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { - ResourceRequest request = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - if (request != null) { - if (request.getNumContainers() > 0) { - deactivate = false; - break; - } - } - } - - // also we need to check increase request - if (!deactivate) { - deactivate = containerIncreaseRequestMap.isEmpty(); - } - - if (deactivate) { + if (schedulerKeys.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index 503ea34b06b..7f9c71977d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -23,11 +23,14 @@ import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.TreeSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.junit.Assert; @@ -105,4 +108,66 @@ public void testSchedulerRequestKeyOrdering() { Assert.assertEquals(2, sk.getPriority().getPriority()); Assert.assertEquals(6, sk.getAllocationRequestId()); } + + @Test + public void testSchedulerKeyAccounting() { + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + + Queue queue = mock(Queue.class); + doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); + AppSchedulingInfo info = new AppSchedulingInfo( + appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, + new ResourceUsage()); + Assert.assertEquals(0, info.getSchedulerKeys().size()); + + Priority pri1 = Priority.newInstance(1); + ResourceRequest req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + Priority pri2 = Priority.newInstance(2); + ResourceRequest req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 2); + List reqs = new ArrayList<>(); + reqs.add(req1); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + ArrayList keys = + new ArrayList<>(info.getSchedulerKeys()); + Assert.assertEquals(2, keys.size()); + Assert.assertEquals(SchedulerRequestKey.create(req1), keys.get(0)); + Assert.assertEquals(SchedulerRequestKey.create(req2), keys.get(1)); + + // iterate to verify no ConcurrentModificationException + for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) { + info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null); + } + Assert.assertEquals(1, info.getSchedulerKeys().size()); + Assert.assertEquals(SchedulerRequestKey.create(req2), + info.getSchedulerKeys().iterator().next()); + + req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + reqs.clear(); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2), + req2, null); + Assert.assertEquals(0, info.getSchedulerKeys().size()); + + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 5); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(1, info.getSchedulerKeys().size()); + Assert.assertEquals(SchedulerRequestKey.create(req1), + info.getSchedulerKeys().iterator().next()); + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 0); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(0, info.getSchedulerKeys().size()); + } }