YARN-1172. Convert SecretManagers in RM to services (Tsuyoshi OZAWA via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1553432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1eaed17cdf
commit
4d85f9a0ed
|
@ -169,6 +169,8 @@ Release 2.4.0 - UNRELEASED
|
||||||
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
|
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
|
||||||
better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
|
better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
|
||||||
|
|
||||||
|
YARN-1172. Convert SecretManagers in RM to services (Tsuyoshi OZAWA via kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
public class RMSecretManagerService extends AbstractService {
|
||||||
|
|
||||||
|
AMRMTokenSecretManager amRmTokenSecretManager;
|
||||||
|
NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||||
|
ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
|
||||||
|
RMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
|
RMDelegationTokenSecretManager rmDTSecretManager;
|
||||||
|
|
||||||
|
RMContextImpl rmContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct the service.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public RMSecretManagerService(Configuration conf, RMContextImpl rmContext) {
|
||||||
|
super(RMSecretManagerService.class.getName());
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
|
||||||
|
// To initialize correctly, these managers should be created before
|
||||||
|
// being called serviceInit().
|
||||||
|
nmTokenSecretManager = createNMTokenSecretManager(conf);
|
||||||
|
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
|
||||||
|
|
||||||
|
containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
||||||
|
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
|
||||||
|
|
||||||
|
clientToAMSecretManager = createClientToAMTokenSecretManager();
|
||||||
|
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
|
||||||
|
|
||||||
|
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
|
||||||
|
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
|
||||||
|
|
||||||
|
rmDTSecretManager =
|
||||||
|
createRMDelegationTokenSecretManager(conf, rmContext);
|
||||||
|
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceStart() throws Exception {
|
||||||
|
amRmTokenSecretManager.start();
|
||||||
|
containerTokenSecretManager.start();
|
||||||
|
nmTokenSecretManager.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
rmDTSecretManager.startThreads();
|
||||||
|
} catch(IOException ie) {
|
||||||
|
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
|
||||||
|
}
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serviceStop() throws Exception {
|
||||||
|
if (rmDTSecretManager != null) {
|
||||||
|
rmDTSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
if (amRmTokenSecretManager != null) {
|
||||||
|
amRmTokenSecretManager.stop();
|
||||||
|
}
|
||||||
|
if (containerTokenSecretManager != null) {
|
||||||
|
containerTokenSecretManager.stop();
|
||||||
|
}
|
||||||
|
if(nmTokenSecretManager != null) {
|
||||||
|
nmTokenSecretManager.stop();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
|
||||||
|
Configuration conf) {
|
||||||
|
return new RMContainerTokenSecretManager(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NMTokenSecretManagerInRM createNMTokenSecretManager(
|
||||||
|
Configuration conf) {
|
||||||
|
return new NMTokenSecretManagerInRM(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
|
||||||
|
Configuration conf) {
|
||||||
|
return new AMRMTokenSecretManager(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {
|
||||||
|
return new ClientToAMTokenSecretManagerInRM();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
|
||||||
|
Configuration conf, RMContext rmContext) {
|
||||||
|
long secretKeyInterval =
|
||||||
|
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
||||||
|
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
|
||||||
|
long tokenMaxLifetime =
|
||||||
|
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
||||||
|
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
|
||||||
|
long tokenRenewInterval =
|
||||||
|
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
||||||
|
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
|
||||||
|
|
||||||
|
return new RMDelegationTokenSecretManager(secretKeyInterval,
|
||||||
|
tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -81,7 +81,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||||
|
@ -134,13 +133,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
* in Active state.
|
* in Active state.
|
||||||
*/
|
*/
|
||||||
protected RMActiveServices activeServices;
|
protected RMActiveServices activeServices;
|
||||||
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
|
protected RMSecretManagerService rmSecretManagerService;
|
||||||
|
|
||||||
protected RMContainerTokenSecretManager containerTokenSecretManager;
|
|
||||||
protected NMTokenSecretManagerInRM nmTokenSecretManager;
|
|
||||||
|
|
||||||
protected AMRMTokenSecretManager amRmTokenSecretManager;
|
|
||||||
|
|
||||||
private Dispatcher rmDispatcher;
|
private Dispatcher rmDispatcher;
|
||||||
|
|
||||||
protected ResourceScheduler scheduler;
|
protected ResourceScheduler scheduler;
|
||||||
|
@ -154,7 +147,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
protected RMAppManager rmAppManager;
|
protected RMAppManager rmAppManager;
|
||||||
protected ApplicationACLsManager applicationACLsManager;
|
protected ApplicationACLsManager applicationACLsManager;
|
||||||
protected QueueACLsManager queueACLsManager;
|
protected QueueACLsManager queueACLsManager;
|
||||||
protected RMDelegationTokenSecretManager rmDTSecretManager;
|
|
||||||
private DelegationTokenRenewer delegationTokenRenewer;
|
private DelegationTokenRenewer delegationTokenRenewer;
|
||||||
private WebApp webApp;
|
private WebApp webApp;
|
||||||
protected ResourceTrackerService resourceTracker;
|
protected ResourceTrackerService resourceTracker;
|
||||||
|
@ -211,16 +203,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
rmContext.setStateStore(rmStore);
|
rmContext.setStateStore(rmStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
|
|
||||||
Configuration conf) {
|
|
||||||
return new RMContainerTokenSecretManager(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected NMTokenSecretManagerInRM createNMTokenSecretManager(
|
|
||||||
Configuration conf) {
|
|
||||||
return new NMTokenSecretManagerInRM(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
||||||
return new SchedulerEventDispatcher(this.scheduler);
|
return new SchedulerEventDispatcher(this.scheduler);
|
||||||
}
|
}
|
||||||
|
@ -234,11 +216,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return new AsyncDispatcher();
|
return new AsyncDispatcher();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
|
|
||||||
Configuration conf) {
|
|
||||||
return new AMRMTokenSecretManager(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ResourceScheduler createScheduler() {
|
protected ResourceScheduler createScheduler() {
|
||||||
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
|
String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER);
|
||||||
|
@ -324,11 +301,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
addIfService(rmDispatcher);
|
addIfService(rmDispatcher);
|
||||||
rmContext.setDispatcher(rmDispatcher);
|
rmContext.setDispatcher(rmDispatcher);
|
||||||
|
|
||||||
clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
|
rmSecretManagerService = createRMSecretManagerService();
|
||||||
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
|
addService(rmSecretManagerService);
|
||||||
|
|
||||||
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
|
|
||||||
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
|
|
||||||
|
|
||||||
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
|
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
|
||||||
addService(containerAllocationExpirer);
|
addService(containerAllocationExpirer);
|
||||||
|
@ -342,12 +316,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
addService(amFinishingMonitor);
|
addService(amFinishingMonitor);
|
||||||
rmContext.setAMFinishingMonitor(amFinishingMonitor);
|
rmContext.setAMFinishingMonitor(amFinishingMonitor);
|
||||||
|
|
||||||
containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
|
||||||
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
|
|
||||||
|
|
||||||
nmTokenSecretManager = createNMTokenSecretManager(conf);
|
|
||||||
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
|
|
||||||
|
|
||||||
boolean isRecoveryEnabled = conf.getBoolean(
|
boolean isRecoveryEnabled = conf.getBoolean(
|
||||||
YarnConfiguration.RECOVERY_ENABLED,
|
YarnConfiguration.RECOVERY_ENABLED,
|
||||||
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
|
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
|
||||||
|
@ -435,8 +403,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
rmAppManager = createRMAppManager();
|
rmAppManager = createRMAppManager();
|
||||||
// Register event handler for RMAppManagerEvents
|
// Register event handler for RMAppManagerEvents
|
||||||
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
|
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
|
||||||
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
|
|
||||||
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
|
|
||||||
|
|
||||||
clientRM = createClientRMService();
|
clientRM = createClientRMService();
|
||||||
rmContext.setClientRMService(clientRM);
|
rmContext.setClientRMService(clientRM);
|
||||||
|
@ -460,10 +426,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
amRmTokenSecretManager.start();
|
|
||||||
containerTokenSecretManager.start();
|
|
||||||
nmTokenSecretManager.start();
|
|
||||||
|
|
||||||
RMStateStore rmStore = rmContext.getStateStore();
|
RMStateStore rmStore = rmContext.getStateStore();
|
||||||
// The state store needs to start irrespective of recoveryEnabled as apps
|
// The state store needs to start irrespective of recoveryEnabled as apps
|
||||||
// need events to move to further states.
|
// need events to move to further states.
|
||||||
|
@ -481,13 +443,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
startWepApp();
|
startWepApp();
|
||||||
try {
|
|
||||||
rmDTSecretManager.startThreads();
|
|
||||||
} catch(IOException ie) {
|
|
||||||
throw new YarnRuntimeException("Failed to start secret manager threads", ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
||||||
int port = webApp.port();
|
int port = webApp.port();
|
||||||
|
@ -502,19 +458,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
if (webApp != null) {
|
if (webApp != null) {
|
||||||
webApp.stop();
|
webApp.stop();
|
||||||
}
|
}
|
||||||
if (rmDTSecretManager != null) {
|
|
||||||
rmDTSecretManager.stopThreads();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (amRmTokenSecretManager != null) {
|
|
||||||
amRmTokenSecretManager.stop();
|
|
||||||
}
|
|
||||||
if (containerTokenSecretManager != null) {
|
|
||||||
containerTokenSecretManager.stop();
|
|
||||||
}
|
|
||||||
if(nmTokenSecretManager != null) {
|
|
||||||
nmTokenSecretManager.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
|
|
||||||
|
@ -939,30 +883,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
protected ResourceTrackerService createResourceTrackerService() {
|
protected ResourceTrackerService createResourceTrackerService() {
|
||||||
return new ResourceTrackerService(this.rmContext, this.nodesListManager,
|
return new ResourceTrackerService(this.rmContext, this.nodesListManager,
|
||||||
this.nmLivelinessMonitor, this.containerTokenSecretManager,
|
this.nmLivelinessMonitor,
|
||||||
this.nmTokenSecretManager);
|
this.rmContext.getContainerTokenSecretManager(),
|
||||||
}
|
this.rmContext.getNMTokenSecretManager());
|
||||||
|
|
||||||
protected RMDelegationTokenSecretManager
|
|
||||||
createRMDelegationTokenSecretManager(RMContext rmContext) {
|
|
||||||
long secretKeyInterval =
|
|
||||||
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
|
||||||
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
|
|
||||||
long tokenMaxLifetime =
|
|
||||||
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
|
||||||
YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
|
|
||||||
long tokenRenewInterval =
|
|
||||||
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
|
||||||
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
|
|
||||||
|
|
||||||
return new RMDelegationTokenSecretManager(secretKeyInterval,
|
|
||||||
tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
||||||
this.applicationACLsManager, this.queueACLsManager,
|
this.applicationACLsManager, this.queueACLsManager,
|
||||||
this.rmDTSecretManager);
|
getRMDTSecretManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ApplicationMasterService createApplicationMasterService() {
|
protected ApplicationMasterService createApplicationMasterService() {
|
||||||
|
@ -973,6 +902,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return new AdminService(this, rmContext);
|
return new AdminService(this, rmContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RMSecretManagerService createRMSecretManagerService() {
|
||||||
|
return new RMSecretManagerService(conf, rmContext);
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public ClientRMService getClientRMService() {
|
public ClientRMService getClientRMService() {
|
||||||
return this.clientRM;
|
return this.clientRM;
|
||||||
|
@ -1013,23 +946,28 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
|
public RMContainerTokenSecretManager getRMContainerTokenSecretManager() {
|
||||||
return this.containerTokenSecretManager;
|
return this.rmContext.getContainerTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public NMTokenSecretManagerInRM getRMNMTokenSecretManager() {
|
public NMTokenSecretManagerInRM getRMNMTokenSecretManager() {
|
||||||
return this.nmTokenSecretManager;
|
return this.rmContext.getNMTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public AMRMTokenSecretManager getAMRMTokenSecretManager(){
|
public AMRMTokenSecretManager getAMRMTokenSecretManager(){
|
||||||
return this.amRmTokenSecretManager;
|
return this.rmContext.getAMRMTokenSecretManager();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public RMDelegationTokenSecretManager getRMDTSecretManager(){
|
||||||
|
return this.rmContext.getRMDelegationTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recover(RMState state) throws Exception {
|
public void recover(RMState state) throws Exception {
|
||||||
// recover RMdelegationTokenSecretManager
|
// recover RMdelegationTokenSecretManager
|
||||||
rmDTSecretManager.recover(state);
|
getRMDTSecretManager().recover(state);
|
||||||
|
|
||||||
// recover applications
|
// recover applications
|
||||||
rmAppManager.recover(state);
|
rmAppManager.recover(state);
|
||||||
|
|
|
@ -61,7 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
@ -309,7 +310,7 @@ public class MockRM extends ResourceManager {
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||||
rmAppManager, applicationACLsManager, queueACLsManager,
|
rmAppManager, applicationACLsManager, queueACLsManager,
|
||||||
rmDTSecretManager) {
|
getRMDTSecretManager()) {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() {
|
protected void serviceStart() {
|
||||||
// override to not start rpc handler
|
// override to not start rpc handler
|
||||||
|
@ -325,8 +326,12 @@ public class MockRM extends ResourceManager {
|
||||||
@Override
|
@Override
|
||||||
protected ResourceTrackerService createResourceTrackerService() {
|
protected ResourceTrackerService createResourceTrackerService() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
|
RMContainerTokenSecretManager containerTokenSecretManager =
|
||||||
|
getRMContainerTokenSecretManager();
|
||||||
containerTokenSecretManager.rollMasterKey();
|
containerTokenSecretManager.rollMasterKey();
|
||||||
|
NMTokenSecretManagerInRM nmTokenSecretManager =
|
||||||
|
getRMNMTokenSecretManager();
|
||||||
nmTokenSecretManager.rollMasterKey();
|
nmTokenSecretManager.rollMasterKey();
|
||||||
return new ResourceTrackerService(getRMContext(), nodesListManager,
|
return new ResourceTrackerService(getRMContext(), nodesListManager,
|
||||||
this.nmLivelinessMonitor, containerTokenSecretManager,
|
this.nmLivelinessMonitor, containerTokenSecretManager,
|
||||||
|
@ -398,12 +403,8 @@ public class MockRM extends ResourceManager {
|
||||||
return this.nodesListManager;
|
return this.nodesListManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMDelegationTokenSecretManager getRMDTSecretManager() {
|
|
||||||
return this.rmDTSecretManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
|
||||||
return this.clientToAMSecretManager;
|
return this.getRMContext().getClientToAMTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMAppManager getRMAppManager() {
|
public RMAppManager getRMAppManager() {
|
||||||
|
|
|
@ -86,7 +86,7 @@ public abstract class QueueACLsTestBase {
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(getRMContext(), this.scheduler,
|
return new ClientRMService(getRMContext(), this.scheduler,
|
||||||
this.rmAppManager, this.applicationACLsManager,
|
this.rmAppManager, this.applicationACLsManager,
|
||||||
this.queueACLsManager, this.rmDTSecretManager);
|
this.queueACLsManager, getRMDTSecretManager());
|
||||||
};
|
};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class TestClientRMService {
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(this.rmContext, scheduler,
|
return new ClientRMService(this.rmContext, scheduler,
|
||||||
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
||||||
this.rmDTSecretManager);
|
this.getRMDTSecretManager());
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
|
@ -1567,7 +1567,7 @@ public class TestRMRestart {
|
||||||
@Override
|
@Override
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||||
rmAppManager, applicationACLsManager, null, rmDTSecretManager){
|
rmAppManager, applicationACLsManager, null, getRMDTSecretManager()){
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
|
|
@ -168,7 +168,7 @@ public class TestClientToAMTokens {
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(this.rmContext, scheduler,
|
return new ClientRMService(this.rmContext, scheduler,
|
||||||
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
|
||||||
this.rmDTSecretManager);
|
getRMDTSecretManager());
|
||||||
};
|
};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
|
@ -166,13 +167,21 @@ public class TestRMDelegationTokens {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMDelegationTokenSecretManager
|
protected RMSecretManagerService createRMSecretManagerService() {
|
||||||
createRMDelegationTokenSecretManager(RMContext rmContext) {
|
return new RMSecretManagerService(conf, rmContext) {
|
||||||
// KeyUpdateInterval-> 1 seconds
|
|
||||||
// TokenMaxLifetime-> 2 seconds.
|
@Override
|
||||||
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
|
protected RMDelegationTokenSecretManager
|
||||||
rmContext);
|
createRMDelegationTokenSecretManager(Configuration conf,
|
||||||
|
RMContext rmContext) {
|
||||||
|
// KeyUpdateInterval-> 1 seconds
|
||||||
|
// TokenMaxLifetime-> 2 seconds.
|
||||||
|
return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
|
||||||
|
rmContext);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TestRMDelegationTokenSecretManager extends
|
public class TestRMDelegationTokenSecretManager extends
|
||||||
|
|
Loading…
Reference in New Issue