YARN-1172. Convert SecretManagers in RM to services (Tsuyoshi OZAWA via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1553431 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2013-12-26 00:21:56 +00:00
parent 150440c607
commit d841a0f474
9 changed files with 193 additions and 100 deletions

View File

@ -187,6 +187,8 @@ Release 2.4.0 - UNRELEASED
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
YARN-1172. Convert SecretManagers in RM to services (Tsuyoshi OZAWA via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import java.io.IOException;
public class RMSecretManagerService extends AbstractService {
AMRMTokenSecretManager amRmTokenSecretManager;
NMTokenSecretManagerInRM nmTokenSecretManager;
ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
RMContainerTokenSecretManager containerTokenSecretManager;
RMDelegationTokenSecretManager rmDTSecretManager;
RMContextImpl rmContext;
/**
* Construct the service.
*
*/
public RMSecretManagerService(Configuration conf, RMContextImpl rmContext) {
super(RMSecretManagerService.class.getName());
this.rmContext = rmContext;
// To initialize correctly, these managers should be created before
// being called serviceInit().
nmTokenSecretManager = createNMTokenSecretManager(conf);
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
clientToAMSecretManager = createClientToAMTokenSecretManager();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
rmDTSecretManager =
createRMDelegationTokenSecretManager(conf, rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
amRmTokenSecretManager.start();
containerTokenSecretManager.start();
nmTokenSecretManager.start();
try {
rmDTSecretManager.startThreads();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
}
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
if (amRmTokenSecretManager != null) {
amRmTokenSecretManager.stop();
}
if (containerTokenSecretManager != null) {
containerTokenSecretManager.stop();
}
if(nmTokenSecretManager != null) {
nmTokenSecretManager.stop();
}
super.serviceStop();
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf);
}
protected NMTokenSecretManagerInRM createNMTokenSecretManager(
Configuration conf) {
return new NMTokenSecretManagerInRM(conf);
}
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
Configuration conf) {
return new AMRMTokenSecretManager(conf);
}
protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {
return new ClientToAMTokenSecretManagerInRM();
}
@VisibleForTesting
protected RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
Configuration conf, RMContext rmContext) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
}
}

View File

