YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks

(cherry picked from commit 9cb8b75ba5)
This commit is contained in:
Jian He 2014-11-19 19:48:33 -08:00
parent 988682ac29
commit db31ef7e7f
5 changed files with 630 additions and 159 deletions

View File

@ -88,6 +88,9 @@ Release 2.7.0 - UNRELEASED
YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via
jianhe) jianhe)
YARN-2865. Fixed RM to always create a new RMContext when transtions from
StandBy to Active. (Rohith Sharmaks via jianhe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -0,0 +1,455 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* The RMActiveServiceContext is the class that maintains all the
* RMActiveService contexts.This is expected to be used only by ResourceManager
* and RMContext.
*/
@Private
@Unstable
public class RMActiveServiceContext {
private static final Log LOG = LogFactory
.getLog(RMActiveServiceContext.class);
private final ConcurrentMap<ApplicationId, RMApp> applications =
new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes =
new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes =
new ConcurrentHashMap<String, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isWorkPreservingRecoveryEnabled;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
public RMActiveServiceContext() {
}
@Private
@Unstable
public RMActiveServiceContext(Dispatcher rmDispatcher,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
}
@Private
@Unstable
public void setStateStore(RMStateStore store) {
stateStore = store;
}
@Private
@Unstable
public ClientRMService getClientRMService() {
return clientRMService;
}
@Private
@Unstable
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
}
@Private
@Unstable
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
}
@Private
@Unstable
public RMStateStore getStateStore() {
return stateStore;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications;
}
@Private
@Unstable
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
}
@Private
@Unstable
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Private
@Unstable
public ContainerAllocationExpirer getContainerAllocationExpirer() {
return this.containerAllocationExpirer;
}
@Private
@Unstable
public AMLivelinessMonitor getAMLivelinessMonitor() {
return this.amLivelinessMonitor;
}
@Private
@Unstable
public AMLivelinessMonitor getAMFinishingMonitor() {
return this.amFinishingMonitor;
}
@Private
@Unstable
public DelegationTokenRenewer getDelegationTokenRenewer() {
return delegationTokenRenewer;
}
@Private
@Unstable
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.amRMTokenSecretManager;
}
@Private
@Unstable
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
}
@Private
@Unstable
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
}
@Private
@Unstable
public ResourceScheduler getScheduler() {
return this.scheduler;
}
@Private
@Unstable
public ReservationSystem getReservationSystem() {
return this.reservationSystem;
}
@Private
@Unstable
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Private
@Unstable
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@Private
@Unstable
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
}
@Private
@Unstable
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager;
}
@Private
@Unstable
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
@Private
@Unstable
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
}
@Private
@Unstable
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
}
@Private
@Unstable
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
}
@Private
@Unstable
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
}
@Private
@Unstable
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
}
@Private
@Unstable
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
@Private
@Unstable
void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem;
}
@Private
@Unstable
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
}
@Private
@Unstable
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
@Private
@Unstable
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
}
@Private
@Unstable
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
}
@Private
@Unstable
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
}
@Private
@Unstable
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
@Private
@Unstable
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
this.isWorkPreservingRecoveryEnabled = enabled;
}
@Private
@Unstable
public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
@Private
@Unstable
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Private
@Unstable
public long getEpoch() {
return this.epoch;
}
@Private
@Unstable
void setEpoch(long epoch) {
this.epoch = epoch;
}
@Private
@Unstable
public RMNodeLabelsManager getNodeLabelManager() {
return nodeLabelManager;
}
@Private
@Unstable
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
@Private
@Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
}
@Private
@Unstable
public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) {
return isSchedulerReady;
}
isSchedulerReady =
(systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
}
@Private
@Unstable
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
}

View File

