From 10732d515f62258309f98e4d7d23249f80b1847d Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 26 May 2015 12:00:51 -0700 Subject: [PATCH] YARN-3632. Ordering policy should be allowed to reorder an application when demand changes. Contributed by Craig Welch --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/AppSchedulingInfo.java | 7 +- .../SchedulerApplicationAttempt.java | 5 +- .../scheduler/capacity/CapacityScheduler.java | 19 ++- .../AbstractComparatorOrderingPolicy.java | 25 ++++ .../scheduler/policy/FairOrderingPolicy.java | 11 +- .../scheduler/policy/FifoOrderingPolicy.java | 6 +- .../scheduler/policy/OrderingPolicy.java | 5 + .../capacity/TestCapacityScheduler.java | 115 ++++++++++++++++++ 9 files changed, 187 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 481c5a9fc11..6cdd9f3a5aa 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -443,6 +443,9 @@ Release 2.8.0 - UNRELEASED YARN-3707. RM Web UI queue filter doesn't work. (Wangda Tan via jianhe) + YARN-3632. Ordering policy should be allowed to reorder an application when + demand changes. 
(Craig Welch via jianhe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 5604f0f3396..77ac5b3e640 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -128,11 +128,14 @@ public class AppSchedulingInfo { * * @param requests resources to be acquired * @param recoverPreemptedRequest recover Resource Request on preemption + * @return true if any resource was updated, false otherwise */ - synchronized public void updateResourceRequests( + synchronized public boolean updateResourceRequests( List requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); + boolean anyResourcesUpdated = false; + // Update resource requests for (ResourceRequest request : requests) { Priority priority = request.getPriority(); @@ -146,6 +149,7 @@ public class AppSchedulingInfo { + request); } updatePendingResources = true; + anyResourcesUpdated = true; // Premature optimization?
// Assumes that we won't see more than one priority request updated @@ -209,6 +213,7 @@ public class AppSchedulingInfo { } } } + return anyResourcesUpdated; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 0554c041380..dbc3cb57139 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -284,11 +284,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return queue; } - public synchronized void updateResourceRequests( + public synchronized boolean updateResourceRequests( List requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests, false); + return appSchedulingInfo.updateResourceRequests(requests, false); } + return false; } public synchronized void recoverResourceRequests( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 48c7f2fd4a3..06d282df148 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -895,6 +895,10 @@ public class CapacityScheduler extends // Release containers releaseContainers(release, application); + Allocation allocation; + + LeafQueue updateDemandForQueue = null; + synchronized (application) { // make sure we aren't stopping/removing the application @@ -915,8 +919,10 @@ public class CapacityScheduler extends application.showRequests(); // Update application requests - application.updateResourceRequests(ask); - + if (application.updateResourceRequests(ask)) { + updateDemandForQueue = (LeafQueue) application.getQueue(); + } + LOG.debug("allocate: post-update"); application.showRequests(); } @@ -929,9 +935,16 @@ public class CapacityScheduler extends application.updateBlacklist(blacklistAdditions, blacklistRemovals); - return application.getAllocation(getResourceCalculator(), + allocation = application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); } + + if (updateDemandForQueue != null) { + updateDemandForQueue.getOrderingPolicy().demandUpdated(application); + } + + return allocation; + } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index e046fcf66bc..c4d2aae318b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -37,6 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy schedulableEntities; protected Comparator comparator; + protected Map entitiesToReorder = new HashMap(); public AbstractComparatorOrderingPolicy() { } @@ -47,11 +48,13 @@ public abstract class AbstractComparatorOrderingPolicy getAssignmentIterator() { + reorderScheduleEntities(); return schedulableEntities.iterator(); } @Override public Iterator getPreemptionIterator() { + reorderScheduleEntities(); return schedulableEntities.descendingIterator(); } @@ -68,6 +71,22 @@ public abstract class AbstractComparatorOrderingPolicy entry : + entitiesToReorder.entrySet()) { + reorderSchedulableEntity(entry.getValue()); + } + entitiesToReorder.clear(); + } + } + + protected void entityRequiresReordering(S schedulableEntity) { + synchronized (entitiesToReorder) { + entitiesToReorder.put(schedulableEntity.getId(), schedulableEntity); + } + } + @VisibleForTesting public Comparator getComparator() { return comparator; @@ -80,6 +99,9 @@ public abstract class AbstractComparatorOrderingPolicy extends AbstractCom @Override public void containerAllocated(S schedulableEntity, RMContainer r) { - reorderSchedulableEntity(schedulableEntity); + entityRequiresReordering(schedulableEntity); } @Override public void containerReleased(S schedulableEntity, RMContainer r) { - reorderSchedulableEntity(schedulableEntity); + entityRequiresReordering(schedulableEntity); } + @Override + public void demandUpdated(S schedulableEntity) { + if (sizeBasedWeight) { + entityRequiresReordering(schedulableEntity); + } + } + @Override 
public String getInfo() { String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java index 932a5f9e19c..74a422cfbc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -46,7 +46,11 @@ public class FifoOrderingPolicy extends AbstractCom public void containerReleased(S schedulableEntity, RMContainer r) { } - + + @Override + public void demandUpdated(S schedulableEntity) { + } + @Override public String getInfo() { return "FifoOrderingPolicy"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java index aebdcdee990..e3f67ce5660 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java @@ -101,6 +101,11 @@ 
public interface OrderingPolicy { public void containerReleased(S schedulableEntity, RMContainer r); + /** + * Demand Updated for the passed schedulableEntity, reorder if needed. + */ + void demandUpdated(S schedulableEntity); + /** * Display information regarding configuration & status */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d36058156bb..7b665e435f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.nodelabels.RMNodeLabel; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; @@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -676,6 +679,118 @@ public class TestCapacityScheduler { rm.stop(); } + @Test + public void testAllocateReorder() throws Exception { + + //Confirm that allocation (resource request) alone will trigger a change in + //application ordering where appropriate + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + LeafQueue q = (LeafQueue) cs.getQueue("default"); + Assert.assertNotNull(q); + + FairOrderingPolicy fop = new FairOrderingPolicy(); + fop.setSizeBasedWeight(true); + q.setOrderingPolicy(fop); + + String host = "127.0.0.1"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + //add app begin + ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, 1); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, 
"default", "user"); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + //add app end + + //add app begin + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId( + appId2, 1); + + RMAppAttemptMetrics attemptMetric2 = + new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); + when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + + SchedulerEvent addAppEvent2 = + new AppAddedSchedulerEvent(appId2, "default", "user"); + cs.handle(addAppEvent2); + SchedulerEvent addAttemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId2, false); + cs.handle(addAttemptEvent2); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); + + //This will allocate for app1 + cs.allocate(appAttemptId1, + Collections.singletonList(r1), + Collections.emptyList(), + null, null); + + //And this will result in container assignment for app1 + CapacityScheduler.schedule(cs); + + //Verify that app1 is still first in assignment order + //This happens because app2 has no demand/a magnitude of NaN, which + //results in app1 and app2 being equal in the fairness comparison and + //falling back to fifo (start) ordering + assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(), + appId1.toString()); + + //Now, allocate for app2 (this would be
the first/AM allocation) + ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null); + + //In this case we do not perform container assignment because we want to + //verify re-ordering based on the allocation alone + + //Now, the first app for assignment is app2 + assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(), + appId2.toString()); + + rm.stop(); + } + @Test public void testResourceOverCommit() throws Exception { Configuration conf = new Configuration();