From a657472b42c58f87fd3165e0a746d83b72182a24 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 24 Jul 2017 20:57:25 +0530 Subject: [PATCH] YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S. --- ...tiveStandbyElectorBasedElectorService.java | 12 +- .../server/resourcemanager/AdminService.java | 71 +++-- .../CuratorBasedElectorService.java | 10 +- .../RMActiveServiceContext.java | 17 +- .../server/resourcemanager/RMContextImpl.java | 292 ++++++++++-------- .../resourcemanager/RMServiceContext.java | 151 +++++++++ .../resourcemanager/ResourceManager.java | 28 +- .../yarn/server/resourcemanager/MockRM.java | 2 +- .../TestRMEmbeddedElector.java | 8 +- .../yarn/server/resourcemanager/TestRMHA.java | 16 +- 10 files changed, 407 insertions(+), 200 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index b59bc25f5a9..a8dcda4f797 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); - private RMContext rmContext; + private ResourceManager rm; private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; @@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService @VisibleForTesting final Object zkDisconnectLock = new Object(); - ActiveStandbyElectorBasedElectorService(RMContext rmContext) { + ActiveStandbyElectorBasedElectorService(ResourceManager rm) { super(ActiveStandbyElectorBasedElectorService.class.getName()); - this.rmContext = rmContext; + this.rm = rm; } @Override @@ -140,7 +140,7 @@ public void becomeActive() throws ServiceFailedException { cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToActive(req); + rm.getRMContext().getRMAdminService().transitionToActive(req); } catch (Exception e) { throw new ServiceFailedException("RM could not transition to Active", e); } @@ -151,7 +151,7 @@ public void becomeStandby() { cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToStandby(req); + rm.getRMContext().getRMAdminService().transitionToStandby(req); } catch (Exception e) { LOG.error("RM could not transition to Standby", e); } @@ -205,7 +205,7 @@ public void run() { @SuppressWarnings(value = "unchecked") @Override public void notifyFatalError(String errorMessage) { - rmContext.getDispatcher().getEventHandler().handle( + rm.getRMContext().getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 75717652d97..3457ae329bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements private static final Log LOG = LogFactory.getLog(AdminService.class); - private final RMContext rmContext; private final ResourceManager rm; private String rmId; @@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; - public AdminService(ResourceManager rm, RMContext rmContext) { + public AdminService(ResourceManager rm) { super(AdminService.class.getName()); this.rm = rm; - this.rmContext = rmContext; } @Override public void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = - rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); + rm.getRMContext().isHAEnabled() + && HAUtil.isAutomaticFailoverEnabled(conf); masterServiceBindAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, @@ -189,7 +188,7 @@ protected void startServer() throws Exception { RMPolicyProvider.getInstance()); } - if (rmContext.isHAEnabled()) { + if (rm.getRMContext().isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -265,7 +264,7 @@ private void checkHaStateChange(StateChangeRequestInfo req) } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == rmContext.getHAServiceState(); + return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState(); } private void throwStandbyException() throws StandbyException { @@ -304,7 +303,7 @@ public synchronized void transitionToActive( // call all refresh*s for active RM to get the updated configurations. refreshAll(); } catch (Exception e) { - rmContext + rm.getRMContext() .getDispatcher() .getEventHandler() .handle( @@ -363,7 +362,7 @@ public synchronized void transitionToStandby( @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - HAServiceState haState = rmContext.getHAServiceState(); + HAServiceState haState = rm.getRMContext().getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); @@ -395,11 +394,12 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } private void refreshQueues() throws IOException, YarnException { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + rm.getRMContext().getScheduler().reinitialize(getConfig(), + this.rm.getRMContext()); // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); + ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); + rSystem.reinitialize(getConfig(), rm.getRMContext()); } } @@ -418,14 +418,14 @@ public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); switch (request.getDecommissionType()) { case NORMAL: - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully( + rm.getRMContext().getNodesListManager().refreshNodesGracefully( conf, request.getDecommissionTimeout()); break; case FORCEFUL: - rmContext.getNodesListManager().refreshNodesForcefully(); + rm.getRMContext().getNodesListManager().refreshNodesForcefully(); break; } RMAuditLogger.logSuccess(user.getShortUserName(), operation, @@ -440,7 +440,7 @@ private void refreshNodes() throws IOException, YarnException { Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); } @Override @@ -559,10 +559,11 @@ private void refreshActiveServicesAcls() throws IOException, YarnException { Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); - rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); - rmContext.getApplicationMasterService().refreshServiceAcls( + rm.getRMContext().getClientRMService().refreshServiceAcls(conf, + policyProvider); + rm.getRMContext().getApplicationMasterService().refreshServiceAcls( conf, policyProvider); - rmContext.getResourceTrackerService().refreshServiceAcls( + rm.getRMContext().getResourceTrackerService().refreshServiceAcls( conf, policyProvider); } @@ -601,7 +602,7 @@ public UpdateNodeResourceResponse updateNodeResource( // if any invalid nodes, throw exception instead of partially updating // valid nodes. for (NodeId nodeId : nodeIds) { - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.error("Resource update get failed on all nodes due to change " + "resource on an unrecognized node: " + nodeId); @@ -619,14 +620,14 @@ public UpdateNodeResourceResponse updateNodeResource( for (Map.Entry entry : nodeResourceMap.entrySet()) { ResourceOption newResourceOption = entry.getValue(); NodeId nodeId = entry.getKey(); - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); allSuccess = false; } else { // update resource to RMNode - this.rmContext.getDispatcher().getEventHandler() + this.rm.getRMContext().getDispatcher().getEventHandler() .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); LOG.info("Update resource on node(" + node.getNodeID() + ") with resource(" + newResourceOption.toString() + ")"); @@ -661,7 +662,8 @@ public RefreshNodesResourcesResponse refreshNodesResources( DynamicResourceConfiguration newConf; InputStream drInputStream = - this.rmContext.getConfigurationProvider().getConfigurationInputStream( + this.rm.getRMContext().getConfigurationProvider() + .getConfigurationInputStream( configuration, YarnConfiguration.DR_CONFIGURATION_FILE); if (drInputStream != null) { @@ -679,7 +681,7 @@ public RefreshNodesResourcesResponse refreshNodesResources( updateNodeResource(updateRequest); } // refresh dynamic resource in ResourceTrackerService - this.rmContext.getResourceTrackerService(). + this.rm.getRMContext().getResourceTrackerService(). updateDynamicResourceConfiguration(newConf); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -692,7 +694,8 @@ public RefreshNodesResourcesResponse refreshNodesResources( private synchronized Configuration getConfiguration(Configuration conf, String... confFileNames) throws YarnException, IOException { for (String confFileName : confFileNames) { - InputStream confFileInputStream = this.rmContext.getConfigurationProvider() + InputStream confFileInputStream = + this.rm.getRMContext().getConfigurationProvider() .getConfigurationInputStream(conf, confFileName); if (confFileInputStream != null) { conf.addResource(confFileInputStream); @@ -746,7 +749,7 @@ public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLab AddToClusterNodeLabelsResponse response = recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager() + rm.getRMContext().getNodeLabelManager() .addToCluserNodeLabels(request.getNodeLabels()); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -769,7 +772,8 @@ public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( RemoveFromClusterNodeLabelsResponse response = recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + rm.getRMContext().getNodeLabelManager() + .removeFromClusterNodeLabels(request.getNodeLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -805,19 +809,20 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( boolean isKnown = false; // both active and inactive nodes are recognized as known nodes if (requestedNode.getPort() != 0) { - if (rmContext.getRMNodes().containsKey(requestedNode) - || rmContext.getInactiveRMNodes().containsKey(requestedNode)) { + if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm + .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) { isKnown = true; } } else { - for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; } } if (!isKnown) { - for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes() + .keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; @@ -841,7 +846,7 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( } } try { - rmContext.getNodeLabelManager().replaceLabelsOnNode( + rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -878,7 +883,7 @@ public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( checkRMStatus(user.getShortUserName(), operation, msg); - Set decommissioningNodes = rmContext.getNodesListManager() + Set decommissioningNodes = rm.getRMContext().getNodesListManager() .checkForDecommissioningNodes(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -914,6 +919,6 @@ private void refreshClusterMaxPriority() throws IOException, YarnException { getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getScheduler().setClusterMaxPriority(conf); + rm.getRMContext().getScheduler().setClusterMaxPriority(conf); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java index bcdf48bb2ff..d7485f531b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java @@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService LogFactory.getLog(CuratorBasedElectorService.class); private LeaderLatch leaderLatch; private CuratorFramework curator; - private RMContext rmContext; private String latchPath; private String rmId; private ResourceManager rm; - public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { + public CuratorBasedElectorService(ResourceManager rm) { super(CuratorBasedElectorService.class.getName()); - this.rmContext = rmContext; this.rm = rm; } @@ -102,7 +100,8 @@ public String getZookeeperConnectionState() { public void isLeader() { LOG.info(rmId + "is elected leader, transitioning to active"); try { - rmContext.getRMAdminService().transitionToActive( + rm.getRMContext().getRMAdminService() + .transitionToActive( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { @@ -123,7 +122,8 @@ private void closeLeaderLatch() throws IOException { public void notLeader() { LOG.info(rmId + " relinquish leadership"); try { - rmContext.getRMAdminService().transitionToStandby( + rm.getRMContext().getRMAdminService() + .transitionToStandby( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index ca848e7c58b..d5eaca6059c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -106,6 +105,7 @@ public class RMActiveServiceContext { private PlacementManager queuePlacementManager = null; private RMAppLifetimeMonitor rmAppLifetimeMonitor; + private QueueLimitCalculator queueLimitCalculator; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -469,4 +469,17 @@ public void setRMAppLifetimeMonitor( public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { return this.rmAppLifetimeMonitor; } + + @Private + @Unstable + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + @Private + @Unstable + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 130c407c15b..527ad2321a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -56,37 +55,39 @@ import com.google.common.annotations.VisibleForTesting; +/** + * RMContextImpl class holds two services context. + *
    + *
  • serviceContext : These services called as Always On services. + * Services that need to run always irrespective of the HA state of the RM.
  • + *
  • activeServiceCotext : Active services context. Services that need to run + * only on the Active RM.
  • + *
+ *

+ * Note: If any new service to be added to context, add it to a right + * context as per above description. + */ public class RMContextImpl implements RMContext { - private Dispatcher rmDispatcher; - - private boolean isHAEnabled; - - private HAServiceState haServiceState = - HAServiceProtocol.HAServiceState.INITIALIZING; - - private AdminService adminService; - - private ConfigurationProvider configurationProvider; + /** + * RM service contexts which runs through out RM life span. These are created + * once during start of RM. + */ + private RMServiceContext serviceContext; + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE->STANDBY. + */ private RMActiveServiceContext activeServiceContext; - private Configuration yarnConfiguration; - - private RMApplicationHistoryWriter rmApplicationHistoryWriter; - private SystemMetricsPublisher systemMetricsPublisher; - private EmbeddedElector elector; - - private QueueLimitCalculator queueLimitCalculator; - - private final Object haServiceStateLock = new Object(); - - private ResourceManager resourceManager; /** * Default constructor. To be used in conjunction with setter methods for * individual fields. */ public RMContextImpl() { + this.serviceContext = new RMServiceContext(); + this.activeServiceContext = new RMActiveServiceContext(); } @VisibleForTesting @@ -137,19 +138,143 @@ public RMContextImpl(Dispatcher rmDispatcher, clientToAMTokenSecretManager, null); } - @Override - public Dispatcher getDispatcher() { - return this.rmDispatcher; + /** + * RM service contexts which runs through out JVM life span. These are created + * once during start of RM. + * @return serviceContext of RM + */ + @Private + @Unstable + public RMServiceContext getServiceContext() { + return serviceContext; + } + + /** + * Note: setting service context clears all services embedded with it. + * @param context rm service context + */ + @Private + @Unstable + public void setServiceContext(RMServiceContext context) { + this.serviceContext = context; } @Override - public void setLeaderElectorService(EmbeddedElector elector) { - this.elector = elector; + public ResourceManager getResourceManager() { + return serviceContext.getResourceManager(); + } + + public void setResourceManager(ResourceManager rm) { + serviceContext.setResourceManager(rm); } @Override public EmbeddedElector getLeaderElectorService() { - return this.elector; + return serviceContext.getLeaderElectorService(); + } + + @Override + public void setLeaderElectorService(EmbeddedElector elector) { + serviceContext.setLeaderElectorService(elector); + } + + @Override + public Dispatcher getDispatcher() { + return serviceContext.getDispatcher(); + } + + void setDispatcher(Dispatcher dispatcher) { + serviceContext.setDispatcher(dispatcher); + } + + @Override + public AdminService getRMAdminService() { + return serviceContext.getRMAdminService(); + } + + void setRMAdminService(AdminService adminService) { + serviceContext.setRMAdminService(adminService); + } + + @Override + public boolean isHAEnabled() { + return serviceContext.isHAEnabled(); + } + + void setHAEnabled(boolean isHAEnabled) { + serviceContext.setHAEnabled(isHAEnabled); + } + + @Override + public HAServiceState getHAServiceState() { + return serviceContext.getHAServiceState(); + } + + void setHAServiceState(HAServiceState serviceState) { + serviceContext.setHAServiceState(serviceState); + } + + @Override + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return serviceContext.getRMApplicationHistoryWriter(); + } + + @Override + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + } + + @Override + public SystemMetricsPublisher getSystemMetricsPublisher() { + return serviceContext.getSystemMetricsPublisher(); + } + + @Override + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + serviceContext.setSystemMetricsPublisher(metricsPublisher); + } + + @Override + public ConfigurationProvider getConfigurationProvider() { + return serviceContext.getConfigurationProvider(); + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + serviceContext.setConfigurationProvider(configurationProvider); + } + + @Override + public Configuration getYarnConfiguration() { + return serviceContext.getYarnConfiguration(); + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + serviceContext.setYarnConfiguration(yarnConfiguration); + } + + public String getHAZookeeperConnectionState() { + return serviceContext.getHAZookeeperConnectionState(); + } + + // ========================================================================== + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE to STANDBY. + * @return activeServiceContext of active services + */ + @Private + @Unstable + public RMActiveServiceContext getActiveServiceContext() { + return activeServiceContext; + } + + @Private + @Unstable + void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { + this.activeServiceContext = activeServiceContext; } @Override @@ -227,11 +352,6 @@ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return activeServiceContext.getClientToAMTokenSecretManager(); } - @Override - public AdminService getRMAdminService() { - return this.adminService; - } - @VisibleForTesting public void setStateStore(RMStateStore store) { activeServiceContext.setStateStore(store); @@ -252,24 +372,6 @@ public ResourceTrackerService getResourceTrackerService() { return activeServiceContext.getResourceTrackerService(); } - void setHAEnabled(boolean isHAEnabled) { - this.isHAEnabled = isHAEnabled; - } - - void setHAServiceState(HAServiceState serviceState) { - synchronized (haServiceStateLock) { - this.haServiceState = serviceState; - } - } - - void setDispatcher(Dispatcher dispatcher) { - this.rmDispatcher = dispatcher; - } - - void setRMAdminService(AdminService adminService) { - this.adminService = adminService; - } - @Override public void setClientRMService(ClientRMService clientRMService) { activeServiceContext.setClientRMService(clientRMService); @@ -347,18 +449,6 @@ void setResourceTrackerService(ResourceTrackerService resourceTrackerService) { activeServiceContext.setResourceTrackerService(resourceTrackerService); } - @Override - public boolean isHAEnabled() { - return isHAEnabled; - } - - @Override - public HAServiceState getHAServiceState() { - synchronized (haServiceStateLock) { - return haServiceState; - } - } - public void setWorkPreservingRecoveryEnabled(boolean enabled) { activeServiceContext.setWorkPreservingRecoveryEnabled(enabled); } @@ -368,39 +458,6 @@ public boolean isWorkPreservingRecoveryEnabled() { return activeServiceContext.isWorkPreservingRecoveryEnabled(); } - @Override - public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { - return this.rmApplicationHistoryWriter; - } - - @Override - public void setSystemMetricsPublisher( - SystemMetricsPublisher systemMetricsPublisher) { - this.systemMetricsPublisher = systemMetricsPublisher; - } - - @Override - public SystemMetricsPublisher getSystemMetricsPublisher() { - return this.systemMetricsPublisher; - } - - @Override - public void setRMApplicationHistoryWriter( - RMApplicationHistoryWriter rmApplicationHistoryWriter) { - this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; - - } - - @Override - public ConfigurationProvider getConfigurationProvider() { - return this.configurationProvider; - } - - public void setConfigurationProvider( - ConfigurationProvider configurationProvider) { - this.configurationProvider = configurationProvider; - } - @Override public long getEpoch() { return activeServiceContext.getEpoch(); @@ -450,27 +507,6 @@ public ConcurrentMap getSystemCredentialsForApps() { return activeServiceContext.getSystemCredentialsForApps(); } - @Private - @Unstable - public RMActiveServiceContext getActiveServiceContext() { - return activeServiceContext; - } - - @Private - @Unstable - void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { - this.activeServiceContext = activeServiceContext; - } - - @Override - public Configuration getYarnConfiguration() { - return this.yarnConfiguration; - } - - public void setYarnConfiguration(Configuration yarnConfiguration) { - this.yarnConfiguration=yarnConfiguration; - } - @Override public PlacementManager getQueuePlacementManager() { return this.activeServiceContext.getQueuePlacementManager(); @@ -483,12 +519,12 @@ public void setQueuePlacementManager(PlacementManager placementMgr) { @Override public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return this.queueLimitCalculator; + return activeServiceContext.getNodeManagerQueueLimitCalculator(); } public void setContainerQueueLimitCalculator( QueueLimitCalculator limitCalculator) { - this.queueLimitCalculator = limitCalculator; + activeServiceContext.setContainerQueueLimitCalculator(limitCalculator); } @Override @@ -502,21 +538,5 @@ public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { return this.activeServiceContext.getRMAppLifetimeMonitor(); } - public String getHAZookeeperConnectionState() { - if (elector == null) { - return "Could not find leader elector. Verify both HA and automatic " + - "failover are enabled."; - } else { - return elector.getZookeeperConnectionState(); - } - } - - @Override - public ResourceManager getResourceManager() { - return resourceManager; - } - - public void setResourceManager(ResourceManager rm) { - this.resourceManager = rm; - } + // Note: Read java doc before adding any services over here. } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java new file mode 100644 index 00000000000..fe34d630b7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.yarn.conf.ConfigurationProvider; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; + +/** + * RMServiceContext class maintains "Always On" services. Services that need to + * run always irrespective of the HA state of the RM. This is created during + * initialization of RMContextImpl. + *

+ * Note: If any services to be added in this class, make sure service + * will be running always irrespective of the HA state of the RM + */ +@Private +@Unstable +public class RMServiceContext { + + private Dispatcher rmDispatcher; + private boolean isHAEnabled; + private HAServiceState haServiceState = + HAServiceProtocol.HAServiceState.INITIALIZING; + private AdminService adminService; + private ConfigurationProvider configurationProvider; + private Configuration yarnConfiguration; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; + private SystemMetricsPublisher systemMetricsPublisher; + private EmbeddedElector elector; + private final Object haServiceStateLock = new Object(); + private ResourceManager resourceManager; + + public ResourceManager getResourceManager() { + return resourceManager; + } + + public void setResourceManager(ResourceManager rm) { + this.resourceManager = rm; + } + + public ConfigurationProvider getConfigurationProvider() { + return this.configurationProvider; + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + this.configurationProvider = configurationProvider; + } + + public Dispatcher getDispatcher() { + return this.rmDispatcher; + } + + void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + public EmbeddedElector getLeaderElectorService() { + return this.elector; + } + + public void setLeaderElectorService(EmbeddedElector embeddedElector) { + this.elector = embeddedElector; + } + + public AdminService getRMAdminService() { + return this.adminService; + } + + void setRMAdminService(AdminService service) { + this.adminService = service; + } + + void setHAEnabled(boolean rmHAEnabled) { + this.isHAEnabled = rmHAEnabled; + } + + public boolean isHAEnabled() { + return isHAEnabled; + } + + public HAServiceState getHAServiceState() { + synchronized (haServiceStateLock) { + return haServiceState; + } + } + + void setHAServiceState(HAServiceState serviceState) { + synchronized (haServiceStateLock) { + this.haServiceState = serviceState; + } + } + + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return this.rmApplicationHistoryWriter; + } + + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter applicationHistoryWriter) { + this.rmApplicationHistoryWriter = applicationHistoryWriter; + } + + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + this.systemMetricsPublisher = metricsPublisher; + } + + public SystemMetricsPublisher getSystemMetricsPublisher() { + return this.systemMetricsPublisher; + } + + public Configuration getYarnConfiguration() { + return this.yarnConfiguration; + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + this.yarnConfiguration = yarnConfiguration; + } + + public String getHAZookeeperConnectionState() { + if (elector == null) { + return "Could not find leader elector. Verify both HA and automatic " + + "failover are enabled."; + } else { + return elector.getZookeeperConnectionState(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 0573690cf62..dae3f176967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -317,9 +317,9 @@ protected EmbeddedElector createEmbeddedElector() throws IOException { YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { this.curator = createAndStartCurator(conf); - elector = new CuratorBasedElectorService(rmContext, this); + elector = new CuratorBasedElectorService(this); } else { - elector = new ActiveStandbyElectorBasedElectorService(rmContext); + elector = new ActiveStandbyElectorBasedElectorService(this); } return elector; } @@ -510,7 +510,6 @@ public class RMActiveServices extends CompositeService { private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; - private RMActiveServiceContext activeServiceContext; private boolean fromActive = false; private StandByTransitionRunnable standByTransitionRunnable; @@ -523,9 +522,6 @@ public class RMActiveServices extends CompositeService { protected void serviceInit(Configuration configuration) throws Exception { standByTransitionRunnable = new StandByTransitionRunnable(); - activeServiceContext = new RMActiveServiceContext(); - rmContext.setActiveServiceContext(activeServiceContext); - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); rmSecretManagerService = createRMSecretManagerService(); addService(rmSecretManagerService); @@ -1031,7 +1027,7 @@ void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); if (initialize) { - resetDispatcher(); + resetRMContext(); createAndInitActiveServices(true); } } @@ -1176,7 +1172,7 @@ protected ApplicationMasterService createApplicationMasterService() { } protected AdminService createAdminService() { - return new AdminService(this, rmContext); + return new AdminService(this); } protected RMSecretManagerService createRMSecretManagerService() { @@ -1299,16 +1295,24 @@ private Dispatcher setupDispatcher() { return dispatcher; } - private void resetDispatcher() { + private void resetRMContext() { + RMContextImpl rmContextImpl = new RMContextImpl(); + // transfer service context to new RM service Context + rmContextImpl.setServiceContext(rmContext.getServiceContext()); + + // reset dispatcher Dispatcher dispatcher = setupDispatcher(); - ((Service)dispatcher).init(this.conf); - ((Service)dispatcher).start(); - removeService((Service)rmDispatcher); + ((Service) dispatcher).init(this.conf); + ((Service) dispatcher).start(); + removeService((Service) rmDispatcher); // Need to stop previous rmDispatcher before assigning new dispatcher // otherwise causes "AsyncDispatcher event handler" thread leak ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; addIfService(rmDispatcher); + rmContextImpl.setDispatcher(dispatcher); + + rmContext = rmContextImpl; rmContext.setDispatcher(rmDispatcher); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index dc0bf67d75d..9272adab28d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -1055,7 +1055,7 @@ protected void serviceStop() { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { @Override protected void startServer() { // override to not start rpc handler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index c4fcc5da4ab..47d18f30b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -123,13 +123,15 @@ private void testCallbackSynchronization(SyncTestType type) throws IOException, InterruptedException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); + ResourceManager rm = mock(ResourceManager.class); Configuration myConf = new Configuration(conf); myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); - ActiveStandbyElectorBasedElectorService - ees = new ActiveStandbyElectorBasedElectorService(rc); + ActiveStandbyElectorBasedElectorService ees = + new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); ees.enterNeutralMode(); @@ -291,7 +293,7 @@ private class MockRMWithElector extends MockRM { @Override protected EmbeddedElector createEmbeddedElector() { - return new ActiveStandbyElectorBasedElectorService(getRMContext()) { + return new ActiveStandbyElectorBasedElectorService(this) { @Override public void becomeActive() throws ServiceFailedException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 0efda9e6b02..a558dd58eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -70,6 +70,7 @@ public class TestRMHA { private Log LOG = LogFactory.getLog(TestRMHA.class); private Configuration configuration; private MockRM rm = null; + private MockNM nm = null; private RMApp app = null; private RMAppAttempt attempt = null; private static final String STATE_ERR = @@ -134,7 +135,7 @@ private void checkActiveRMFunctionality() throws Exception { try { rm.getNewAppId(); - rm.registerNode("127.0.0.1:1", 2048); + nm = rm.registerNode("127.0.0.1:1", 2048); app = rm.submitApp(1024); attempt = app.getCurrentAppAttempt(); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); @@ -549,6 +550,17 @@ public void testFailoverClearsRMContext() throws Exception { verifyClusterMetrics(1, 1, 1, 1, 2048, 1); assertEquals(1, rm.getRMContext().getRMNodes().size()); assertEquals(1, rm.getRMContext().getRMApps().size()); + Assert.assertNotNull("Node not registered", nm); + + rm.adminService.transitionToStandby(requestInfo); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + // race condition causes to register/node heartbeat node even after service + // is stopping/stopped. New RMContext is being created on every transition + // to standby, so metrics should be 0 which indicates new context reference + // has taken. + nm.registerNode(); + verifyClusterMetrics(0, 0, 0, 0, 0, 0); // 3. Create new RM rm = new MockRM(conf, memStore) { @@ -590,7 +602,7 @@ public void testTransitionedToActiveRefreshFail() throws Exception { rm = new MockRM(configuration) { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { int counter = 0; @Override protected void setConfig(Configuration conf) {