YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks

(cherry picked from commit 9cb8b75ba5)

(cherry picked from commit db31ef7e7f)
(cherry picked from commit e669974ae94c03914c9181a4481b4879fd4acc0d)
This commit is contained in:
Jian He 2014-11-19 19:48:33 -08:00 committed by Vinod Kumar Vavilapalli
parent 0898c21e24
commit 7f97189bcf
5 changed files with 630 additions and 159 deletions

View File

@ -21,6 +21,9 @@ Release 2.6.1 - UNRELEASED
YARN-2414. RM web UI: app page will crash if app is failed before any
attempt has been created (Wangda Tan via jlowe)
YARN-2865. Fixed RM to always create a new RMContext when transtions from
StandBy to Active. (Rohith Sharmaks via jianhe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,455 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* The RMActiveServiceContext is the class that maintains all the
* RMActiveService contexts.This is expected to be used only by ResourceManager
* and RMContext.
*/
@Private
@Unstable
public class RMActiveServiceContext {
private static final Log LOG = LogFactory
.getLog(RMActiveServiceContext.class);
private final ConcurrentMap<ApplicationId, RMApp> applications =
new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes =
new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes =
new ConcurrentHashMap<String, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isWorkPreservingRecoveryEnabled;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
public RMActiveServiceContext() {
}
@Private
@Unstable
public RMActiveServiceContext(Dispatcher rmDispatcher,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
}
@Private
@Unstable
public void setStateStore(RMStateStore store) {
stateStore = store;
}
@Private
@Unstable
public ClientRMService getClientRMService() {
return clientRMService;
}
@Private
@Unstable
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
}
@Private
@Unstable
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
}
@Private
@Unstable
public RMStateStore getStateStore() {
return stateStore;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications;
}
@Private
@Unstable
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
}
@Private
@Unstable
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Private
@Unstable
public ContainerAllocationExpirer getContainerAllocationExpirer() {
return this.containerAllocationExpirer;
}
@Private
@Unstable
public AMLivelinessMonitor getAMLivelinessMonitor() {
return this.amLivelinessMonitor;
}
@Private
@Unstable
public AMLivelinessMonitor getAMFinishingMonitor() {
return this.amFinishingMonitor;
}
@Private
@Unstable
public DelegationTokenRenewer getDelegationTokenRenewer() {
return delegationTokenRenewer;
}
@Private
@Unstable
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.amRMTokenSecretManager;
}
@Private
@Unstable
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
}
@Private
@Unstable
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
}
@Private
@Unstable
public ResourceScheduler getScheduler() {
return this.scheduler;
}
@Private
@Unstable
public ReservationSystem getReservationSystem() {
return this.reservationSystem;
}
@Private
@Unstable
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Private
@Unstable
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@Private
@Unstable
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
}
@Private
@Unstable
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager;
}
@Private
@Unstable
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
@Private
@Unstable
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
}
@Private
@Unstable
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
}
@Private
@Unstable
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
}
@Private
@Unstable
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
}
@Private
@Unstable
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
}
@Private
@Unstable
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
@Private
@Unstable
void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem;
}
@Private
@Unstable
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
}
@Private
@Unstable
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
@Private
@Unstable
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
}
@Private
@Unstable
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
}
@Private
@Unstable
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
}
@Private
@Unstable
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
@Private
@Unstable
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
this.isWorkPreservingRecoveryEnabled = enabled;
}
@Private
@Unstable
public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
@Private
@Unstable
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Private
@Unstable
public long getEpoch() {
return this.epoch;
}
@Private
@Unstable
void setEpoch(long epoch) {
this.epoch = epoch;
}
@Private
@Unstable
public RMNodeLabelsManager getNodeLabelManager() {
return nodeLabelManager;
}
@Private
@Unstable
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
@Private
@Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
}
@Private
@Unstable
public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) {
return isSchedulerReady;
}
isSchedulerReady =
(systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
}
@Private
@Unstable
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
}

View File