@ -19,24 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext {
private Dispatcher rmDispatcher; private Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes
= new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isHAEnabled; private boolean isHAEnabled;
private boolean isWorkPreservingRecoveryEnabled;
private HAServiceState haServiceState = private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING; HAServiceProtocol.HAServiceState.INITIALIZING;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private AdminService adminService; private AdminService adminService;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
private RMNodeLabelsManager nodeLabelManager;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
private static final Log LOG = LogFactory.getLog(RMContextImpl.class); private ConfigurationProvider configurationProvider;
private RMActiveServiceContext activeServiceContext;
/** /**
* Default constructor. To be used in conjunction with setter methods for * Default constructor. To be used in conjunction with setter methods for
@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext {
RMApplicationHistoryWriter rmApplicationHistoryWriter) { RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this(); this();
this.setDispatcher(rmDispatcher); this.setDispatcher(rmDispatcher);
this.setContainerAllocationExpirer(containerAllocationExpirer); setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
this.setAMLivelinessMonitor(amLivelinessMonitor); containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
this.setAMFinishingMonitor(amFinishingMonitor); delegationTokenRenewer, appTokenSecretManager,
this.setDelegationTokenRenewer(delegationTokenRenewer); containerTokenSecretManager, nmTokenSecretManager,
this.setAMRMTokenSecretManager(appTokenSecretManager); clientToAMTokenSecretManager, rmApplicationHistoryWriter));
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
ConfigurationProvider provider = new LocalConfigurationProvider(); ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider); setConfigurationProvider(provider);
@ -158,77 +104,77 @@ public class RMContextImpl implements RMContext {
@Override @Override
public RMStateStore getStateStore() { public RMStateStore getStateStore() {
return stateStore; return activeServiceContext.getStateStore();
} }
@Override @Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() { public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications; return activeServiceContext.getRMApps();
} }
@Override @Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() { public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes; return activeServiceContext.getRMNodes();
} }
@Override @Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() { public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
return this.inactiveNodes; return activeServiceContext.getInactiveRMNodes();
} }
@Override @Override
public ContainerAllocationExpirer getContainerAllocationExpirer() { public ContainerAllocationExpirer getContainerAllocationExpirer() {
return this.containerAllocationExpirer; return activeServiceContext.getContainerAllocationExpirer();
} }
@Override @Override
public AMLivelinessMonitor getAMLivelinessMonitor() { public AMLivelinessMonitor getAMLivelinessMonitor() {
return this.amLivelinessMonitor; return activeServiceContext.getAMLivelinessMonitor();
} }
@Override @Override
public AMLivelinessMonitor getAMFinishingMonitor() { public AMLivelinessMonitor getAMFinishingMonitor() {
return this.amFinishingMonitor; return activeServiceContext.getAMFinishingMonitor();
} }
@Override @Override
public DelegationTokenRenewer getDelegationTokenRenewer() { public DelegationTokenRenewer getDelegationTokenRenewer() {
return delegationTokenRenewer; return activeServiceContext.getDelegationTokenRenewer();
} }
@Override @Override
public AMRMTokenSecretManager getAMRMTokenSecretManager() { public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.amRMTokenSecretManager; return activeServiceContext.getAMRMTokenSecretManager();
} }
@Override @Override
public RMContainerTokenSecretManager getContainerTokenSecretManager() { public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager; return activeServiceContext.getContainerTokenSecretManager();
} }
@Override @Override
public NMTokenSecretManagerInRM getNMTokenSecretManager() { public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager; return activeServiceContext.getNMTokenSecretManager();
} }
@Override @Override
public ResourceScheduler getScheduler() { public ResourceScheduler getScheduler() {
return this.scheduler; return activeServiceContext.getScheduler();
} }
@Override @Override
public ReservationSystem getReservationSystem() { public ReservationSystem getReservationSystem() {
return this.reservationSystem; return activeServiceContext.getReservationSystem();
} }
@Override @Override
public NodesListManager getNodesListManager() { public NodesListManager getNodesListManager() {
return this.nodesListManager; return activeServiceContext.getNodesListManager();
} }
@Override @Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager; return activeServiceContext.getClientToAMTokenSecretManager();
} }
@Override @Override
@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext {
@VisibleForTesting @VisibleForTesting
public void setStateStore(RMStateStore store) { public void setStateStore(RMStateStore store) {
stateStore = store; activeServiceContext.setStateStore(store);
} }
@Override @Override
public ClientRMService getClientRMService() { public ClientRMService getClientRMService() {
return this.clientRMService; return activeServiceContext.getClientRMService();
} }
@Override @Override
public ApplicationMasterService getApplicationMasterService() { public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService; return activeServiceContext.getApplicationMasterService();
} }
@Override @Override
public ResourceTrackerService getResourceTrackerService() { public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService; return activeServiceContext.getResourceTrackerService();
} }
void setHAEnabled(boolean isHAEnabled) { void setHAEnabled(boolean isHAEnabled) {
@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext {
@Override @Override
public void setClientRMService(ClientRMService clientRMService) { public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService; activeServiceContext.setClientRMService(clientRMService);
} }
@Override @Override
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() { public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager; return activeServiceContext.getRMDelegationTokenSecretManager();
} }
@Override @Override
public void setRMDelegationTokenSecretManager( public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) { RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager; activeServiceContext
.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
} }
void setContainerAllocationExpirer( void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) { ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer; activeServiceContext
.setContainerAllocationExpirer(containerAllocationExpirer);
} }
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor; activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor);
} }
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor; activeServiceContext.setAMFinishingMonitor(amFinishingMonitor);
} }
void setContainerTokenSecretManager( void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) { RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager; activeServiceContext
.setContainerTokenSecretManager(containerTokenSecretManager);
} }
void setNMTokenSecretManager( void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
NMTokenSecretManagerInRM nmTokenSecretManager) { activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
this.nmTokenSecretManager = nmTokenSecretManager;
} }
void setScheduler(ResourceScheduler scheduler) { void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler; activeServiceContext.setScheduler(scheduler);
} }
void setReservationSystem(ReservationSystem reservationSystem) { void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem; activeServiceContext.setReservationSystem(reservationSystem);
} }
void setDelegationTokenRenewer( void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
DelegationTokenRenewer delegationTokenRenewer) { activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer);
this.delegationTokenRenewer = delegationTokenRenewer;
} }
void setClientToAMTokenSecretManager( void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; activeServiceContext
.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
} }
void setAMRMTokenSecretManager( void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
AMRMTokenSecretManager amRMTokenSecretManager) { activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager);
this.amRMTokenSecretManager = amRMTokenSecretManager;
} }
void setNodesListManager(NodesListManager nodesListManager) { void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager; activeServiceContext.setNodesListManager(nodesListManager);
} }
void setApplicationMasterService( void setApplicationMasterService(
ApplicationMasterService applicationMasterService) { ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService; activeServiceContext.setApplicationMasterService(applicationMasterService);
} }
void setResourceTrackerService( void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
ResourceTrackerService resourceTrackerService) { activeServiceContext.setResourceTrackerService(resourceTrackerService);
this.resourceTrackerService = resourceTrackerService;
} }
@Override @Override
@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext {
} }
public void setWorkPreservingRecoveryEnabled(boolean enabled) { public void setWorkPreservingRecoveryEnabled(boolean enabled) {
this.isWorkPreservingRecoveryEnabled = enabled; activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
} }
@Override @Override
public boolean isWorkPreservingRecoveryEnabled() { public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled; return activeServiceContext.isWorkPreservingRecoveryEnabled();
} }
@Override @Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter; return activeServiceContext.getRMApplicationHistoryWriter();
} }
@Override @Override
public void setSystemMetricsPublisher( public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) { SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher; activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
} }
@Override @Override
public SystemMetricsPublisher getSystemMetricsPublisher() { public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher; return activeServiceContext.getSystemMetricsPublisher();
} }
@Override @Override
public void setRMApplicationHistoryWriter( public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) { RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; activeServiceContext
.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
} }
@Override @Override
@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext {
@Override @Override
public long getEpoch() { public long getEpoch() {
return this.epoch; return activeServiceContext.getEpoch();
} }
void setEpoch(long epoch) { void setEpoch(long epoch) {
this.epoch = epoch; activeServiceContext.setEpoch(epoch);
} }
@Override @Override
public RMNodeLabelsManager getNodeLabelManager() { public RMNodeLabelsManager getNodeLabelManager() {
return nodeLabelManager; return activeServiceContext.getNodeLabelManager();
} }
@Override @Override
public void setNodeLabelManager(RMNodeLabelsManager mgr) { public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr; activeServiceContext.setNodeLabelManager(mgr);
} }
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime(); activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
this.schedulerRecoveryWaitTime = waitTime;
} }
public boolean isSchedulerReadyForAllocatingContainers() { public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) { return activeServiceContext.isSchedulerReadyForAllocatingContainers();
return isSchedulerReady;
}
isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
> schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
} }
@Private @Private
@VisibleForTesting @VisibleForTesting
public void setSystemClock(Clock clock) { public void setSystemClock(Clock clock) {
this.systemClock = clock; activeServiceContext.setSystemClock(clock);
} }
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() { public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials; return activeServiceContext.getSystemCredentialsForApps();
} }
@Private
@Unstable
public RMActiveServiceContext getActiveServiceContext() {
return activeServiceContext;
}
@Private
@Unstable
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
this.activeServiceContext = activeServiceContext;
}
} }

View File

@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm; private ResourceManager rm;
private boolean recoveryEnabled; private boolean recoveryEnabled;
private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) { RMActiveServices(ResourceManager rm) {
super("RMActiveServices"); super("RMActiveServices");
@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override @Override
protected void serviceInit(Configuration configuration) throws Exception { protected void serviceInit(Configuration configuration) throws Exception {
activeServiceContext = new RMActiveServiceContext();
rmContext.setActiveServiceContext(activeServiceContext);
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService(); rmSecretManagerService = createRMSecretManagerService();
@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (activeServices != null) { if (activeServices != null) {
activeServices.stop(); activeServices.stop();
activeServices = null; activeServices = null;
rmContext.getRMNodes().clear(); }
rmContext.getInactiveRMNodes().clear(); }
rmContext.getRMApps().clear();
void reinitialize(boolean initialize) throws Exception {
ClusterMetrics.destroy(); ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics(); QueueMetrics.clearQueueMetrics();
if (initialize) {
resetDispatcher();
createAndInitActiveServices();
} }
} }
@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
startActiveServices(); startActiveServices();
return null; return null;
} catch (Exception e) { } catch (Exception e) {
resetDispatcher(); reinitialize(true);
createAndInitActiveServices();
throw e; throw e;
} }
} }
@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (rmContext.getHAServiceState() == if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) { HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices(); stopActiveServices();
if (initialize) { reinitialize(initialize);
resetDispatcher();
createAndInitActiveServices();
}
} }
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state"); LOG.info("Transitioned to standby state");

View File

@ -513,6 +513,68 @@ public class TestRMHA {
rm.stop(); rm.stop();
} }
@Test
public void testFailoverClearsRMContext() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
Configuration conf = new YarnConfiguration(configuration);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// 1. start RM
rm = new MockRM(conf, memStore);
rm.init(conf);
rm.start();
StateChangeRequestInfo requestInfo =
new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to active
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
assertEquals(1, rm.getRMContext().getRMNodes().size());
assertEquals(1, rm.getRMContext().getRMApps().size());
// 3. Create new RM
rm = new MockRM(conf, memStore) {
@Override
protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext,
this.nodesListManager, this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
this.rmContext.getNMTokenSecretManager()) {
@Override
protected void serviceStart() throws Exception {
throw new Exception("ResourceTracker service failed");
}
};
}
};
rm.init(conf);
rm.start();
checkMonitorHealth();
checkStandbyRMFunctionality();
// 4. Try Transition to active, throw exception
try {
rm.adminService.transitionToActive(requestInfo);
Assert.fail("Transitioned to Active should throw exception.");
} catch (Exception e) {
assertTrue("Error when transitioning to Active mode".contains(e
.getMessage()));
}
// 5. Clears the metrics
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
assertEquals(0, rm.getRMContext().getRMNodes().size());
assertEquals(0, rm.getRMContext().getRMApps().size());
}
public void innerTestHAWithRMHostName(boolean includeBindHost) { public void innerTestHAWithRMHostName(boolean includeBindHost) {
//this is run two times, with and without a bind host configured //this is run two times, with and without a bind host configured
if (includeBindHost) { if (includeBindHost) {