YARN-1098. Separate out RM services into Always On and Active (Karthik Kambatla via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521560 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-09-10 17:43:51 +00:00
parent 11944e52fd
commit bcb865314f
2 changed files with 227 additions and 209 deletions

View File

@ -27,6 +27,8 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas)
OPTIMIZATIONS

View File

@ -107,9 +107,18 @@ public class ResourceManager extends CompositeService implements Recoverable {
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
public static final long clusterTimeStamp = System.currentTimeMillis();
/**
* "Active" services. Services that need to run only on the Active RM.
* These services are managed (initialized, started, stopped) by the
* {@link CompositeService} RMActiveServices.
*
* RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
* in Active state.
*/
protected RMActiveServices activeServices;
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
new ClientToAMTokenSecretManagerInRM();
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
@ -135,6 +144,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
/** End of Active services */
private Configuration conf;
public ResourceManager() {
@ -147,137 +158,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceInit(Configuration conf) throws Exception {
validateConfigs(conf);
this.conf = conf;
this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
this.rmDispatcher = createDispatcher();
addIfService(this.rmDispatcher);
this.amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
this.containerAllocationExpirer = new ContainerAllocationExpirer(
this.rmDispatcher);
addService(this.containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
if(isRecoveryEnabled) {
recoveryEnabled = true;
rmStore = RMStateStoreFactory.getStore(conf);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenRenewer = createDelegationTokenRenewer();
}
this.rmContext =
new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
this.containerTokenSecretManager, this.nmTokenSecretManager,
this.clientToAMSecretManager);
// Register event handler for NodesListManager
this.nodesListManager = new NodesListManager(this.rmContext);
this.rmDispatcher.register(NodesListManagerEventType.class,
this.nodesListManager);
addService(nodesListManager);
// Initialize the scheduler
this.scheduler = createScheduler();
this.schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(this.schedulerDispatcher);
this.rmDispatcher.register(SchedulerEventType.class,
this.schedulerDispatcher);
// Register event handler for RmAppEvents
this.rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(this.rmContext));
// Register event handler for RmAppAttemptEvents
this.rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(this.rmContext));
// Register event handler for RmNodes
this.rmDispatcher.register(RMNodeEventType.class,
new NodeEventDispatcher(this.rmContext));
this.nmLivelinessMonitor = createNMLivelinessMonitor();
addService(this.nmLivelinessMonitor);
this.resourceTracker = createResourceTrackerService();
addService(resourceTracker);
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
try {
this.scheduler.reinitialize(conf, this.rmContext);
} catch (IOException ioe) {
throw new RuntimeException("Failed to initialize scheduler", ioe);
}
// creating monitors that handle preemption
createPolicyMonitors();
masterService = createApplicationMasterService();
addService(masterService) ;
this.applicationACLsManager = new ApplicationACLsManager(conf);
this.rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager);
this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
rmContext.setRMDelegationTokenSecretManager(this.rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
adminService = createAdminService(clientRM, masterService, resourceTracker);
addService(adminService);
this.applicationMasterLauncher = createAMLauncher();
this.rmDispatcher.register(AMLauncherEventType.class,
this.applicationMasterLauncher);
addService(applicationMasterLauncher);
if (UserGroupInformation.isSecurityEnabled()) {
addService(delegationTokenRenewer);
delegationTokenRenewer.setRMContext(rmContext);
}
new RMNMInfo(this.rmContext, this.scheduler);
activeServices = new RMActiveServices();
addService(activeServices);
super.serviceInit(conf);
}
@ -378,6 +263,217 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
}
/**
* RMActiveServices handles all the Active services in the RM.
*/
@Private
class RMActiveServices extends CompositeService {
RMActiveServices() {
super("RMActiveServices");
}
@Override
protected void serviceInit(Configuration configuration) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
nmTokenSecretManager = createNMTokenSecretManager(conf);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
RMStateStore rmStore = null;
if(isRecoveryEnabled) {
recoveryEnabled = true;
rmStore = RMStateStoreFactory.getStore(conf);
} else {
recoveryEnabled = false;
rmStore = new NullRMStateStore();
}
try {
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
}
rmContext = new RMContextImpl(
rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMSecretManager);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
addService(nodesListManager);
// Initialize the scheduler
scheduler = createScheduler();
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
// Register event handler for RmAppEvents
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext));
// Register event handler for RmAppAttemptEvents
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext));
// Register event handler for RmNodes
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext));
nmLivelinessMonitor = createNMLivelinessMonitor();
addService(nmLivelinessMonitor);
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
try {
scheduler.reinitialize(conf, rmContext);
} catch (IOException ioe) {
throw new RuntimeException("Failed to initialize scheduler", ioe);
}
// creating monitors that handle preemption
createPolicyMonitors();
masterService = createApplicationMasterService();
addService(masterService) ;
applicationACLsManager = new ApplicationACLsManager(conf);
rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
adminService = createAdminService(clientRM, masterService, resourceTracker);
addService(adminService);
applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
applicationMasterLauncher);
addService(applicationMasterLauncher);
if (UserGroupInformation.isSecurityEnabled()) {
addService(delegationTokenRenewer);
delegationTokenRenewer.setRMContext(rmContext);
}
new RMNMInfo(rmContext, scheduler);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
amRmTokenSecretManager.start();
containerTokenSecretManager.start();
nmTokenSecretManager.start();
RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states.
rmStore.start();
if(recoveryEnabled) {
try {
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to load/recover state", e);
ExitUtil.terminate(1, e);
}
}
startWepApp();
try {
rmDTSecretManager.startThreads();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
}
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
String resolvedAddress = hostname + ":" + port;
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
if (amRmTokenSecretManager != null) {
amRmTokenSecretManager.stop();
}
if (containerTokenSecretManager != null) {
containerTokenSecretManager.stop();
}
if(nmTokenSecretManager != null) {
nmTokenSecretManager.stop();
}
DefaultMetricsSystem.shutdown();
if (rmContext != null) {
RMStateStore store = rmContext.getStateStore();
try {
store.close();
} catch (Exception e) {
LOG.error("Error closing store.", e);
}
}
super.serviceStop();
}
}
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {
@ -620,54 +716,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
throw new YarnRuntimeException("Failed to login", ie);
}
this.amRmTokenSecretManager.start();
this.containerTokenSecretManager.start();
this.nmTokenSecretManager.start();
RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states.
rmStore.start();
if(recoveryEnabled) {
try {
RMState state = rmStore.loadState();
recover(state);
} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
LOG.error("Failed to load/recover state", e);
ExitUtil.terminate(1, e);
}
}
startWepApp();
try {
rmDTSecretManager.startThreads();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
}
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
int port = webApp.port();
String resolvedAddress = hostname + ":" + port;
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
}
super.serviceStart();
/*synchronized(shutdown) {
try {
while(!shutdown.get()) {
shutdown.wait();
}
} catch(InterruptedException ie) {
LOG.info("Interrupted while waiting", ie);
}
}*/
}
protected void doSecureLogin() throws IOException {
@ -677,39 +726,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStop() throws Exception {
if (webApp != null) {
webApp.stop();
}
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
if (amRmTokenSecretManager != null) {
this.amRmTokenSecretManager.stop();
}
if (containerTokenSecretManager != null) {
this.containerTokenSecretManager.stop();
}
if(nmTokenSecretManager != null) {
nmTokenSecretManager.stop();
}
/*synchronized(shutdown) {
shutdown.set(true);
shutdown.notifyAll();
}*/
DefaultMetricsSystem.shutdown();
if (rmContext != null) {
RMStateStore store = rmContext.getStateStore();
try {
store.close();
} catch (Exception e) {
LOG.error("Error closing store.", e);
}
}
super.serviceStop();
}