@ -81,7 +81,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
@ -134,13 +133,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* in Active state.
*/
protected RMActiveServices activeServices;
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
protected AMRMTokenSecretManager amRmTokenSecretManager;
protected RMSecretManagerService rmSecretManagerService;
private Dispatcher rmDispatcher;
protected ResourceScheduler scheduler;
@ -154,7 +147,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMAppManager rmAppManager;
protected ApplicationACLsManager applicationACLsManager;
protected QueueACLsManager queueACLsManager;
protected RMDelegationTokenSecretManager rmDTSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
protected ResourceTrackerService resourceTracker;
@ -211,16 +203,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setStateStore(rmStore);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf);
}
protected NMTokenSecretManagerInRM createNMTokenSecretManager(
Configuration conf) {
return new NMTokenSecretManagerInRM(conf);
}
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
}
@ -234,11 +216,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new AsyncDispatcher();
}
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
Configuration conf) {
return new AMRMTokenSecretManager(conf);
}
protected ResourceScheduler createScheduler() {
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
YarnConfiguration.DEFAULT_RM_SCHEDULER);
@ -324,11 +301,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
@ -342,12 +316,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
nmTokenSecretManager = createNMTokenSecretManager(conf);
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@ -435,8 +403,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
@ -460,10 +426,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStart() throws Exception {
amRmTokenSecretManager.start();
containerTokenSecretManager.start();
nmTokenSecretManager.start();
RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states.
@ -481,13 +443,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
throw e;
}
}
startWepApp();
try {
rmDTSecretManager.startThreads();
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
}
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
int port = webApp.port();
@ -502,19 +458,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (webApp != null) {
webApp.stop();
}
if (rmDTSecretManager != null) {
rmDTSecretManager.stopThreads();
}
if (amRmTokenSecretManager != null) {
amRmTokenSecretManager.stop();
}
if (containerTokenSecretManager != null) {
containerTokenSecretManager.stop();
}
if(nmTokenSecretManager != null) {
nmTokenSecretManager.stop();
}
DefaultMetricsSystem.shutdown();
@ -939,30 +883,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected ResourceTrackerService createResourceTrackerService() {
return new ResourceTrackerService(this.rmContext, this.nodesListManager,
this.nmLivelinessMonitor, this.containerTokenSecretManager,
this.nmTokenSecretManager);
}
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(RMContext rmContext) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval =
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
this.nmLivelinessMonitor,
this.rmContext.getContainerTokenSecretManager(),
this.rmContext.getNMTokenSecretManager());
}
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager, this.queueACLsManager,
this.rmDTSecretManager);
getRMDTSecretManager());
}
protected ApplicationMasterService createApplicationMasterService() {
@ -973,6 +902,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new AdminService(this, rmContext);
}
protected RMSecretManagerService createRMSecretManagerService() {
return new RMSecretManagerService(conf, rmContext);
}
@Private
public ClientRMService getClientRMService() {
return this.clientRM;
@ -1013,23 +946,28 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Private
public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
return this.containerTokenSecretManager;
return this.rmContext.getContainerTokenSecretManager();
}
@Private
public NMTokenSecretManagerInRM getRMNMTokenSecretManager() {
return this.nmTokenSecretManager;
return this.rmContext.getNMTokenSecretManager();
}
@Private
public AMRMTokenSecretManager getAMRMTokenSecretManager(){
return this.amRmTokenSecretManager;
return this.rmContext.getAMRMTokenSecretManager();
}
@Private
public RMDelegationTokenSecretManager getRMDTSecretManager(){
return this.rmContext.getRMDelegationTokenSecretManager();
}
@Override
public void recover(RMState state) throws Exception {
// recover RMdelegationTokenSecretManager
rmDTSecretManager.recover(state);
getRMDTSecretManager().recover(state);
// recover applications
rmAppManager.recover(state);

View File

@ -61,7 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -309,7 +310,7 @@ public class MockRM extends ResourceManager {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, queueACLsManager,
rmDTSecretManager) {
getRMDTSecretManager()) {
@Override
protected void serviceStart() {
// override to not start rpc handler
@ -325,8 +326,12 @@ public class MockRM extends ResourceManager {
@Override
protected ResourceTrackerService createResourceTrackerService() {
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
getRMContainerTokenSecretManager();
containerTokenSecretManager.rollMasterKey();
NMTokenSecretManagerInRM nmTokenSecretManager =
getRMNMTokenSecretManager();
nmTokenSecretManager.rollMasterKey();
return new ResourceTrackerService(getRMContext(), nodesListManager,
this.nmLivelinessMonitor, containerTokenSecretManager,
@ -398,12 +403,8 @@ public class MockRM extends ResourceManager {
return this.nodesListManager;
}
public RMDelegationTokenSecretManager getRMDTSecretManager() {
return this.rmDTSecretManager;
}
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMSecretManager;
return this.getRMContext().getClientToAMTokenSecretManager();
}
public RMAppManager getRMAppManager() {

View File

@ -86,7 +86,7 @@ public abstract class QueueACLsTestBase {
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), this.scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, this.rmDTSecretManager);
this.queueACLsManager, getRMDTSecretManager());
};
@Override

View File

@ -133,7 +133,7 @@ public class TestClientRMService {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.rmDTSecretManager);
this.getRMDTSecretManager());
};
};
rm.start();

View File

@ -1567,7 +1567,7 @@ public class TestRMRestart {
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, null, rmDTSecretManager){
rmAppManager, applicationACLsManager, null, getRMDTSecretManager()){
@Override
protected void serviceStart() throws Exception {
// do nothing

View File

@ -168,7 +168,7 @@ public class TestClientToAMTokens {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.rmDTSecretManager);
getRMDTSecretManager());
};
@Override

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -166,13 +167,21 @@ public class TestRMDelegationTokens {
}
@Override
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(RMContext rmContext) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
rmContext);
protected RMSecretManagerService createRMSecretManagerService() {
return new RMSecretManagerService(conf, rmContext) {
@Override
protected RMDelegationTokenSecretManager
createRMDelegationTokenSecretManager(Configuration conf,
RMContext rmContext) {
// KeyUpdateInterval-> 1 seconds
// TokenMaxLifetime-> 2 seconds.
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
rmContext);
}
};
}
}
public class TestRMDelegationTokenSecretManager extends