YARN-8606. Opportunistic scheduling does not work post RM failover. Contributed by Bibin A Chundatt.
(cherry picked from commit a48a0cc7fd
)
This commit is contained in:
parent
0f66a0d825
commit
e4f530a9af
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -406,7 +407,8 @@ public class OpportunisticContainerAllocatorAMService
|
|||
return nodeMonitor.getThresholdCalculator();
|
||||
}
|
||||
|
||||
private synchronized List<RemoteNode> getLeastLoadedNodes() {
|
||||
@VisibleForTesting
|
||||
synchronized List<RemoteNode> getLeastLoadedNodes() {
|
||||
long currTime = System.currentTimeMillis();
|
||||
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|
||||
|| (cachedNodes == null)) {
|
||||
|
|
|
@ -757,9 +757,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
}
|
||||
|
||||
masterService = createApplicationMasterService();
|
||||
createAndRegisterOpportunisticDispatcher(masterService);
|
||||
addService(masterService) ;
|
||||
rmContext.setApplicationMasterService(masterService);
|
||||
|
||||
|
||||
applicationACLsManager = new ApplicationACLsManager(conf);
|
||||
|
||||
queueACLsManager = createQueueACLsManager(scheduler, conf);
|
||||
|
@ -807,6 +809,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
private void createAndRegisterOpportunisticDispatcher(
|
||||
ApplicationMasterService service) {
|
||||
if (!isOpportunisticSchedulingEnabled(conf)) {
|
||||
return;
|
||||
}
|
||||
EventDispatcher oppContainerAllocEventDispatcher = new EventDispatcher(
|
||||
(OpportunisticContainerAllocatorAMService) service,
|
||||
OpportunisticContainerAllocatorAMService.class.getName());
|
||||
// Add an event dispatcher for the
|
||||
// OpportunisticContainerAllocatorAMService to handle node
|
||||
// additions, updates and removals. Since the SchedulerEvent is currently
|
||||
// a super set of theses, we register interest for it.
|
||||
addService(oppContainerAllocEventDispatcher);
|
||||
rmDispatcher
|
||||
.register(SchedulerEventType.class, oppContainerAllocEventDispatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
RMStateStore rmStore = rmContext.getStateStore();
|
||||
|
@ -1335,8 +1354,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
Configuration config = this.rmContext.getYarnConfiguration();
|
||||
if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
|
||||
|| YarnConfiguration.isDistSchedulingEnabled(config)) {
|
||||
if (isOpportunisticSchedulingEnabled(conf)) {
|
||||
if (YarnConfiguration.isDistSchedulingEnabled(config) &&
|
||||
!YarnConfiguration
|
||||
.isOpportunisticContainerAllocationEnabled(config)) {
|
||||
|
@ -1348,16 +1366,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
oppContainerAllocatingAMService =
|
||||
new OpportunisticContainerAllocatorAMService(this.rmContext,
|
||||
scheduler);
|
||||
EventDispatcher oppContainerAllocEventDispatcher =
|
||||
new EventDispatcher(oppContainerAllocatingAMService,
|
||||
OpportunisticContainerAllocatorAMService.class.getName());
|
||||
// Add an event dispatcher for the
|
||||
// OpportunisticContainerAllocatorAMService to handle node
|
||||
// additions, updates and removals. Since the SchedulerEvent is currently
|
||||
// a super set of theses, we register interest for it.
|
||||
addService(oppContainerAllocEventDispatcher);
|
||||
rmDispatcher.register(SchedulerEventType.class,
|
||||
oppContainerAllocEventDispatcher);
|
||||
this.rmContext.setContainerQueueLimitCalculator(
|
||||
oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
|
||||
return oppContainerAllocatingAMService;
|
||||
|
@ -1373,6 +1381,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
return new RMSecretManagerService(conf, rmContext);
|
||||
}
|
||||
|
||||
private boolean isOpportunisticSchedulingEnabled(Configuration conf) {
|
||||
return YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf)
|
||||
|| YarnConfiguration.isDistSchedulingEnabled(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create RMDelegatedNodeLabelsUpdater based on configuration.
|
||||
*/
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -658,6 +662,46 @@ public class TestRMHA {
|
|||
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpportunisticAllocatorAfterFailover() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.setBoolean(
|
||||
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
||||
// 1. start RM
|
||||
rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
// 2. Transition to active
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
// 3. Transition to standby
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
// 4. Transition to active
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 8 * 1024);
|
||||
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
rmNode1.getRMContext().getDispatcher().getEventHandler()
|
||||
.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
OpportunisticContainerAllocatorAMService appMaster =
|
||||
(OpportunisticContainerAllocatorAMService) rm.getRMContext()
|
||||
.getApplicationMasterService();
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return appMaster.getLeastLoadedNodes().size() == 1;
|
||||
}
|
||||
}, 100, 3000);
|
||||
rm.stop();
|
||||
Assert.assertEquals(1, appMaster.getLeastLoadedNodes().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceProfilesManagerAfterRMWentStandbyThenBackToActive()
|
||||
throws Exception {
|
||||
|
|
Loading…
Reference in New Issue