YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S.

This commit is contained in:
Sunil G 2017-07-24 20:57:25 +05:30
parent db7c5636b6
commit a657472b42
10 changed files with 407 additions and 200 deletions

View File

@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
private RMContext rmContext;
private ResourceManager rm;
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@VisibleForTesting
final Object zkDisconnectLock = new Object();
ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
super(ActiveStandbyElectorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@Override
@ -140,7 +140,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToActive(req);
rm.getRMContext().getRMAdminService().transitionToActive(req);
} catch (Exception e) {
throw new ServiceFailedException("RM could not transition to Active", e);
}
@ -151,7 +151,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
cancelDisconnectTimer();
try {
rmContext.getRMAdminService().transitionToStandby(req);
rm.getRMContext().getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
LOG.error("RM could not transition to Standby", e);
}
@ -205,7 +205,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService
@SuppressWarnings(value = "unchecked")
@Override
public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle(
rm.getRMContext().getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}

View File

@ -102,7 +102,6 @@ public class AdminService extends CompositeService implements
private static final Log LOG = LogFactory.getLog(AdminService.class);
private final RMContext rmContext;
private final ResourceManager rm;
private String rmId;
@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
public AdminService(ResourceManager rm, RMContext rmContext) {
public AdminService(ResourceManager rm) {
super(AdminService.class.getName());
this.rm = rm;
this.rmContext = rmContext;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
autoFailoverEnabled =
rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
rm.getRMContext().isHAEnabled()
&& HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements
RMPolicyProvider.getInstance());
}
if (rmContext.isHAEnabled()) {
if (rm.getRMContext().isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements
}
private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == rmContext.getHAServiceState();
return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState();
}
private void throwStandbyException() throws StandbyException {
@ -304,7 +303,7 @@ public class AdminService extends CompositeService implements
// call all refresh*s for active RM to get the updated configurations.
refreshAll();
} catch (Exception e) {
rmContext
rm.getRMContext()
.getDispatcher()
.getEventHandler()
.handle(
@ -363,7 +362,7 @@ public class AdminService extends CompositeService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState();
HAServiceState haState = rm.getRMContext().getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
@ -395,11 +394,12 @@ public class AdminService extends CompositeService implements
}
private void refreshQueues() throws IOException, YarnException {
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
rm.getRMContext().getScheduler().reinitialize(getConfig(),
this.rm.getRMContext());
// refresh the reservation system
ReservationSystem rSystem = rmContext.getReservationSystem();
ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) {
rSystem.reinitialize(getConfig(), rmContext);
rSystem.reinitialize(getConfig(), rm.getRMContext());
}
}
@ -418,14 +418,14 @@ public class AdminService extends CompositeService implements
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
switch (request.getDecommissionType()) {
case NORMAL:
rmContext.getNodesListManager().refreshNodes(conf);
rm.getRMContext().getNodesListManager().refreshNodes(conf);
break;
case GRACEFUL:
rmContext.getNodesListManager().refreshNodesGracefully(
rm.getRMContext().getNodesListManager().refreshNodesGracefully(
conf, request.getDecommissionTimeout());
break;
case FORCEFUL:
rmContext.getNodesListManager().refreshNodesForcefully();
rm.getRMContext().getNodesListManager().refreshNodesForcefully();
break;
}
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
@ -440,7 +440,7 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getNodesListManager().refreshNodes(conf);
rm.getRMContext().getNodesListManager().refreshNodes(conf);
}
@Override
@ -559,10 +559,11 @@ public class AdminService extends CompositeService implements
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
rm.getRMContext().getClientRMService().refreshServiceAcls(conf,
policyProvider);
rm.getRMContext().getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
rm.getRMContext().getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
}
@ -601,7 +602,7 @@ public class AdminService extends CompositeService implements
// if any invalid nodes, throw exception instead of partially updating
// valid nodes.
for (NodeId nodeId : nodeIds) {
RMNode node = this.rmContext.getRMNodes().get(nodeId);
RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.error("Resource update get failed on all nodes due to change "
+ "resource on an unrecognized node: " + nodeId);
@ -619,14 +620,14 @@ public class AdminService extends CompositeService implements
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey();
RMNode node = this.rmContext.getRMNodes().get(nodeId);
RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId);
if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false;
} else {
// update resource to RMNode
this.rmContext.getDispatcher().getEventHandler()
this.rm.getRMContext().getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")");
@ -661,7 +662,8 @@ public class AdminService extends CompositeService implements
DynamicResourceConfiguration newConf;
InputStream drInputStream =
this.rmContext.getConfigurationProvider().getConfigurationInputStream(
this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
@ -679,7 +681,7 @@ public class AdminService extends CompositeService implements
updateNodeResource(updateRequest);
}
// refresh dynamic resource in ResourceTrackerService
this.rmContext.getResourceTrackerService().
this.rm.getRMContext().getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -692,7 +694,8 @@ public class AdminService extends CompositeService implements
private synchronized Configuration getConfiguration(Configuration conf,
String... confFileNames) throws YarnException, IOException {
for (String confFileName : confFileNames) {
InputStream confFileInputStream = this.rmContext.getConfigurationProvider()
InputStream confFileInputStream =
this.rm.getRMContext().getConfigurationProvider()
.getConfigurationInputStream(conf, confFileName);
if (confFileInputStream != null) {
conf.addResource(confFileInputStream);
@ -746,7 +749,7 @@ public class AdminService extends CompositeService implements
AddToClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
try {
rmContext.getNodeLabelManager()
rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(request.getNodeLabels());
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -769,7 +772,8 @@ public class AdminService extends CompositeService implements
RemoveFromClusterNodeLabelsResponse response =
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
try {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
rm.getRMContext().getNodeLabelManager()
.removeFromClusterNodeLabels(request.getNodeLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
return response;
@ -805,19 +809,20 @@ public class AdminService extends CompositeService implements
boolean isKnown = false;
// both active and inactive nodes are recognized as known nodes
if (requestedNode.getPort() != 0) {
if (rmContext.getRMNodes().containsKey(requestedNode)
|| rmContext.getInactiveRMNodes().containsKey(requestedNode)) {
if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm
.getRMContext().getInactiveRMNodes().containsKey(requestedNode)) {
isKnown = true;
}
} else {
for (NodeId knownNode : rmContext.getRMNodes().keySet()) {
for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
}
}
if (!isKnown) {
for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) {
for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes()
.keySet()) {
if (knownNode.getHost().equals(requestedNode.getHost())) {
isKnown = true;
break;
@ -841,7 +846,7 @@ public class AdminService extends CompositeService implements
}
}
try {
rmContext.getNodeLabelManager().replaceLabelsOnNode(
rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode(
request.getNodeToLabels());
RMAuditLogger
.logSuccess(user.getShortUserName(), operation, "AdminService");
@ -878,7 +883,7 @@ public class AdminService extends CompositeService implements
checkRMStatus(user.getShortUserName(), operation, msg);
Set<NodeId> decommissioningNodes = rmContext.getNodesListManager()
Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager()
.checkForDecommissioningNodes();
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
"AdminService");
@ -914,6 +919,6 @@ public class AdminService extends CompositeService implements
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
rm.getRMContext().getScheduler().setClusterMaxPriority(conf);
}
}

View File

@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
private RMContext rmContext;
private String latchPath;
private String rmId;
private ResourceManager rm;
public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
public CuratorBasedElectorService(ResourceManager rm) {
super(CuratorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService
public void isLeader() {
LOG.info(rmId + "is elected leader, transitioning to active");
try {
rmContext.getRMAdminService().transitionToActive(
rm.getRMContext().getRMAdminService()
.transitionToActive(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {
@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService
public void notLeader() {
LOG.info(rmId + " relinquish leadership");
try {
rmContext.getRMAdminService().transitionToStandby(
rm.getRMContext().getRMAdminService()
.transitionToStandby(
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
} catch (Exception e) {

View File

@ -30,8 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -106,6 +105,7 @@ public class RMActiveServiceContext {
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@ -469,4 +469,17 @@ public class RMActiveServiceContext {
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
@Private
@Unstable
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return this.queueLimitCalculator;
}
@Private
@Unstable
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
}

View File

@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -56,37 +55,39 @@ import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
/**
* RMContextImpl class holds two services context.
* <ul>
* <li>serviceContext : These services called as <b>Always On</b> services.
* Services that need to run always irrespective of the HA state of the RM.</li>
* <li>activeServiceCotext : Active services context. Services that need to run
* only on the Active RM.</li>
* </ul>
* <p>
* <b>Note:</b> If any new service to be added to context, add it to a right
* context as per above description.
*/
public class RMContextImpl implements RMContext {
private Dispatcher rmDispatcher;
private boolean isHAEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AdminService adminService;
private ConfigurationProvider configurationProvider;
/**
* RM service contexts which runs through out RM life span. These are created
* once during start of RM.
*/
private RMServiceContext serviceContext;
/**
* RM Active service context. This will be recreated for every transition from
* ACTIVE->STANDBY.
*/
private RMActiveServiceContext activeServiceContext;
private Configuration yarnConfiguration;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private EmbeddedElector elector;
private QueueLimitCalculator queueLimitCalculator;
private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
*/
public RMContextImpl() {
this.serviceContext = new RMServiceContext();
this.activeServiceContext = new RMActiveServiceContext();
}
@VisibleForTesting
@ -137,19 +138,143 @@ public class RMContextImpl implements RMContext {
clientToAMTokenSecretManager, null);
}
@Override
public Dispatcher getDispatcher() {
return this.rmDispatcher;
/**
* RM service contexts which runs through out JVM life span. These are created
* once during start of RM.
* @return serviceContext of RM
*/
@Private
@Unstable
public RMServiceContext getServiceContext() {
return serviceContext;
}
/**
* <b>Note:</b> setting service context clears all services embedded with it.
* @param context rm service context
*/
@Private
@Unstable
public void setServiceContext(RMServiceContext context) {
this.serviceContext = context;
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
this.elector = elector;
public ResourceManager getResourceManager() {
return serviceContext.getResourceManager();
}
public void setResourceManager(ResourceManager rm) {
serviceContext.setResourceManager(rm);
}
@Override
public EmbeddedElector getLeaderElectorService() {
return this.elector;
return serviceContext.getLeaderElectorService();
}
@Override
public void setLeaderElectorService(EmbeddedElector elector) {
serviceContext.setLeaderElectorService(elector);
}
@Override
public Dispatcher getDispatcher() {
return serviceContext.getDispatcher();
}
void setDispatcher(Dispatcher dispatcher) {
serviceContext.setDispatcher(dispatcher);
}
@Override
public AdminService getRMAdminService() {
return serviceContext.getRMAdminService();
}
void setRMAdminService(AdminService adminService) {
serviceContext.setRMAdminService(adminService);
}
@Override
public boolean isHAEnabled() {
return serviceContext.isHAEnabled();
}
void setHAEnabled(boolean isHAEnabled) {
serviceContext.setHAEnabled(isHAEnabled);
}
@Override
public HAServiceState getHAServiceState() {
return serviceContext.getHAServiceState();
}
void setHAServiceState(HAServiceState serviceState) {
serviceContext.setHAServiceState(serviceState);
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return serviceContext.getRMApplicationHistoryWriter();
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return serviceContext.getSystemMetricsPublisher();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher metricsPublisher) {
serviceContext.setSystemMetricsPublisher(metricsPublisher);
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return serviceContext.getConfigurationProvider();
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
serviceContext.setConfigurationProvider(configurationProvider);
}
@Override
public Configuration getYarnConfiguration() {
return serviceContext.getYarnConfiguration();
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
serviceContext.setYarnConfiguration(yarnConfiguration);
}
public String getHAZookeeperConnectionState() {
return serviceContext.getHAZookeeperConnectionState();
}
// ==========================================================================
/**
* RM Active service context. This will be recreated for every transition from
* ACTIVE to STANDBY.
* @return activeServiceContext of active services
*/
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
@Override
@ -227,11 +352,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getClientToAMTokenSecretManager();
}
@Override
public AdminService getRMAdminService() {
return this.adminService;
}
@VisibleForTesting
public void setStateStore(RMStateStore store) {
activeServiceContext.setStateStore(store);
@ -252,24 +372,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getResourceTrackerService();
}
void setHAEnabled(boolean isHAEnabled) {
this.isHAEnabled = isHAEnabled;
}
void setHAServiceState(HAServiceState serviceState) {
synchronized (haServiceStateLock) {
this.haServiceState = serviceState;
}
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
void setRMAdminService(AdminService adminService) {
this.adminService = adminService;
}
@Override
public void setClientRMService(ClientRMService clientRMService) {
activeServiceContext.setClientRMService(clientRMService);
@ -347,18 +449,6 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
@Override
public boolean isHAEnabled() {
return isHAEnabled;
}
@Override
public HAServiceState getHAServiceState() {
synchronized (haServiceStateLock) {
return haServiceState;
}
}
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@ -368,39 +458,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.isWorkPreservingRecoveryEnabled();
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return this.rmApplicationHistoryWriter;
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return this.systemMetricsPublisher;
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Override
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
@Override
public long getEpoch() {
return activeServiceContext.getEpoch();
@ -450,27 +507,6 @@ public class RMContextImpl implements RMContext {
return activeServiceContext.getSystemCredentialsForApps();
}
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
@Override
public Configuration getYarnConfiguration() {
return this.yarnConfiguration;
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration=yarnConfiguration;
}
@Override
public PlacementManager getQueuePlacementManager() {
return this.activeServiceContext.getQueuePlacementManager();
@ -483,12 +519,12 @@ public class RMContextImpl implements RMContext {
@Override
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return this.queueLimitCalculator;
return activeServiceContext.getNodeManagerQueueLimitCalculator();
}
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
activeServiceContext.setContainerQueueLimitCalculator(limitCalculator);
}
@Override
@ -502,21 +538,5 @@ public class RMContextImpl implements RMContext {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic " +
"failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
@Override
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
// Note: Read java doc before adding any services over here.
}

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
/**
* RMServiceContext class maintains "Always On" services. Services that need to
* run always irrespective of the HA state of the RM. This is created during
* initialization of RMContextImpl.
* <p>
* <b>Note:</b> If any services to be added in this class, make sure service
* will be running always irrespective of the HA state of the RM
*/
@Private
@Unstable
public class RMServiceContext {
private Dispatcher rmDispatcher;
private boolean isHAEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AdminService adminService;
private ConfigurationProvider configurationProvider;
private Configuration yarnConfiguration;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private EmbeddedElector elector;
private final Object haServiceStateLock = new Object();
private ResourceManager resourceManager;
public ResourceManager getResourceManager() {
return resourceManager;
}
public void setResourceManager(ResourceManager rm) {
this.resourceManager = rm;
}
public ConfigurationProvider getConfigurationProvider() {
return this.configurationProvider;
}
public void setConfigurationProvider(
ConfigurationProvider configurationProvider) {
this.configurationProvider = configurationProvider;
}
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
public EmbeddedElector getLeaderElectorService() {
return this.elector;
}
public void setLeaderElectorService(EmbeddedElector embeddedElector) {
this.elector = embeddedElector;
}
public AdminService getRMAdminService() {
return this.adminService;
}
void setRMAdminService(AdminService service) {
this.adminService = service;
}
void setHAEnabled(boolean rmHAEnabled) {
this.isHAEnabled = rmHAEnabled;
}
public boolean isHAEnabled() {
return isHAEnabled;
}
public HAServiceState getHAServiceState() {
synchronized (haServiceStateLock) {
return haServiceState;
}
}
void setHAServiceState(HAServiceState serviceState) {
synchronized (haServiceStateLock) {
this.haServiceState = serviceState;
}
}
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return this.rmApplicationHistoryWriter;
}
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter applicationHistoryWriter) {
this.rmApplicationHistoryWriter = applicationHistoryWriter;
}
public void setSystemMetricsPublisher(
SystemMetricsPublisher metricsPublisher) {
this.systemMetricsPublisher = metricsPublisher;
}
public SystemMetricsPublisher getSystemMetricsPublisher() {
return this.systemMetricsPublisher;
}
public Configuration getYarnConfiguration() {
return this.yarnConfiguration;
}
public void setYarnConfiguration(Configuration yarnConfiguration) {
this.yarnConfiguration = yarnConfiguration;
}
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic "
+ "failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
}

View File

@ -317,9 +317,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
elector = new CuratorBasedElectorService(rmContext, this);
elector = new CuratorBasedElectorService(this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(rmContext);
elector = new ActiveStandbyElectorBasedElectorService(this);
}
return elector;
}
@ -510,7 +510,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ApplicationMasterLauncher applicationMasterLauncher;
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private RMActiveServiceContext activeServiceContext;
private boolean fromActive = false;
private StandByTransitionRunnable standByTransitionRunnable;
@ -523,9 +522,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration configuration) throws Exception {
standByTransitionRunnable = new StandByTransitionRunnable();
activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
@ -1031,7 +1027,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
resetDispatcher();
resetRMContext();
createAndInitActiveServices(true);
}
}
@ -1176,7 +1172,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected AdminService createAdminService() {
return new AdminService(this, rmContext);
return new AdminService(this);
}
protected RMSecretManagerService createRMSecretManagerService() {
@ -1299,16 +1295,24 @@ public class ResourceManager extends CompositeService implements Recoverable {
return dispatcher;
}
private void resetDispatcher() {
private void resetRMContext() {
RMContextImpl rmContextImpl = new RMContextImpl();
// transfer service context to new RM service Context
rmContextImpl.setServiceContext(rmContext.getServiceContext());
// reset dispatcher
Dispatcher dispatcher = setupDispatcher();
((Service)dispatcher).init(this.conf);
((Service)dispatcher).start();
removeService((Service)rmDispatcher);
((Service) dispatcher).init(this.conf);
((Service) dispatcher).start();
removeService((Service) rmDispatcher);
// Need to stop previous rmDispatcher before assigning new dispatcher
// otherwise causes "AsyncDispatcher event handler" thread leak
((Service) rmDispatcher).stop();
rmDispatcher = dispatcher;
addIfService(rmDispatcher);
rmContextImpl.setDispatcher(dispatcher);
rmContext = rmContextImpl;
rmContext.setDispatcher(rmDispatcher);
}

View File

@ -1055,7 +1055,7 @@ public class MockRM extends ResourceManager {
@Override
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
return new AdminService(this) {
@Override
protected void startServer() {
// override to not start rpc handler

View File

@ -123,13 +123,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
throws IOException, InterruptedException {
AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class);
ResourceManager rm = mock(ResourceManager.class);
Configuration myConf = new Configuration(conf);
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rm.getRMContext()).thenReturn(rc);
when(rc.getRMAdminService()).thenReturn(as);
ActiveStandbyElectorBasedElectorService
ees = new ActiveStandbyElectorBasedElectorService(rc);
ActiveStandbyElectorBasedElectorService ees =
new ActiveStandbyElectorBasedElectorService(rm);
ees.init(myConf);
ees.enterNeutralMode();
@ -291,7 +293,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
@Override
protected EmbeddedElector createEmbeddedElector() {
return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
return new ActiveStandbyElectorBasedElectorService(this) {
@Override
public void becomeActive() throws
ServiceFailedException {

View File

@ -70,6 +70,7 @@ public class TestRMHA {
private Log LOG = LogFactory.getLog(TestRMHA.class);
private Configuration configuration;
private MockRM rm = null;
private MockNM nm = null;
private RMApp app = null;
private RMAppAttempt attempt = null;
private static final String STATE_ERR =
@ -134,7 +135,7 @@ public class TestRMHA {
try {
rm.getNewAppId();
rm.registerNode("127.0.0.1:1", 2048);
nm = rm.registerNode("127.0.0.1:1", 2048);
app = rm.submitApp(1024);
attempt = app.getCurrentAppAttempt();
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
@ -549,6 +550,17 @@ public class TestRMHA {
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
Assert.assertNotNull("Node not registered", nm);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// race condition causes to register/node heartbeat node even after service
// is stopping/stopped. New RMContext is being created on every transition
// to standby, so metrics should be 0 which indicates new context reference
// has taken.
nm.registerNode();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@ -590,7 +602,7 @@ public class TestRMHA {
rm = new MockRM(configuration) {
@Override
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
return new AdminService(this) {
int counter = 0;
@Override
protected void setConfig(Configuration conf) {