From d6c36c295446218748aeb7c61c25c01009211cfa Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 19 Sep 2016 20:37:22 +0000 Subject: [PATCH] YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/AppSchedulingInfo.java | 22 ++--- .../scheduler/TestAppSchedulingInfo.java | 95 +++++++++++++++++++ 3 files changed, 104 insertions(+), 16 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a976bc4a8d1..5a80210224a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -16,6 +16,9 @@ Release 2.7.4 - UNRELEASED YARN-5483. Optimize RMAppAttempt#pullJustFinishedContainers (sandflee via jlowe) + YARN-5540. Scheduler spends too much time looking at empty priorities + (jlowe) + BUG FIXES YARN-5197. RM leaks containers if running container disappears from diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 7358f03c821..7cd0d50867e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -20,14 +20,12 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -63,7 +61,7 @@ public class AppSchedulingInfo { private final AtomicLong containerIdCounter; private final int EPOCH_BIT_SHIFT = 40; - final Set priorities = new TreeSet( + final Set priorities = new ConcurrentSkipListSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); final Map> requests = new ConcurrentHashMap>(); @@ -154,6 +152,7 @@ synchronized public void updateResourceRequests( // Thus we don't need another loop ala the one in decrementOutstanding() // which is needed during deactivate. if (request.getNumContainers() > 0) { + priorities.add(priority); activeUsersManager.activateApplication(user, applicationId); } } @@ -163,7 +162,6 @@ synchronized public void updateResourceRequests( if (asks == null) { asks = new ConcurrentHashMap(); this.requests.put(priority, asks); - this.priorities.add(priority); } lastRequest = asks.get(resourceName); @@ -178,6 +176,7 @@ synchronized public void updateResourceRequests( // Similarly, deactivate application? if (request.getNumContainers() <= 0) { + priorities.remove(priority); LOG.info("checking for deactivate of application :" + this.applicationId); checkForDeactivation(); @@ -376,22 +375,13 @@ synchronized private void decrementOutstanding( // Do we have any outstanding requests? // If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { + priorities.remove(offSwitchRequest.getPriority()); checkForDeactivation(); } } synchronized private void checkForDeactivation() { - boolean deactivate = true; - for (Priority priority : getPriorities()) { - ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); - if (request != null) { - if (request.getNumContainers() > 0) { - deactivate = false; - break; - } - } - } - if (deactivate) { + if (priorities.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java new file mode 100644 index 00000000000..e4ed06eff9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doReturn; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.junit.Assert; +import org.junit.Test; + +public class TestAppSchedulingInfo { + + @Test + public void testSchedulerKeyAccounting() { + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + + Queue queue = mock(Queue.class); + doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); + AppSchedulingInfo info = new AppSchedulingInfo( + appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0); + Assert.assertEquals(0, info.getPriorities().size()); + + Priority pri1 = Priority.newInstance(1); + ResourceRequest req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + Priority pri2 = Priority.newInstance(2); + ResourceRequest req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 2); + List reqs = new ArrayList<>(); + reqs.add(req1); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + ArrayList priorities = new ArrayList<>(info.getPriorities()); + Assert.assertEquals(2, priorities.size()); + Assert.assertEquals(req1.getPriority(), priorities.get(0)); + Assert.assertEquals(req2.getPriority(), priorities.get(1)); + + // iterate to verify no ConcurrentModificationException + for (Priority priority: info.getPriorities()) { + info.allocate(NodeType.OFF_SWITCH, null, priority, req1, null); + } + Assert.assertEquals(1, info.getPriorities().size()); + Assert.assertEquals(req2.getPriority(), + info.getPriorities().iterator().next()); + + req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + reqs.clear(); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + info.allocate(NodeType.OFF_SWITCH, null, req2.getPriority(), req2, null); + Assert.assertEquals(0, info.getPriorities().size()); + + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 5); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(1, info.getPriorities().size()); + Assert.assertEquals(req1.getPriority(), + info.getPriorities().iterator().next()); + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 0); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(0, info.getPriorities().size()); + } +}