From 109e528ef5d8df07443373751266b4417acc981a Mon Sep 17 00:00:00 2001
From: Jian He
Date: Fri, 8 Jan 2016 15:51:10 -0800
Subject: [PATCH] YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps. Contributed by Rohith Sharma K S
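
The gist of the change, quoted from the CapacityScheduler.java hunk below:
every scheduler attempt now carries a flag saying whether it is being
recovered after an RM restart, and LeafQueue keeps recovering attempts in a
dedicated pending ordering policy that is always activated ahead of the
regular one:

    FiCaSchedulerApp attempt =
        new FiCaSchedulerApp(applicationAttemptId, application.getUser(),
          queue, queue.getActiveUsersManager(), rmContext,
          application.getPriority(), isAttemptRecovering);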
"), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c7b73fb6c93..b3b971399af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -783,7 +783,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority()); + application.getPriority(), isAttemptRecovering); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c3f4b9d090..ff7d04f6a57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue { private OrderingPolicy pendingOrderingPolicy = null; + // Always give preference to this while activating the application attempts. + private OrderingPolicy pendingOPForRecoveredApps = null; + private volatile float minimumAllocationFactor; private Map users = new HashMap(); @@ -156,6 +159,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); setPendingAppsOrderingPolicy(conf . getOrderingPolicy(getQueuePath())); + setPendingAppsOrderingPolicyRecovery(conf + . 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c3f4b9d090..ff7d04f6a57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue {
 
   private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
 
+  // Always give preference to this while activating the application attempts.
+  private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
+
   private volatile float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
@@ -156,6 +159,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
     setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
     setPendingAppsOrderingPolicy(conf
         .<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+    setPendingAppsOrderingPolicyRecovery(conf
+        .<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
 
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -320,7 +325,8 @@ public synchronized int getNumApplications() {
   }
 
   public synchronized int getNumPendingApplications() {
-    return pendingOrderingPolicy.getNumSchedulableEntities();
+    return pendingOrderingPolicy.getNumSchedulableEntities()
+        + pendingOPForRecoveredApps.getNumSchedulableEntities();
   }
 
   public synchronized int getNumActiveApplications() {
@@ -599,9 +605,19 @@ private synchronized void activateApplications() {
     Map<String, Resource> userAmPartitionLimit =
         new HashMap<String, Resource>();
 
-    for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
-        .getAssignmentIterator(); i.hasNext();) {
-      FiCaSchedulerApp application = i.next();
+    activateApplications(getPendingAppsOrderingPolicyRecovery()
+        .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
+
+    activateApplications(
+        getPendingAppsOrderingPolicy().getAssignmentIterator(),
+        amPartitionLimit, userAmPartitionLimit);
+  }
+
+  private synchronized void activateApplications(
+      Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
+      Map<String, Resource> userAmPartitionLimit) {
+    while (fsApp.hasNext()) {
+      FiCaSchedulerApp application = fsApp.next();
       ApplicationId applicationId = application.getApplicationId();
 
       // Get the am-node-partition associated with each application
@@ -692,7 +708,7 @@ private synchronized void activateApplications() {
       metrics.incAMUsed(application.getUser(),
           application.getAMResource(partitionName));
       metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
-      i.remove();
+      fsApp.remove();
       LOG.info("Application " + applicationId + " from user: "
           + application.getUser() + " activated in queue: " + getQueueName());
     }
@@ -702,7 +718,11 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
       User user) {
     // Accept
     user.submitApplication();
-    getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    if (application.isAttemptRecovering()) {
+      getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
+    } else {
+      getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    }
     applicationAttemptMap.put(application.getApplicationAttemptId(), application);
 
     // Activate applications
@@ -742,7 +762,11 @@ public synchronized void removeApplicationAttempt(
     boolean wasActive =
       orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
-      pendingOrderingPolicy.removeSchedulableEntity(application);
+      if (application.isAttemptRecovering()) {
+        pendingOPForRecoveredApps.removeSchedulableEntity(application);
+      } else {
+        pendingOrderingPolicy.removeSchedulableEntity(application);
+      }
     } else {
       queueUsage.decAMUsed(partitionName,
           application.getAMResource(partitionName));
@@ -1491,7 +1515,11 @@ public void recoverContainer(Resource clusterResource,
    * Obtain (read-only) collection of pending applications.
    */
   public Collection<FiCaSchedulerApp> getPendingApplications() {
-    return pendingOrderingPolicy.getSchedulableEntities();
+    Collection<FiCaSchedulerApp> pendingApps =
+        new ArrayList<FiCaSchedulerApp>();
+    pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
+    pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
+    return pendingApps;
   }
 
   /**
@@ -1535,6 +1563,10 @@ public synchronized Resource getTotalPendingResourcesConsideringUserLimit(
   @Override
   public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
+    for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
+        .getSchedulableEntities()) {
+      apps.add(pendingApp.getApplicationAttemptId());
+    }
     for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
         .getSchedulableEntities()) {
       apps.add(pendingApp.getApplicationAttemptId());
@@ -1670,6 +1702,21 @@ public synchronized void setPendingAppsOrderingPolicy(
     this.pendingOrderingPolicy = pendingOrderingPolicy;
   }
 
+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getPendingAppsOrderingPolicyRecovery() {
+    return pendingOPForRecoveredApps;
+  }
+
+  public synchronized void setPendingAppsOrderingPolicyRecovery(
+      OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
+    if (null != this.pendingOPForRecoveredApps) {
+      pendingOrderingPolicyRecovery
+          .addAllSchedulableEntities(this.pendingOPForRecoveredApps
+              .getSchedulableEntities());
+    }
+    this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
+  }
+
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand
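
To make the activation order concrete, here is a minimal, self-contained
sketch of the two-phase drain that the new activateApplications() performs.
This is an illustration only, not part of the patch: the class name and queue
contents are invented, and plain FIFO queues stand in for the configured
OrderingPolicy instances.

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    public class TwoPhaseActivationSketch {
      public static void main(String[] args) {
        // Stand-ins for pendingOPForRecoveredApps and pendingOrderingPolicy.
        Queue<String> recovered = new ArrayDeque<>();
        Queue<String> pending = new ArrayDeque<>();
        recovered.add("app-1 (was running before RM restart)");
        recovered.add("app-2 (was running before RM restart)");
        pending.add("app-3 (was still pending before RM restart)");

        // Mirrors activateApplications(): recovered attempts are always
        // drained first, then the regular pending attempts.
        activate(recovered.iterator());
        activate(pending.iterator());
      }

      private static void activate(Iterator<String> apps) {
        while (apps.hasNext()) {
          System.out.println("activating " + apps.next());
          apps.remove(); // like fsApp.remove() once an attempt is activated
        }
      }
    }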
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c9c792e3528..4b88415ad8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -99,12 +99,12 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
-        Priority.newInstance(0));
+        Priority.newInstance(0), false);
   }
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
-      RMContext rmContext, Priority appPriority) {
+      RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
 
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -129,6 +129,7 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
     setAppAMNodePartitionName(partition);
     setAMResource(partition, amResource);
     setPriority(appPriority);
+    setAttemptRecovering(isAttemptRecovering);
 
     scheduler = rmContext.getScheduler();
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 169e9f6a4a2..2ad805a2066 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -34,6 +34,8 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -567,4 +569,166 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
     Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
     rm.stop();
   }
+
+  /**
+   * <p>
+   * Test case verifies the order of applications activated after RM restart.
+   * </p>
+   * <li>App-1 and app-2 are submitted, scheduled and running, with priorities
+   * 5 and 6 respectively.</li>
+   * <li>App-3 is submitted and scheduled with priority 7. It is not activated
+   * since the AMResourceLimit is reached.</li>
+   * <li>RM is restarted.</li>
+   * <li>App-1 gets activated regardless of the AMResourceLimit.</li>
+   * <li>App-2 and app-3 are put in the pendingOrderingPolicy.</li>
+   * <li>After NM registration, app-2 is activated.</li>
+   * <p>
+   * Expected output: app-2 must get activated, since app-2 was running
+   * earlier.
+   * </p>
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    dispatcher.await();
+
+    ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
+    LeafQueue defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+    int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
+
+    // App-1 with priority 5 submitted and running
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm1.submitApp(memory, appPriority1);
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    am1.registerAppAttempt();
+
+    // App-2 with priority 6 submitted and running
+    Priority appPriority2 = Priority.newInstance(6);
+    RMApp app2 = rm1.submitApp(memory, appPriority2);
+    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+    am2.registerAppAttempt();
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
+
+    // App-3 with priority 7 submitted and scheduled, but not activated since
+    // the AMResourceLimit threshold is reached
+    Priority appPriority3 = Priority.newInstance(7);
+    RMApp app3 = rm1.submitApp(memory, appPriority3);
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
+
+    Iterator<FiCaSchedulerApp> iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    FiCaSchedulerApp fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    FiCaSchedulerApp fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+    iterator = defaultQueue.getPendingApplications().iterator();
+    FiCaSchedulerApp fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    final DrainDispatcher dispatcher1 = new DrainDispatcher();
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher1;
+      }
+    };
+
+    // start new RM
+    rm2.start();
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // Verify RM Apps after this restart
+    Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
+
+    dispatcher1.await();
+    scheduler = rm2.getRMContext().getScheduler();
+    defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+
+    // wait for all applications to get added to the scheduler
+    int count = 5;
+    while (count-- > 0) {
+      if ((defaultQueue.getNumActiveApplications()
+          + defaultQueue.getNumPendingApplications()) == 3) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    // Before NM registration, the AMResourceLimit threshold is 0, so the
+    // first application gets activated regardless of the threshold; the
+    // other two applications remain pending.
+    Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
+
+    // NM resyncs to the new RM
+    nm1.registerNode();
+    dispatcher1.await();
+
+    // wait for one more application to be activated
+    count = 5;
+    while (count-- > 0) {
+      if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    // verify the order of the activated applications
+    iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+    // verify the pending application iterator: it should be the app-3 attempt
+    iterator = defaultQueue.getPendingApplications().iterator();
+    fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    rm2.stop();
+    rm1.stop();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 479e25a880e..d4b8dcd3a9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -2413,14 +2413,16 @@ public void testFifoAssignment() throws Exception {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
+            mock(ActiveUsersManager.class), spyRMContext,
+            Priority.newInstance(3), false));
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
+            mock(ActiveUsersManager.class), spyRMContext,
+            Priority.newInstance(5), false));
     a.submitApplicationAttempt(app_1, user_0);
 
     Priority priority = TestUtils.createMockPriority(1);