diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 9b136272885..15c2a896e6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -417,7 +418,8 @@ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { return nodeMonitor.getThresholdCalculator(); } - private synchronized List getLeastLoadedNodes() { + @VisibleForTesting + synchronized List getLeastLoadedNodes() { long currTime = System.currentTimeMillis(); if ((currTime - lastCacheUpdateTime > cacheRefreshInterval) || (cachedNodes == null)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 0b7e87cc0b1..f14d440bbbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -757,9 +757,11 @@ protected void serviceInit(Configuration configuration) throws Exception { } masterService = createApplicationMasterService(); + createAndRegisterOpportunisticDispatcher(masterService); addService(masterService) ; rmContext.setApplicationMasterService(masterService); + applicationACLsManager = new ApplicationACLsManager(conf); queueACLsManager = createQueueACLsManager(scheduler, conf); @@ -807,6 +809,23 @@ protected void serviceInit(Configuration configuration) throws Exception { super.serviceInit(conf); } + private void createAndRegisterOpportunisticDispatcher( + ApplicationMasterService service) { + if (!isOpportunisticSchedulingEnabled(conf)) { + return; + } + EventDispatcher oppContainerAllocEventDispatcher = new EventDispatcher( + (OpportunisticContainerAllocatorAMService) service, + OpportunisticContainerAllocatorAMService.class.getName()); + // Add an event dispatcher for the + // OpportunisticContainerAllocatorAMService to handle node + // additions, updates and removals. Since the SchedulerEvent is currently + // a super set of theses, we register interest for it. + addService(oppContainerAllocEventDispatcher); + rmDispatcher + .register(SchedulerEventType.class, oppContainerAllocEventDispatcher); + } + @Override protected void serviceStart() throws Exception { RMStateStore rmStore = rmContext.getStateStore(); @@ -1335,8 +1354,7 @@ protected ClientRMService createClientRMService() { protected ApplicationMasterService createApplicationMasterService() { Configuration config = this.rmContext.getYarnConfiguration(); - if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config) - || YarnConfiguration.isDistSchedulingEnabled(config)) { + if (isOpportunisticSchedulingEnabled(conf)) { if (YarnConfiguration.isDistSchedulingEnabled(config) && !YarnConfiguration .isOpportunisticContainerAllocationEnabled(config)) { @@ -1348,16 +1366,6 @@ protected ApplicationMasterService createApplicationMasterService() { oppContainerAllocatingAMService = new OpportunisticContainerAllocatorAMService(this.rmContext, scheduler); - EventDispatcher oppContainerAllocEventDispatcher = - new EventDispatcher(oppContainerAllocatingAMService, - OpportunisticContainerAllocatorAMService.class.getName()); - // Add an event dispatcher for the - // OpportunisticContainerAllocatorAMService to handle node - // additions, updates and removals. Since the SchedulerEvent is currently - // a super set of theses, we register interest for it. - addService(oppContainerAllocEventDispatcher); - rmDispatcher.register(SchedulerEventType.class, - oppContainerAllocEventDispatcher); this.rmContext.setContainerQueueLimitCalculator( oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator()); return oppContainerAllocatingAMService; @@ -1373,6 +1381,11 @@ protected RMSecretManagerService createRMSecretManagerService() { return new RMSecretManagerService(conf, rmContext); } + private boolean isOpportunisticSchedulingEnabled(Configuration conf) { + return YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf) + || YarnConfiguration.isDistSchedulingEnabled(conf); + } + /** * Create RMDelegatedNodeLabelsUpdater based on configuration. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 385e8dbedd3..c17dee80d37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.base.Supplier; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -658,6 +662,46 @@ protected Dispatcher createDispatcher() { assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState()); } + @Test + public void testOpportunisticAllocatorAfterFailover() throws Exception { + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + Configuration conf = new YarnConfiguration(configuration); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + // 1. start RM + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + StateChangeRequestInfo requestInfo = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + // 2. Transition to active + rm.adminService.transitionToActive(requestInfo); + // 3. Transition to standby + rm.adminService.transitionToStandby(requestInfo); + // 4. Transition to active + rm.adminService.transitionToActive(requestInfo); + + MockNM nm1 = rm.registerNode("h1:1234", 8 * 1024); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + rmNode1.getRMContext().getDispatcher().getEventHandler() + .handle(new NodeUpdateSchedulerEvent(rmNode1)); + OpportunisticContainerAllocatorAMService appMaster = + (OpportunisticContainerAllocatorAMService) rm.getRMContext() + .getApplicationMasterService(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return appMaster.getLeastLoadedNodes().size() == 1; + } + }, 100, 3000); + rm.stop(); + Assert.assertEquals(1, appMaster.getLeastLoadedNodes().size()); + + } + @Test public void testResourceProfilesManagerAfterRMWentStandbyThenBackToActive() throws Exception {