YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks
This commit is contained in:
parent
765aecb4e1
commit
9cb8b75ba5
|
@ -115,6 +115,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via
|
||||
jianhe)
|
||||
|
||||
YARN-2865. Fixed RM to always create a new RMContext when transtions from
|
||||
StandBy to Active. (Rohith Sharmaks via jianhe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -0,0 +1,455 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
||||
/**
|
||||
* The RMActiveServiceContext is the class that maintains all the
|
||||
* RMActiveService contexts.This is expected to be used only by ResourceManager
|
||||
* and RMContext.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class RMActiveServiceContext {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(RMActiveServiceContext.class);
|
||||
|
||||
private final ConcurrentMap<ApplicationId, RMApp> applications =
|
||||
new ConcurrentHashMap<ApplicationId, RMApp>();
|
||||
|
||||
private final ConcurrentMap<NodeId, RMNode> nodes =
|
||||
new ConcurrentHashMap<NodeId, RMNode>();
|
||||
|
||||
private final ConcurrentMap<String, RMNode> inactiveNodes =
|
||||
new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
|
||||
|
||||
private boolean isWorkPreservingRecoveryEnabled;
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private RMStateStore stateStore = null;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private DelegationTokenRenewer delegationTokenRenewer;
|
||||
private AMRMTokenSecretManager amRMTokenSecretManager;
|
||||
private RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||
private ClientRMService clientRMService;
|
||||
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
|
||||
private ResourceScheduler scheduler;
|
||||
private ReservationSystem reservationSystem;
|
||||
private NodesListManager nodesListManager;
|
||||
private ResourceTrackerService resourceTrackerService;
|
||||
private ApplicationMasterService applicationMasterService;
|
||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||
private SystemMetricsPublisher systemMetricsPublisher;
|
||||
private RMNodeLabelsManager nodeLabelManager;
|
||||
private long epoch;
|
||||
private Clock systemClock = new SystemClock();
|
||||
private long schedulerRecoveryStartTime = 0;
|
||||
private long schedulerRecoveryWaitTime = 0;
|
||||
private boolean printLog = true;
|
||||
private boolean isSchedulerReady = false;
|
||||
|
||||
public RMActiveServiceContext() {
|
||||
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMActiveServiceContext(Dispatcher rmDispatcher,
|
||||
ContainerAllocationExpirer containerAllocationExpirer,
|
||||
AMLivelinessMonitor amLivelinessMonitor,
|
||||
AMLivelinessMonitor amFinishingMonitor,
|
||||
DelegationTokenRenewer delegationTokenRenewer,
|
||||
AMRMTokenSecretManager appTokenSecretManager,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
|
||||
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
|
||||
this();
|
||||
this.setContainerAllocationExpirer(containerAllocationExpirer);
|
||||
this.setAMLivelinessMonitor(amLivelinessMonitor);
|
||||
this.setAMFinishingMonitor(amFinishingMonitor);
|
||||
this.setDelegationTokenRenewer(delegationTokenRenewer);
|
||||
this.setAMRMTokenSecretManager(appTokenSecretManager);
|
||||
this.setContainerTokenSecretManager(containerTokenSecretManager);
|
||||
this.setNMTokenSecretManager(nmTokenSecretManager);
|
||||
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
|
||||
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
|
||||
|
||||
RMStateStore nullStore = new NullRMStateStore();
|
||||
nullStore.setRMDispatcher(rmDispatcher);
|
||||
try {
|
||||
nullStore.init(new YarnConfiguration());
|
||||
setStateStore(nullStore);
|
||||
} catch (Exception e) {
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setStateStore(RMStateStore store) {
|
||||
stateStore = store;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ClientRMService getClientRMService() {
|
||||
return clientRMService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ApplicationMasterService getApplicationMasterService() {
|
||||
return applicationMasterService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ResourceTrackerService getResourceTrackerService() {
|
||||
return resourceTrackerService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMStateStore getStateStore() {
|
||||
return stateStore;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return this.applications;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
return this.nodes;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
|
||||
return this.inactiveNodes;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerAllocationExpirer getContainerAllocationExpirer() {
|
||||
return this.containerAllocationExpirer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public AMLivelinessMonitor getAMLivelinessMonitor() {
|
||||
return this.amLivelinessMonitor;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public AMLivelinessMonitor getAMFinishingMonitor() {
|
||||
return this.amFinishingMonitor;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||
return delegationTokenRenewer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
|
||||
return this.amRMTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||
return this.containerTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
|
||||
return this.nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ResourceScheduler getScheduler() {
|
||||
return this.scheduler;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ReservationSystem getReservationSystem() {
|
||||
return this.reservationSystem;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public NodesListManager getNodesListManager() {
|
||||
return this.nodesListManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
||||
return this.clientToAMTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setClientRMService(ClientRMService clientRMService) {
|
||||
this.clientRMService = clientRMService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
|
||||
return this.rmDelegationTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setRMDelegationTokenSecretManager(
|
||||
RMDelegationTokenSecretManager delegationTokenSecretManager) {
|
||||
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setContainerAllocationExpirer(
|
||||
ContainerAllocationExpirer containerAllocationExpirer) {
|
||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
|
||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
|
||||
this.amFinishingMonitor = amFinishingMonitor;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setContainerTokenSecretManager(
|
||||
RMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setScheduler(ResourceScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setReservationSystem(ReservationSystem reservationSystem) {
|
||||
this.reservationSystem = reservationSystem;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
|
||||
this.delegationTokenRenewer = delegationTokenRenewer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setClientToAMTokenSecretManager(
|
||||
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
|
||||
this.amRMTokenSecretManager = amRMTokenSecretManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setNodesListManager(NodesListManager nodesListManager) {
|
||||
this.nodesListManager = nodesListManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setApplicationMasterService(
|
||||
ApplicationMasterService applicationMasterService) {
|
||||
this.applicationMasterService = applicationMasterService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
|
||||
this.resourceTrackerService = resourceTrackerService;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
|
||||
this.isWorkPreservingRecoveryEnabled = enabled;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public boolean isWorkPreservingRecoveryEnabled() {
|
||||
return this.isWorkPreservingRecoveryEnabled;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
|
||||
return rmApplicationHistoryWriter;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setSystemMetricsPublisher(
|
||||
SystemMetricsPublisher systemMetricsPublisher) {
|
||||
this.systemMetricsPublisher = systemMetricsPublisher;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public SystemMetricsPublisher getSystemMetricsPublisher() {
|
||||
return systemMetricsPublisher;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setRMApplicationHistoryWriter(
|
||||
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
|
||||
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public long getEpoch() {
|
||||
return this.epoch;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setEpoch(long epoch) {
|
||||
this.epoch = epoch;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMNodeLabelsManager getNodeLabelManager() {
|
||||
return nodeLabelManager;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
|
||||
nodeLabelManager = mgr;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
|
||||
this.schedulerRecoveryStartTime = systemClock.getTime();
|
||||
this.schedulerRecoveryWaitTime = waitTime;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public boolean isSchedulerReadyForAllocatingContainers() {
|
||||
if (isSchedulerReady) {
|
||||
return isSchedulerReady;
|
||||
}
|
||||
isSchedulerReady =
|
||||
(systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
|
||||
if (!isSchedulerReady && printLog) {
|
||||
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
|
||||
printLog = false;
|
||||
}
|
||||
if (isSchedulerReady) {
|
||||
LOG.info("Scheduler recovery is done. Start allocating new containers.");
|
||||
}
|
||||
return isSchedulerReady;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public void setSystemClock(Clock clock) {
|
||||
this.systemClock = clock;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||
return systemCredentials;
|
||||
}
|
||||
}
|
|
@ -19,24 +19,20 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
|
@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
private Dispatcher rmDispatcher;
|
||||
|
||||
private final ConcurrentMap<ApplicationId, RMApp> applications
|
||||
= new ConcurrentHashMap<ApplicationId, RMApp>();
|
||||
|
||||
private final ConcurrentMap<NodeId, RMNode> nodes
|
||||
= new ConcurrentHashMap<NodeId, RMNode>();
|
||||
|
||||
private final ConcurrentMap<String, RMNode> inactiveNodes
|
||||
= new ConcurrentHashMap<String, RMNode>();
|
||||
|
||||
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
|
||||
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
|
||||
|
||||
private boolean isHAEnabled;
|
||||
private boolean isWorkPreservingRecoveryEnabled;
|
||||
|
||||
private HAServiceState haServiceState =
|
||||
HAServiceProtocol.HAServiceState.INITIALIZING;
|
||||
|
||||
private AMLivelinessMonitor amLivelinessMonitor;
|
||||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private RMStateStore stateStore = null;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private DelegationTokenRenewer delegationTokenRenewer;
|
||||
private AMRMTokenSecretManager amRMTokenSecretManager;
|
||||
private RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||
private AdminService adminService;
|
||||
private ClientRMService clientRMService;
|
||||
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
|
||||
private ResourceScheduler scheduler;
|
||||
private ReservationSystem reservationSystem;
|
||||
private NodesListManager nodesListManager;
|
||||
private ResourceTrackerService resourceTrackerService;
|
||||
private ApplicationMasterService applicationMasterService;
|
||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||
private SystemMetricsPublisher systemMetricsPublisher;
|
||||
private ConfigurationProvider configurationProvider;
|
||||
private RMNodeLabelsManager nodeLabelManager;
|
||||
private long epoch;
|
||||
private Clock systemClock = new SystemClock();
|
||||
private long schedulerRecoveryStartTime = 0;
|
||||
private long schedulerRecoveryWaitTime = 0;
|
||||
private boolean printLog = true;
|
||||
private boolean isSchedulerReady = false;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
|
||||
private ConfigurationProvider configurationProvider;
|
||||
|
||||
private RMActiveServiceContext activeServiceContext;
|
||||
|
||||
/**
|
||||
* Default constructor. To be used in conjunction with setter methods for
|
||||
|
@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext {
|
|||
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
|
||||
this();
|
||||
this.setDispatcher(rmDispatcher);
|
||||
this.setContainerAllocationExpirer(containerAllocationExpirer);
|
||||
this.setAMLivelinessMonitor(amLivelinessMonitor);
|
||||
this.setAMFinishingMonitor(amFinishingMonitor);
|
||||
this.setDelegationTokenRenewer(delegationTokenRenewer);
|
||||
this.setAMRMTokenSecretManager(appTokenSecretManager);
|
||||
this.setContainerTokenSecretManager(containerTokenSecretManager);
|
||||
this.setNMTokenSecretManager(nmTokenSecretManager);
|
||||
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
|
||||
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
|
||||
|
||||
RMStateStore nullStore = new NullRMStateStore();
|
||||
nullStore.setRMDispatcher(rmDispatcher);
|
||||
try {
|
||||
nullStore.init(new YarnConfiguration());
|
||||
setStateStore(nullStore);
|
||||
} catch (Exception e) {
|
||||
assert false;
|
||||
}
|
||||
setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
|
||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||
delegationTokenRenewer, appTokenSecretManager,
|
||||
containerTokenSecretManager, nmTokenSecretManager,
|
||||
clientToAMTokenSecretManager, rmApplicationHistoryWriter));
|
||||
|
||||
ConfigurationProvider provider = new LocalConfigurationProvider();
|
||||
setConfigurationProvider(provider);
|
||||
|
@ -158,77 +104,77 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
@Override
|
||||
public RMStateStore getStateStore() {
|
||||
return stateStore;
|
||||
return activeServiceContext.getStateStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||
return this.applications;
|
||||
return activeServiceContext.getRMApps();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
|
||||
return this.nodes;
|
||||
return activeServiceContext.getRMNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
|
||||
return this.inactiveNodes;
|
||||
return activeServiceContext.getInactiveRMNodes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerAllocationExpirer getContainerAllocationExpirer() {
|
||||
return this.containerAllocationExpirer;
|
||||
return activeServiceContext.getContainerAllocationExpirer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AMLivelinessMonitor getAMLivelinessMonitor() {
|
||||
return this.amLivelinessMonitor;
|
||||
return activeServiceContext.getAMLivelinessMonitor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AMLivelinessMonitor getAMFinishingMonitor() {
|
||||
return this.amFinishingMonitor;
|
||||
return activeServiceContext.getAMFinishingMonitor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||
return delegationTokenRenewer;
|
||||
return activeServiceContext.getDelegationTokenRenewer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
|
||||
return this.amRMTokenSecretManager;
|
||||
return activeServiceContext.getAMRMTokenSecretManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
||||
return this.containerTokenSecretManager;
|
||||
return activeServiceContext.getContainerTokenSecretManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
|
||||
return this.nmTokenSecretManager;
|
||||
return activeServiceContext.getNMTokenSecretManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceScheduler getScheduler() {
|
||||
return this.scheduler;
|
||||
return activeServiceContext.getScheduler();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationSystem getReservationSystem() {
|
||||
return this.reservationSystem;
|
||||
return activeServiceContext.getReservationSystem();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodesListManager getNodesListManager() {
|
||||
return this.nodesListManager;
|
||||
return activeServiceContext.getNodesListManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
||||
return this.clientToAMTokenSecretManager;
|
||||
return activeServiceContext.getClientToAMTokenSecretManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
@VisibleForTesting
|
||||
public void setStateStore(RMStateStore store) {
|
||||
stateStore = store;
|
||||
activeServiceContext.setStateStore(store);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientRMService getClientRMService() {
|
||||
return this.clientRMService;
|
||||
return activeServiceContext.getClientRMService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationMasterService getApplicationMasterService() {
|
||||
return applicationMasterService;
|
||||
return activeServiceContext.getApplicationMasterService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceTrackerService getResourceTrackerService() {
|
||||
return resourceTrackerService;
|
||||
return activeServiceContext.getResourceTrackerService();
|
||||
}
|
||||
|
||||
void setHAEnabled(boolean isHAEnabled) {
|
||||
|
@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
@Override
|
||||
public void setClientRMService(ClientRMService clientRMService) {
|
||||
this.clientRMService = clientRMService;
|
||||
activeServiceContext.setClientRMService(clientRMService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
|
||||
return this.rmDelegationTokenSecretManager;
|
||||
return activeServiceContext.getRMDelegationTokenSecretManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRMDelegationTokenSecretManager(
|
||||
RMDelegationTokenSecretManager delegationTokenSecretManager) {
|
||||
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
|
||||
activeServiceContext
|
||||
.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
|
||||
}
|
||||
|
||||
void setContainerAllocationExpirer(
|
||||
ContainerAllocationExpirer containerAllocationExpirer) {
|
||||
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||
activeServiceContext
|
||||
.setContainerAllocationExpirer(containerAllocationExpirer);
|
||||
}
|
||||
|
||||
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
|
||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||
activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor);
|
||||
}
|
||||
|
||||
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
|
||||
this.amFinishingMonitor = amFinishingMonitor;
|
||||
activeServiceContext.setAMFinishingMonitor(amFinishingMonitor);
|
||||
}
|
||||
|
||||
void setContainerTokenSecretManager(
|
||||
RMContainerTokenSecretManager containerTokenSecretManager) {
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
activeServiceContext
|
||||
.setContainerTokenSecretManager(containerTokenSecretManager);
|
||||
}
|
||||
|
||||
void setNMTokenSecretManager(
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager) {
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
|
||||
activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
|
||||
}
|
||||
|
||||
void setScheduler(ResourceScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
activeServiceContext.setScheduler(scheduler);
|
||||
}
|
||||
|
||||
void setReservationSystem(ReservationSystem reservationSystem) {
|
||||
this.reservationSystem = reservationSystem;
|
||||
activeServiceContext.setReservationSystem(reservationSystem);
|
||||
}
|
||||
|
||||
void setDelegationTokenRenewer(
|
||||
DelegationTokenRenewer delegationTokenRenewer) {
|
||||
this.delegationTokenRenewer = delegationTokenRenewer;
|
||||
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
|
||||
activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer);
|
||||
}
|
||||
|
||||
void setClientToAMTokenSecretManager(
|
||||
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
|
||||
activeServiceContext
|
||||
.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
|
||||
}
|
||||
|
||||
void setAMRMTokenSecretManager(
|
||||
AMRMTokenSecretManager amRMTokenSecretManager) {
|
||||
this.amRMTokenSecretManager = amRMTokenSecretManager;
|
||||
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
|
||||
activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager);
|
||||
}
|
||||
|
||||
void setNodesListManager(NodesListManager nodesListManager) {
|
||||
this.nodesListManager = nodesListManager;
|
||||
activeServiceContext.setNodesListManager(nodesListManager);
|
||||
}
|
||||
|
||||
void setApplicationMasterService(
|
||||
ApplicationMasterService applicationMasterService) {
|
||||
this.applicationMasterService = applicationMasterService;
|
||||
activeServiceContext.setApplicationMasterService(applicationMasterService);
|
||||
}
|
||||
|
||||
void setResourceTrackerService(
|
||||
ResourceTrackerService resourceTrackerService) {
|
||||
this.resourceTrackerService = resourceTrackerService;
|
||||
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
|
||||
activeServiceContext.setResourceTrackerService(resourceTrackerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext {
|
|||
}
|
||||
|
||||
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
|
||||
this.isWorkPreservingRecoveryEnabled = enabled;
|
||||
activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWorkPreservingRecoveryEnabled() {
|
||||
return this.isWorkPreservingRecoveryEnabled;
|
||||
return activeServiceContext.isWorkPreservingRecoveryEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
|
||||
return rmApplicationHistoryWriter;
|
||||
return activeServiceContext.getRMApplicationHistoryWriter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSystemMetricsPublisher(
|
||||
SystemMetricsPublisher systemMetricsPublisher) {
|
||||
this.systemMetricsPublisher = systemMetricsPublisher;
|
||||
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SystemMetricsPublisher getSystemMetricsPublisher() {
|
||||
return systemMetricsPublisher;
|
||||
return activeServiceContext.getSystemMetricsPublisher();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRMApplicationHistoryWriter(
|
||||
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
|
||||
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
|
||||
activeServiceContext
|
||||
.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
@Override
|
||||
public long getEpoch() {
|
||||
return this.epoch;
|
||||
return activeServiceContext.getEpoch();
|
||||
}
|
||||
|
||||
void setEpoch(long epoch) {
|
||||
this.epoch = epoch;
|
||||
activeServiceContext.setEpoch(epoch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMNodeLabelsManager getNodeLabelManager() {
|
||||
return nodeLabelManager;
|
||||
return activeServiceContext.getNodeLabelManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
|
||||
nodeLabelManager = mgr;
|
||||
activeServiceContext.setNodeLabelManager(mgr);
|
||||
}
|
||||
|
||||
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
|
||||
this.schedulerRecoveryStartTime = systemClock.getTime();
|
||||
this.schedulerRecoveryWaitTime = waitTime;
|
||||
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
|
||||
}
|
||||
|
||||
public boolean isSchedulerReadyForAllocatingContainers() {
|
||||
if (isSchedulerReady) {
|
||||
return isSchedulerReady;
|
||||
}
|
||||
isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
|
||||
> schedulerRecoveryWaitTime;
|
||||
if (!isSchedulerReady && printLog) {
|
||||
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
|
||||
printLog = false;
|
||||
}
|
||||
if (isSchedulerReady) {
|
||||
LOG.info("Scheduler recovery is done. Start allocating new containers.");
|
||||
}
|
||||
return isSchedulerReady;
|
||||
return activeServiceContext.isSchedulerReadyForAllocatingContainers();
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setSystemClock(Clock clock) {
|
||||
this.systemClock = clock;
|
||||
activeServiceContext.setSystemClock(clock);
|
||||
}
|
||||
|
||||
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
|
||||
return systemCredentials;
|
||||
return activeServiceContext.getSystemCredentialsForApps();
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public RMActiveServiceContext getActiveServiceContext() {
|
||||
return activeServiceContext;
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
|
||||
this.activeServiceContext = activeServiceContext;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private ResourceManager rm;
|
||||
private boolean recoveryEnabled;
|
||||
private RMActiveServiceContext activeServiceContext;
|
||||
|
||||
RMActiveServices(ResourceManager rm) {
|
||||
super("RMActiveServices");
|
||||
|
@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
@Override
|
||||
protected void serviceInit(Configuration configuration) throws Exception {
|
||||
activeServiceContext = new RMActiveServiceContext();
|
||||
rmContext.setActiveServiceContext(activeServiceContext);
|
||||
|
||||
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
|
||||
|
||||
rmSecretManagerService = createRMSecretManagerService();
|
||||
|
@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
if (activeServices != null) {
|
||||
activeServices.stop();
|
||||
activeServices = null;
|
||||
rmContext.getRMNodes().clear();
|
||||
rmContext.getInactiveRMNodes().clear();
|
||||
rmContext.getRMApps().clear();
|
||||
}
|
||||
}
|
||||
|
||||
void reinitialize(boolean initialize) throws Exception {
|
||||
ClusterMetrics.destroy();
|
||||
QueueMetrics.clearQueueMetrics();
|
||||
if (initialize) {
|
||||
resetDispatcher();
|
||||
createAndInitActiveServices();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
startActiveServices();
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
resetDispatcher();
|
||||
createAndInitActiveServices();
|
||||
reinitialize(true);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
if (rmContext.getHAServiceState() ==
|
||||
HAServiceProtocol.HAServiceState.ACTIVE) {
|
||||
stopActiveServices();
|
||||
if (initialize) {
|
||||
resetDispatcher();
|
||||
createAndInitActiveServices();
|
||||
}
|
||||
reinitialize(initialize);
|
||||
}
|
||||
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
|
||||
LOG.info("Transitioned to standby state");
|
||||
|
|
|
@ -513,6 +513,68 @@ public class TestRMHA {
|
|||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverClearsRMContext() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
// 1. start RM
|
||||
rm = new MockRM(conf, memStore);
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
StateChangeRequestInfo requestInfo =
|
||||
new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 2. Transition to active
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
|
||||
assertEquals(1, rm.getRMContext().getRMNodes().size());
|
||||
assertEquals(1, rm.getRMContext().getRMApps().size());
|
||||
|
||||
// 3. Create new RM
|
||||
rm = new MockRM(conf, memStore) {
|
||||
@Override
|
||||
protected ResourceTrackerService createResourceTrackerService() {
|
||||
return new ResourceTrackerService(this.rmContext,
|
||||
this.nodesListManager, this.nmLivelinessMonitor,
|
||||
this.rmContext.getContainerTokenSecretManager(),
|
||||
this.rmContext.getNMTokenSecretManager()) {
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
throw new Exception("ResourceTracker service failed");
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 4. Try Transition to active, throw exception
|
||||
try {
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
Assert.fail("Transitioned to Active should throw exception.");
|
||||
} catch (Exception e) {
|
||||
assertTrue("Error when transitioning to Active mode".contains(e
|
||||
.getMessage()));
|
||||
}
|
||||
// 5. Clears the metrics
|
||||
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
||||
assertEquals(0, rm.getRMContext().getRMNodes().size());
|
||||
assertEquals(0, rm.getRMContext().getRMApps().size());
|
||||
}
|
||||
|
||||
public void innerTestHAWithRMHostName(boolean includeBindHost) {
|
||||
//this is run two times, with and without a bind host configured
|
||||
if (includeBindHost) {
|
||||
|
|
Loading…
Reference in New Issue