@ -19,24 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext {
private Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes
= new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isHAEnabled;
private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private AdminService adminService;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
private AdminService adminService;
private ConfigurationProvider configurationProvider;
private RMActiveServiceContext activeServiceContext;
/**
* Default constructor. To be used in conjunction with setter methods for
@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext {
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this();
this.setDispatcher(rmDispatcher);
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager, rmApplicationHistoryWriter));
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
@ -155,80 +101,80 @@ public class RMContextImpl implements RMContext {
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
@Override
@Override
public RMStateStore getStateStore() {
return stateStore;
return activeServiceContext.getStateStore();
}
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications;
return activeServiceContext.getRMApps();
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
return activeServiceContext.getRMNodes();
}
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
return activeServiceContext.getInactiveRMNodes();
}
@Override
public ContainerAllocationExpirer getContainerAllocationExpirer() {
return this.containerAllocationExpirer;
return activeServiceContext.getContainerAllocationExpirer();
}
@Override
public AMLivelinessMonitor getAMLivelinessMonitor() {
return this.amLivelinessMonitor;
return activeServiceContext.getAMLivelinessMonitor();
}
@Override
public AMLivelinessMonitor getAMFinishingMonitor() {
return this.amFinishingMonitor;
return activeServiceContext.getAMFinishingMonitor();
}
@Override
public DelegationTokenRenewer getDelegationTokenRenewer() {
return delegationTokenRenewer;
return activeServiceContext.getDelegationTokenRenewer();
}
@Override
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.amRMTokenSecretManager;
return activeServiceContext.getAMRMTokenSecretManager();
}
@Override
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
return activeServiceContext.getContainerTokenSecretManager();
}
@Override
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
return activeServiceContext.getNMTokenSecretManager();
}
@Override
public ResourceScheduler getScheduler() {
return this.scheduler;
return activeServiceContext.getScheduler();
}
@Override
public ReservationSystem getReservationSystem() {
return this.reservationSystem;
return activeServiceContext.getReservationSystem();
}
@Override
public NodesListManager getNodesListManager() {
return this.nodesListManager;
return activeServiceContext.getNodesListManager();
}
@Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
return activeServiceContext.getClientToAMTokenSecretManager();
}
@Override
@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext {
@VisibleForTesting
public void setStateStore(RMStateStore store) {
stateStore = store;
activeServiceContext.setStateStore(store);
}
@Override
public ClientRMService getClientRMService() {
return this.clientRMService;
return activeServiceContext.getClientRMService();
}
@Override
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
return activeServiceContext.getApplicationMasterService();
}
@Override
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
return activeServiceContext.getResourceTrackerService();
}
void setHAEnabled(boolean isHAEnabled) {
@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext {
@Override
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
activeServiceContext.setClientRMService(clientRMService);
}
@Override
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager;
return activeServiceContext.getRMDelegationTokenSecretManager();
}
@Override
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
activeServiceContext
.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
}
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
activeServiceContext
.setContainerAllocationExpirer(containerAllocationExpirer);
}
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor);
}
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
activeServiceContext.setAMFinishingMonitor(amFinishingMonitor);
}
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
activeServiceContext
.setContainerTokenSecretManager(containerTokenSecretManager);
}
void setNMTokenSecretManager(
NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
}
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem;
activeServiceContext.setScheduler(scheduler);
}
void setDelegationTokenRenewer(
DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
void setReservationSystem(ReservationSystem reservationSystem) {
activeServiceContext.setReservationSystem(reservationSystem);
}
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
activeServiceContext
.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
}
void setAMRMTokenSecretManager(
AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager);
}
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
activeServiceContext.setNodesListManager(nodesListManager);
}
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
activeServiceContext.setApplicationMasterService(applicationMasterService);
}
void setResourceTrackerService(
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
@Override
@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext {
}
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
this.isWorkPreservingRecoveryEnabled = enabled;
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@Override
public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
return activeServiceContext.isWorkPreservingRecoveryEnabled();
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
return activeServiceContext.getRMApplicationHistoryWriter();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
return activeServiceContext.getSystemMetricsPublisher();
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
activeServiceContext
.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
}
@Override
@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext {
@Override
public long getEpoch() {
return this.epoch;
return activeServiceContext.getEpoch();
}
void setEpoch(long epoch) {
this.epoch = epoch;
activeServiceContext.setEpoch(epoch);
}
@Override
public RMNodeLabelsManager getNodeLabelManager() {
return nodeLabelManager;
return activeServiceContext.getNodeLabelManager();
}
@Override
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
activeServiceContext.setNodeLabelManager(mgr);
}
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) {
return isSchedulerReady;
}
isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
> schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
return activeServiceContext.isSchedulerReadyForAllocatingContainers();
}
@Private
@VisibleForTesting
public void setSystemClock(Clock clock) {
this.systemClock = clock;
activeServiceContext.setSystemClock(clock);
}
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
return activeServiceContext.getSystemCredentialsForApps();
}
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
}

View File

@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService();
@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
rmContext.getRMNodes().clear();
rmContext.getInactiveRMNodes().clear();
rmContext.getRMApps().clear();
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
}
}
void reinitialize(boolean initialize) throws Exception {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
resetDispatcher();
createAndInitActiveServices();
}
}
@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
startActiveServices();
return null;
} catch (Exception e) {
resetDispatcher();
createAndInitActiveServices();
reinitialize(true);
throw e;
}
}
@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
if (initialize) {
resetDispatcher();
createAndInitActiveServices();
}
reinitialize(initialize);
}
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state");

View File

@ -513,6 +513,68 @@ public class TestRMHA {
rm.stop();
}
@Test
public void testFailoverClearsRMContext() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
Configuration conf = new YarnConfiguration(configuration);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// 1. start RM
rm = new MockRM(conf, memStore);
rm.init(conf);
rm.start();
StateChangeRequestInfo requestInfo =
new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to active
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@Override
protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
this.rmContext.getNMTokenSecretManager()) {
@Override
protected void serviceStart() throws Exception {
throw new Exception("ResourceTracker service failed");
}
};
}
};
rm.init(conf);
rm.start();
checkMonitorHealth();
checkStandbyRMFunctionality();
// 4. Try Transition to active, throw exception
try {
rm.adminService.transitionToActive(requestInfo);
Assert.fail("Transitioned to Active should throw exception.");
} catch (Exception e) {
assertTrue("Error when transitioning to Active mode".contains(e
.getMessage()));
}
// 5. Clears the metrics
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
assertEquals(0, rm.getRMContext().getRMNodes().size());
assertEquals(0, rm.getRMContext().getRMApps().size());
}
public void innerTestHAWithRMHostName(boolean includeBindHost) {
//this is run two times, with and without a bind host configured
if (includeBindHost) {